Skip to content

feat: return all reasons a stream was rejected from the frontend #16826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 34 additions & 46 deletions pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,51 +174,31 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit

var (
activeStreamsTotal uint64
tenantRateBytes float64
rateTotal float64
)
// Sum the number of active streams and rates of all responses.
for _, resp := range resps {
activeStreamsTotal += resp.Response.ActiveStreams
tenantRateBytes += float64(resp.Response.Rate)
rateTotal += float64(resp.Response.Rate)
}

f.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))

// Take the intersection of unknown streams from all responses by counting
// the number of occurrences. If the number of occurrences matches the
// number of responses, we know the stream was unknown to all instances.
unknownStreams := make(map[uint64]int)
for _, resp := range resps {
for _, unknownStream := range resp.Response.UnknownStreams {
unknownStreams[unknownStream]++
}
}

tenantRateLimit := f.rateLimiter.Limit(time.Now(), req.Tenant)
if tenantRateBytes > tenantRateLimit {
rateLimitedStreams := make([]*logproto.RejectedStream, 0, len(streamHashes))
// Rate limit would be exceeded, all streams must be rejected.
for _, streamHash := range streamHashes {
rateLimitedStreams = append(rateLimitedStreams, &logproto.RejectedStream{
StreamHash: streamHash,
Reason: ReasonExceedsRateLimit,
})
}

// Count rejections by reason
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()
f.metrics.tenantRejectedStreams.WithLabelValues(req.Tenant, ReasonExceedsRateLimit).Add(float64(len(rateLimitedStreams)))

return &logproto.ExceedsLimitsResponse{
Tenant: req.Tenant,
RejectedStreams: rateLimitedStreams,
}, nil
}

// A slice containing the rejected streams returned to the caller.
// If len(rejectedStreams) == 0 then the request does not exceed limits.
var rejectedStreams []*logproto.RejectedStream

// Check if max streams limit would be exceeded.
maxGlobalStreams := f.limits.MaxGlobalStreamsPerUser(req.Tenant)
if activeStreamsTotal >= uint64(maxGlobalStreams) {
// Take the intersection of unknown streams from all responses by counting
// the number of occurrences. If the number of occurrences matches the
// number of responses, we know the stream was unknown to all instances.
unknownStreams := make(map[uint64]int)
for _, resp := range resps {
for _, unknownStream := range resp.Response.UnknownStreams {
unknownStreams[unknownStream]++
}
}
for _, resp := range resps {
for _, unknownStream := range resp.Response.UnknownStreams {
// If the stream is unknown to all instances, it must be a new
Expand All @@ -232,21 +212,29 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit
}
}
}
f.metrics.tenantRejectedStreams.WithLabelValues(
req.Tenant,
ReasonExceedsMaxStreams,
).Add(float64(len(rejectedStreams)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's OK to add 0 here I think, and avoids the need for another if statement.


if len(rejectedStreams) > 0 {
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()

// Count rejections by reason
exceedsLimitCount := 0
for _, rejected := range rejectedStreams {
if rejected.Reason == ReasonExceedsMaxStreams {
exceedsLimitCount++
}
// Check if rate limits would be exceeded.
tenantRateLimit := f.rateLimiter.Limit(time.Now(), req.Tenant)
if rateTotal > tenantRateLimit {
// Rate limit would be exceeded, all streams must be rejected.
for _, streamHash := range streamHashes {
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
StreamHash: streamHash,
Reason: ReasonExceedsRateLimit,
})
}
f.metrics.tenantRejectedStreams.WithLabelValues(
req.Tenant,
ReasonExceedsRateLimit,
).Add(float64(len(streamHashes)))
}

if exceedsLimitCount > 0 {
f.metrics.tenantRejectedStreams.WithLabelValues(req.Tenant, ReasonExceedsMaxStreams).Add(float64(exceedsLimitCount))
}
if len(rejectedStreams) > 0 {
f.metrics.tenantExceedsLimits.WithLabelValues(req.Tenant).Inc()
}

return &logproto.ExceedsLimitsResponse{
Expand Down
2 changes: 2 additions & 0 deletions pkg/limits/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
maxGlobalStreams: 5,
ingestionRate: 100,
expected: []*logproto.RejectedStream{
{StreamHash: 0x6, Reason: ReasonExceedsMaxStreams},
{StreamHash: 0x7, Reason: ReasonExceedsMaxStreams},
{StreamHash: 0x6, Reason: ReasonExceedsRateLimit},
{StreamHash: 0x7, Reason: ReasonExceedsRateLimit},
},
Expand Down