Skip to content

fix: bug where known streams could be rejected #16823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions pkg/limits/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,27 +166,32 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit
return nil, err
}

maxGlobalStreams := f.limits.MaxGlobalStreamsPerUser(req.Tenant)

var (
activeStreamsTotal uint64
tenantRateBytes float64
)
// Sum the number of active streams and rates of all responses.
for _, resp := range resps {
activeStreamsTotal += resp.Response.ActiveStreams
tenantRateBytes += float64(resp.Response.Rate)
}

f.metrics.tenantActiveStreams.WithLabelValues(req.Tenant).Set(float64(activeStreamsTotal))

var (
rejectedStreams []*logproto.RejectedStream
uniqueStreamHashes = make(map[uint64]bool)
)
// Take the intersection of unknown streams from all responses by counting
// the number of occurrences. If the number of occurrences matches the
// number of responses, we know the stream was unknown to all instances.
unknownStreams := make(map[uint64]int)
for _, resp := range resps {
for _, unknownStream := range resp.Response.UnknownStreams {
unknownStreams[unknownStream]++
}
}

tenantRateLimit := f.rateLimiter.Limit(time.Now(), req.Tenant)
if tenantRateBytes > tenantRateLimit {
rateLimitedStreams := make([]*logproto.RejectedStream, 0, len(streamHashes))
// Rate limit would be exceeded, all streams must be rejected.
for _, streamHash := range streamHashes {
rateLimitedStreams = append(rateLimitedStreams, &logproto.RejectedStream{
StreamHash: streamHash,
Expand All @@ -204,12 +209,15 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, req *logproto.ExceedsLimit
}, nil
}

// Only process global limit if we're exceeding it
var rejectedStreams []*logproto.RejectedStream
// Check if max streams limit would be exceeded.
maxGlobalStreams := f.limits.MaxGlobalStreamsPerUser(req.Tenant)
if activeStreamsTotal >= uint64(maxGlobalStreams) {
for _, resp := range resps {
for _, unknownStream := range resp.Response.UnknownStreams {
if !uniqueStreamHashes[unknownStream] {
uniqueStreamHashes[unknownStream] = true
// If the stream is unknown to all instances, it must be a new
// stream.
if unknownStreams[unknownStream] == len(resps) {
rejectedStreams = append(rejectedStreams, &logproto.RejectedStream{
StreamHash: unknownStream,
Reason: ReasonExceedsMaxStreams,
Expand Down
98 changes: 61 additions & 37 deletions pkg/limits/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,29 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
},
expected: nil,
}, {
name: "below the limit",
name: "no response",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
{StreamHash: 0x1},
},
},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}},
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x1},
Partitions: []int32{0},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{}},
maxGlobalStreams: 10,
ingestionRate: 100,
expected: nil,
}, {
name: "within limits",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand All @@ -60,7 +82,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
ingestionRate: 100,
expected: nil,
}, {
name: "exceeds limit with new streams",
name: "exceeds max streams limit, rejects new streams",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand Down Expand Up @@ -91,7 +113,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 0x2, Reason: ReasonExceedsMaxStreams},
},
}, {
name: "exceeds limit but allows existing streams and rejects new streams",
name: "exceeds max streams limit, allows existing streams and rejects new streams",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand Down Expand Up @@ -127,56 +149,58 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 7, Reason: ReasonExceedsMaxStreams},
},
}, {
name: "no response",
// This test checks the case where a tenant's streams are sharded over
// two instances, each holding one stream. Each instance will
// return a response stating that it doesn't know about the other
// stream. The frontend is responsible for taking the intersection of
// the two responses and calculating the actual set of unknown streams.
name: "exceeds max streams limit, streams sharded over two instances",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
{StreamHash: 0x1},
{StreamHash: 0x1}, // Exceeds limits.
{StreamHash: 0x2}, // Also exceeds limits.
},
},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
0: time.Now().UnixNano(), // Instance 0 owns partition 0.
},
}},
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x1},
Partitions: []int32{0},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{}},
maxGlobalStreams: 10,
ingestionRate: 100,
expected: nil, // No rejections because activeStreamsTotal is 0
}, {
name: "rate limit not exceeded",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
{StreamHash: 0x1},
{StreamHash: 0x2},
},
},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
}, {
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
1: time.Now().UnixNano(), // Instance 1 owns partition 1.
},
}},
// The frontend will ask instance 0 for the data for partition 0,
// and instance 1 for the data for partition 1.
expectedStreamUsageRequest: []*logproto.GetStreamUsageRequest{{
Tenant: "test",
StreamHashes: []uint64{0x1, 0x2},
Partitions: []int32{0},
}, {
Tenant: "test",
StreamHashes: []uint64{0x1, 0x2},
Partitions: []int32{1},
}},
// Each instance will respond stating that it doesn't know about the
// other stream.
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
Tenant: "test",
ActiveStreams: 2,
Rate: 50, // Below the limit of 100 bytes/sec
Tenant: "test",
ActiveStreams: 1,
Rate: 5,
UnknownStreams: []uint64{0x2},
}, {
Tenant: "test",
ActiveStreams: 1,
Rate: 5,
UnknownStreams: []uint64{0x1},
}},
maxGlobalStreams: 10,
maxGlobalStreams: 1,
ingestionRate: 100,
expected: nil,
// No streams should be rejected.
expected: nil,
}, {
name: "rate limit exceeded",
name: "exceeds rate limits, rejects all streams",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand All @@ -201,7 +225,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 2, Reason: ReasonExceedsRateLimit},
},
}, {
name: "rate limit exceeded with multiple instances",
name: "exceeds rate limits, rates sharded over two instances",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand All @@ -211,11 +235,11 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: 1,
0: time.Now().UnixNano(),
},
}, {
AssignedPartitions: map[int32]int64{
1: 1,
1: time.Now().UnixNano(),
},
}},
getStreamUsageResponses: []*logproto.GetStreamUsageResponse{{
Expand All @@ -234,7 +258,7 @@ func TestFrontend_ExceedsLimits(t *testing.T) {
{StreamHash: 2, Reason: ReasonExceedsRateLimit},
},
}, {
name: "both global limit and rate limit exceeded",
name: "exceeds both max stream limit and rate limits",
exceedsLimitsRequest: &logproto.ExceedsLimitsRequest{
Tenant: "test",
Streams: []*logproto.StreamMetadata{
Expand Down