Skip to content

Commit 54ff5dc

Browse files
authored
feat(ingest-limits): Add partition id caching (#16964)
1 parent db59d0e commit 54ff5dc

File tree

21 files changed

+1797
-35
lines changed

21 files changed

+1797
-35
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,10 @@ ingest_limits_frontend:
12091209
# CLI flag: -ingest-limits-frontend.num-partitions
12101210
[num_partitions: <int> | default = 64]
12111211

1212+
# The TTL for the stream usage cache.
1213+
# CLI flag: -ingest-limits-frontend.partition-id-cache-ttl
1214+
[partition_id_cache_ttl: <duration> | default = 1m]
1215+
12121216
ingest_limits_frontend_client:
12131217
# Configures client gRPC connections to limits service.
12141218
# The CLI flags prefix for this block configuration is:

‎go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ require (
132132
github.com/grafana/loki/pkg/push v0.0.0-20240924133635-758364c7775f
133133
github.com/heroku/x v0.4.3
134134
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
135+
github.com/jellydator/ttlcache/v3 v3.3.0
135136
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
136137
github.com/ncw/swift/v2 v2.0.3
137138
github.com/parquet-go/parquet-go v0.25.0

‎go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
776776
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
777777
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
778778
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
779+
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
780+
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
779781
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
780782
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
781783
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=

‎pkg/limits/frontend/cache.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package frontend
2+
3+
import (
4+
"time"
5+
6+
"github.com/jellydator/ttlcache/v3"
7+
8+
"github.com/grafana/loki/v3/pkg/logproto"
9+
)
10+
11+
type PartitionConsumersCache = *ttlcache.Cache[string, logproto.GetAssignedPartitionsResponse]
12+
13+
func NewPartitionConsumerCache(ttl time.Duration) PartitionConsumersCache {
14+
return ttlcache.New(
15+
ttlcache.WithTTL[string, logproto.GetAssignedPartitionsResponse](ttl),
16+
ttlcache.WithDisableTouchOnHit[string, logproto.GetAssignedPartitionsResponse](),
17+
)
18+
}

‎pkg/limits/frontend/config.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,19 @@ import (
1313

1414
// Config contains the config for an ingest-limits-frontend.
1515
type Config struct {
16-
ClientConfig limits_client.Config `yaml:"client_config"`
17-
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
18-
RecheckPeriod time.Duration `yaml:"recheck_period"`
19-
NumPartitions int `yaml:"num_partitions"`
16+
ClientConfig limits_client.Config `yaml:"client_config"`
17+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
18+
RecheckPeriod time.Duration `yaml:"recheck_period"`
19+
NumPartitions int `yaml:"num_partitions"`
20+
PartitionIDCacheTTL time.Duration `yaml:"partition_id_cache_ttl"`
2021
}
2122

2223
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2324
cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
2425
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
2526
f.DurationVar(&cfg.RecheckPeriod, "ingest-limits-frontend.recheck-period", 10*time.Second, "The period to recheck per tenant ingestion rate limit configuration.")
2627
f.IntVar(&cfg.NumPartitions, "ingest-limits-frontend.num-partitions", 64, "The number of partitions to use for the ring.")
28+
f.DurationVar(&cfg.PartitionIDCacheTTL, "ingest-limits-frontend.partition-id-cache-ttl", 1*time.Minute, "The TTL for the stream usage cache.")
2729
}
2830

2931
func (cfg *Config) Validate() error {

‎pkg/limits/frontend/frontend.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,11 @@ type Frontend struct {
5858
cfg Config
5959
logger log.Logger
6060

61-
limits Limits
62-
rateLimiter *limiter.RateLimiter
63-
streamUsage StreamUsageGatherer
64-
metrics *metrics
61+
limits Limits
62+
rateLimiter *limiter.RateLimiter
63+
streamUsage StreamUsageGatherer
64+
partitionIDCache PartitionConsumersCache
65+
metrics *metrics
6566

6667
subservices *services.Manager
6768
subservicesWatcher *services.FailureWatcher
@@ -84,15 +85,17 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, l
8485
)
8586

8687
rateLimiter := limiter.NewRateLimiter(newRateLimitsAdapter(limits), cfg.RecheckPeriod)
87-
streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, logger, cfg.NumPartitions)
88+
partitionIDCache := NewPartitionConsumerCache(cfg.PartitionIDCacheTTL)
89+
streamUsage := NewRingStreamUsageGatherer(limitsRing, clientPool, logger, partitionIDCache, cfg.NumPartitions)
8890

8991
f := &Frontend{
90-
cfg: cfg,
91-
logger: logger,
92-
limits: limits,
93-
rateLimiter: rateLimiter,
94-
streamUsage: streamUsage,
95-
metrics: newMetrics(reg),
92+
cfg: cfg,
93+
logger: logger,
94+
limits: limits,
95+
rateLimiter: rateLimiter,
96+
streamUsage: streamUsage,
97+
partitionIDCache: partitionIDCache,
98+
metrics: newMetrics(reg),
9699
}
97100

98101
lifecycler, err := ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
@@ -135,6 +138,8 @@ func (f *Frontend) starting(ctx context.Context) (err error) {
135138
return fmt.Errorf("failed to start subservices: %w", err)
136139
}
137140

141+
go f.partitionIDCache.Start()
142+
138143
return nil
139144
}
140145

@@ -150,6 +155,7 @@ func (f *Frontend) running(ctx context.Context) error {
150155

151156
// stopping implements services.Service.
152157
func (f *Frontend) stopping(_ error) error {
158+
f.partitionIDCache.Stop()
153159
return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
154160
}
155161

‎pkg/limits/frontend/frontend_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,12 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
305305
ingestionRate: test.ingestionRate,
306306
}
307307
rl := limiter.NewRateLimiter(newRateLimitsAdapter(l), 10*time.Second)
308+
cache := NewPartitionConsumerCache(1 * time.Millisecond)
308309

309310
f := Frontend{
310311
limits: l,
311312
rateLimiter: rl,
312-
streamUsage: NewRingStreamUsageGatherer(readRing, clientPool, log.NewNopLogger(), 2),
313+
streamUsage: NewRingStreamUsageGatherer(readRing, clientPool, log.NewNopLogger(), cache, 2),
313314
metrics: newMetrics(prometheus.NewRegistry()),
314315
}
315316

‎pkg/limits/frontend/http.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package frontend
33
import (
44
"encoding/json"
55
"net/http"
6+
"text/template"
67

78
"github.com/go-kit/log/level"
89
"github.com/grafana/dskit/user"
@@ -11,6 +12,46 @@ import (
1112
"github.com/grafana/loki/v3/pkg/util"
1213
)
1314

15+
const ringStreamUsageTemplate = `
16+
<!DOCTYPE html>
17+
<html>
18+
<head>
19+
<title>Ring Stream Usage Cache</title>
20+
<style>
21+
table { border-collapse: collapse; width: 100%; }
22+
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
23+
th { background-color: #f2f2f2; }
24+
form { display: inline; }
25+
button { padding: 5px 10px; margin: 2px; }
26+
</style>
27+
</head>
28+
<body>
29+
<h1>Ring Stream Usage Cache</h1>
30+
<form method="POST">
31+
<button type="submit">Clear All Cache</button>
32+
</form>
33+
<table>
34+
<tr>
35+
<th>Instance Address</th>
36+
<th>Partitions</th>
37+
<th>Actions</th>
38+
</tr>
39+
{{range $addr, $partitions := .Entries}}
40+
<tr>
41+
<td>{{$addr}}</td>
42+
<td>{{range $i, $p := $partitions}}{{if $i}}, {{end}}{{$p}}{{end}}</td>
43+
<td>
44+
<form method="POST">
45+
<input type="hidden" name="instance" value="{{$addr}}">
46+
<button type="submit">Clear Cache</button>
47+
</form>
48+
</td>
49+
</tr>
50+
{{end}}
51+
</table>
52+
</body>
53+
</html>`
54+
1455
type httpExceedsLimitsRequest struct {
1556
TenantID string `json:"tenantID"`
1657
StreamHashes []uint64 `json:"streamHashes"`
@@ -61,3 +102,46 @@ func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
61102
Results: resp.Results,
62103
})
63104
}
105+
106+
// PartitionConsumersCacheHandler handles the GET request to display the cache.
107+
func (f *Frontend) PartitionConsumersCacheHandler(w http.ResponseWriter, _ *http.Request) {
108+
data := struct {
109+
Entries map[string][]int32
110+
}{
111+
Entries: make(map[string][]int32),
112+
}
113+
114+
for addr, entry := range f.partitionIDCache.Items() {
115+
assignedPartitions := entry.Value().AssignedPartitions
116+
for partition := range assignedPartitions {
117+
data.Entries[addr] = append(data.Entries[addr], partition)
118+
}
119+
}
120+
121+
w.Header().Set("Content-Type", "text/html; charset=utf-8")
122+
tmpl := template.Must(template.New("cache").Parse(ringStreamUsageTemplate))
123+
if err := tmpl.Execute(w, data); err != nil {
124+
http.Error(w, "Failed to render template", http.StatusInternalServerError)
125+
return
126+
}
127+
}
128+
129+
// PartitionConsumersCacheEvictHandler handles the POST request to clear the cache.
130+
func (f *Frontend) PartitionConsumersCacheEvictHandler(w http.ResponseWriter, r *http.Request) {
131+
if err := r.ParseForm(); err != nil {
132+
http.Error(w, "Failed to parse form", http.StatusBadRequest)
133+
return
134+
}
135+
136+
instance := r.FormValue("instance")
137+
if instance == "" {
138+
// Clear all cache
139+
f.partitionIDCache.DeleteAll()
140+
} else {
141+
// Clear specific instance
142+
f.partitionIDCache.Delete(instance)
143+
}
144+
145+
// Redirect back to the GET page
146+
http.Redirect(w, r, r.URL.Path, http.StatusSeeOther)
147+
}

0 commit comments

Comments
 (0)