Skip to content

Commit 402be06

Browse files
feat: return all reasons a stream was rejected from the frontend (#16826)
1 parent d0ae843 commit 402be06

File tree

2 files changed

+36
-46
lines changed

2 files changed

+36
-46
lines changed

‎pkg/limits/frontend/frontend.go

Lines changed: 34 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -174,51 +174,31 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit
174174

175175
var (
176176
activeStreamsTotal uint64
177-
tenantRateBytes float64
177+
rateTotal float64
178178
)
179179
// Sum the number of active streams and rates of all responses.
180180
for _, resp := range resps {
181181
activeStreamsTotal += resp.Response.ActiveStreams
182-
tenantRateBytes += float64(resp.Response.Rate)
182+
rateTotal += float64(resp.Response.Rate)
183183
}
184-
185184
f.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))
186185

187-
// Take the intersection of unknown streams from all responses by counting
188-
// the number of occurrences. If the number of occurrences matches the
189-
// number of responses, we know the stream was unknown to all instances.
190-
unknownStreams := make(map[uint64]int)
191-
for _, resp := range resps {
192-
for _, unknownStream := range resp.Response.UnknownStreams {
193-
unknownStreams[unknownStream]++
194-
}
195-
}
196-
197-
tenantRateLimit := f.rateLimiter.Limit(time.Now(), req.Tenant)
198-
if tenantRateBytes > tenantRateLimit {
199-
rateLimitedStreams := make([]*logproto.RejectedStream, 0, len(streamHashes))
200-
// Rate limit would be exceeded, all streams must be rejected.
201-
for _, streamHash := range streamHashes {
202-
rateLimitedStreams = append(rateLimitedStreams, &logproto.RejectedStream{
203-
StreamHash: streamHash,
204-
Reason: ReasonExceedsRateLimit,
205-
})
206-
}
207-
208-
// Count rejections by reason
209-
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()
210-
f.metrics.tenantRejectedStreams.WithLabelValues(req.Tenant, ReasonExceedsRateLimit).Add(float64(len(rateLimitedStreams)))
211-
212-
return &logproto.ExceedsLimitsResponse{
213-
Tenant: req.Tenant,
214-
RejectedStreams: rateLimitedStreams,
215-
}, nil
216-
}
217-
186+
// A slice containing the rejected streams returned to the caller.
187+
// If len(rejectedStreams) == 0 then the request does not exceed limits.
218188
var rejectedStreams []*logproto.RejectedStream
189+
219190
// Check if max streams limit would be exceeded.
220191
maxGlobalStreams := f.limits.MaxGlobalStreamsPerUser(req.Tenant)
221192
if activeStreamsTotal >= uint64(maxGlobalStreams) {
193+
// Take the intersection of unknown streams from all responses by counting
194+
// the number of occurrences. If the number of occurrences matches the
195+
// number of responses, we know the stream was unknown to all instances.
196+
unknownStreams := make(map[uint64]int)
197+
for _, resp := range resps {
198+
for _, unknownStream := range resp.Response.UnknownStreams {
199+
unknownStreams[unknownStream]++
200+
}
201+
}
222202
for _, resp := range resps {
223203
for _, unknownStream := range resp.Response.UnknownStreams {
224204
// If the stream is unknown to all instances, it must be a new
@@ -232,21 +212,29 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit
232212
}
233213
}
234214
}
215+
f.metrics.tenantRejectedStreams.WithLabelValues(
216+
req.Tenant,
217+
ReasonExceedsMaxStreams,
218+
).Add(float64(len(rejectedStreams)))
235219

236-
if len(rejectedStreams) > 0 {
237-
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()
238-
239-
// Count rejections by reason
240-
exceedsLimitCount := 0
241-
for _, rejected := range rejectedStreams {
242-
if rejected.Reason == ReasonExceedsMaxStreams {
243-
exceedsLimitCount++
244-
}
220+
// Check if rate limits would be exceeded.
221+
tenantRateLimit := f.rateLimiter.Limit(time.Now(), req.Tenant)
222+
if rateTotal > tenantRateLimit {
223+
// Rate limit would be exceeded, all streams must be rejected.
224+
for _, streamHash := range streamHashes {
225+
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
226+
StreamHash: streamHash,
227+
Reason: ReasonExceedsRateLimit,
228+
})
245229
}
230+
f.metrics.tenantRejectedStreams.WithLabelValues(
231+
req.Tenant,
232+
ReasonExceedsRateLimit,
233+
).Add(float64(len(streamHashes)))
234+
}
246235

247-
if exceedsLimitCount > 0 {
248-
f.metrics.tenantRejectedStreams.WithLabelValues(req.Tenant, ReasonExceedsMaxStreams).Add(float64(exceedsLimitCount))
249-
}
236+
if len(rejectedStreams) > 0 {
237+
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()
250238
}
251239

252240
return &logproto.ExceedsLimitsResponse{

‎pkg/limits/frontend/frontend_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,8 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
280280
maxGlobalStreams: 5,
281281
ingestionRate: 100,
282282
expected: []*logproto.RejectedStream{
283+
{StreamHash: 0x6, Reason: ReasonExceedsMaxStreams},
284+
{StreamHash: 0x7, Reason: ReasonExceedsMaxStreams},
283285
{StreamHash: 0x6, Reason: ReasonExceedsRateLimit},
284286
{StreamHash: 0x7, Reason: ReasonExceedsRateLimit},
285287
},

0 commit comments

Comments
 (0)