Skip to content

fix(ingest-limits): Use stripe locking for metadata #17150

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 25, 2025
27 changes: 12 additions & 15 deletions pkg/limits/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ type httpTenantLimitsResponse struct {
// ServeHTTP implements the http.Handler interface.
// It returns the current stream counts and status per tenant as a JSON response.
func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO(grobinson): Avoid acquiring the mutex for the entire duration
// of the request.
s.mtx.RLock()
defer s.mtx.RUnlock()

tenant := mux.Vars(r)["tenant"]
if tenant == "" {
http.Error(w, "invalid tenant", http.StatusBadRequest)
Expand All @@ -43,20 +38,22 @@ func (s *IngestLimits) ServeHTTP(w http.ResponseWriter, r *http.Request) {
response httpTenantLimitsResponse
)

for _, partitions := range s.metadata[tenant] {
for _, stream := range partitions {
if stream.lastSeenAt >= cutoff {
activeStreams++
s.metadata.All(func(tenantID string, _ int32, stream Stream) {
if tenantID != tenant {
return
}

if stream.LastSeenAt >= cutoff {
activeStreams++

// Calculate size only within the rate window
for _, bucket := range stream.rateBuckets {
if bucket.timestamp >= rateWindowCutoff {
totalSize += bucket.size
}
// Calculate size only within the rate window
for _, bucket := range stream.RateBuckets {
if bucket.Timestamp >= rateWindowCutoff {
totalSize += bucket.Size
}
}
}
}
})

// Calculate rate using only data from within the rate window
calculatedRate := float64(totalSize) / s.cfg.WindowSize.Seconds()
Expand Down
32 changes: 21 additions & 11 deletions pkg/limits/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@ func TestIngestLimits_ServeHTTP(t *testing.T) {
RateWindow: time.Minute,
BucketDuration: 30 * time.Second,
},
metadata: map[string]map[int32][]streamMetadata{
"tenant": {
0: {{
hash: 0x1,
totalSize: 100,
rateBuckets: []rateBucket{{
timestamp: time.Now().UnixNano(),
size: 1,
}},
lastSeenAt: time.Now().UnixNano(),
}},
metadata: &streamMetadata{
stripes: []map[string]map[int32][]Stream{
{
"tenant": {
0: {{
Hash: 0x1,
TotalSize: 100,
RateBuckets: []RateBucket{{
Timestamp: time.Now().UnixNano(),
Size: 1,
}},
LastSeenAt: time.Now().UnixNano(),
}},
},
},
},
locks: make([]stripeLock, 1),
},
logger: log.NewNopLogger(),
partitionManager: &PartitionManager{
partitions: map[int32]int64{
0: time.Now().UnixNano(),
},
},
}

// Set up a mux router for the test server otherwise mux.Vars() won't work.
Expand Down
Loading
Loading