Commit 3a3df62 (parent 8429b0f)

feat(ingest-limits): Implement global tenant rate limiting (#16727)

11 files changed: +1526 -432 lines

docs/sources/shared/configuration.md

Lines changed: 14 additions & 0 deletions
```diff
@@ -883,6 +883,16 @@ ingest_limits:
   # CLI flag: -ingest-limits.window-size
   [window_size: <duration> | default = 1h]
 
+  # The time window for rate calculation. This should match the window used in
+  # Prometheus rate() queries for consistency.
+  # CLI flag: -ingest-limits.rate-window
+  [rate_window: <duration> | default = 5m]
+
+  # The granularity of time buckets used for sliding window rate calculation.
+  # Smaller buckets provide more precise rate tracking but require more memory.
+  # CLI flag: -ingest-limits.bucket-duration
+  [bucket_duration: <duration> | default = 1m]
+
   lifecycler:
     ring:
       kvstore:
@@ -1186,6 +1196,10 @@ ingest_limits_frontend:
   # CLI flag: -ingest-limits-frontend.lifecycler.ID
   [id: <string> | default = "<hostname>"]
 
+  # The period to recheck per tenant ingestion rate limit configuration.
+  # CLI flag: -ingest-limits-frontend.recheck-period
+  [recheck_period: <duration> | default = 10s]
+
 ingest_limits_frontend_client:
   # Configures client gRPC connections to limits service.
   # The CLI flags prefix for this block configuration is:
```
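To illustrate how `rate_window` and `bucket_duration` relate, here is a minimal, self-contained Go sketch of a sliding-window rate estimate over fixed-size buckets. The bucket layout, the `rateWindowSketch` type, and all names in it are illustrative assumptions, not the ingest-limits implementation from this commit.

```go
package main

import (
	"fmt"
	"time"
)

// rateWindowSketch approximates a sliding-window rate: bytes are counted into
// buckets of bucketDuration, and the rate is computed over the trailing
// rateWindow. This mirrors -ingest-limits.rate-window and
// -ingest-limits.bucket-duration only conceptually.
type rateWindowSketch struct {
	rateWindow     time.Duration    // e.g. 5m
	bucketDuration time.Duration    // e.g. 1m
	buckets        map[int64]uint64 // bucket start (unix seconds) -> bytes observed
}

func (s *rateWindowSketch) record(now time.Time, bytes uint64) {
	bucketStart := now.Truncate(s.bucketDuration).Unix()
	s.buckets[bucketStart] += bytes
}

// rate returns bytes/second over the trailing rateWindow, dropping buckets
// that have fallen out of the window. Smaller buckets track the window edge
// more precisely but mean more entries to keep in memory.
func (s *rateWindowSketch) rate(now time.Time) float64 {
	cutoff := now.Add(-s.rateWindow).Unix()
	var total uint64
	for start, bytes := range s.buckets {
		if start < cutoff {
			delete(s.buckets, start)
			continue
		}
		total += bytes
	}
	return float64(total) / s.rateWindow.Seconds()
}

func main() {
	s := &rateWindowSketch{
		rateWindow:     5 * time.Minute,
		bucketDuration: time.Minute,
		buckets:        map[int64]uint64{},
	}
	now := time.Now()
	s.record(now.Add(-3*time.Minute), 6_000_000)
	s.record(now, 3_000_000)
	fmt.Printf("approx. ingestion rate: %.0f bytes/s\n", s.rate(now))
}
```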

pkg/distributor/distributor.go

Lines changed: 23 additions & 4 deletions
```diff
@@ -1161,21 +1161,27 @@ func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, stream
 	// limits-frontend. The limits-frontend is responsible for deciding if
 	// the request would exceed the tenants limits, and if so, which streams
 	// from the request caused it to exceed its limits.
-	streamHashes := make([]*logproto.StreamMetadata, 0, len(streams))
+	streamMetadata := make([]*logproto.StreamMetadata, 0, len(streams))
 	for _, stream := range streams {
 		// Add the stream hash to FNV-1.
 		buf := make([]byte, binary.MaxVarintLen64)
 		binary.PutUvarint(buf, stream.HashKeyNoShard)
 		_, _ = h.Write(buf)
+
+		// Calculate the size of the stream.
+		entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
+
 		// Add the stream hash to the request. This is sent to limits-frontend.
-		streamHashes = append(streamHashes, &logproto.StreamMetadata{
-			StreamHash: stream.HashKeyNoShard,
+		streamMetadata = append(streamMetadata, &logproto.StreamMetadata{
+			StreamHash:             stream.HashKeyNoShard,
+			EntriesSize:            entriesSize,
+			StructuredMetadataSize: structuredMetadataSize,
 		})
 	}
 
 	req := logproto.ExceedsLimitsRequest{
 		Tenant:  tenantID,
-		Streams: streamHashes,
+		Streams: streamMetadata,
 	}
 
 	// Get the limits-frontend instances from the ring.
@@ -1275,6 +1281,8 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 		return fmt.Errorf("failed to marshal write request to records: %w", err)
 	}
 
+	entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
+
 	// However, unlike stream records, the distributor writes stream metadata
 	// records to one of a fixed number of partitions, the size of which is
 	// determined ahead of time. It does not use a ring. The reason for this
@@ -1286,6 +1294,8 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 		d.cfg.KafkaConfig.Topic,
 		tenant,
 		stream.HashKeyNoShard,
+		entriesSize,
+		structuredMetadataSize,
 	)
 	if err != nil {
 		return fmt.Errorf("failed to marshal metadata: %w", err)
@@ -1403,6 +1413,15 @@ func calculateShards(rate int64, pushSize, desiredRate int) int {
 	return int(math.Ceil(shards))
 }
 
+func calculateStreamSizes(stream logproto.Stream) (uint64, uint64) {
+	var entriesSize, structuredMetadataSize uint64
+	for _, entry := range stream.Entries {
+		entriesSize += uint64(len(entry.Line))
+		structuredMetadataSize += uint64(util.StructuredMetadataSize(entry.StructuredMetadata))
+	}
+	return entriesSize, structuredMetadataSize
+}
+
 // newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
 func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer, metricsNamespace string) (*ring.Ring, *ring.BasicLifecycler, error) {
 	kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
```
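To make the new size accounting concrete, the following self-contained sketch runs the same computation over a hand-built stream. The simplified `LabelAdapter`, `Entry`, and `Stream` types and the local `structuredMetadataSize` helper are stand-ins for the logproto types and `util.StructuredMetadataSize`; the exact byte accounting of the real helper is an assumption here.

```go
package main

import "fmt"

// Minimal stand-ins for the logproto types, for illustration only.
type LabelAdapter struct{ Name, Value string }

type Entry struct {
	Line               string
	StructuredMetadata []LabelAdapter
}

type Stream struct{ Entries []Entry }

// structuredMetadataSize approximates util.StructuredMetadataSize as the
// summed length of all structured-metadata label names and values.
func structuredMetadataSize(md []LabelAdapter) int {
	size := 0
	for _, l := range md {
		size += len(l.Name) + len(l.Value)
	}
	return size
}

// calculateStreamSizes mirrors the helper added to the distributor: entry
// bytes and structured-metadata bytes are accumulated separately so the
// limits service can rate-limit on both dimensions.
func calculateStreamSizes(stream Stream) (uint64, uint64) {
	var entriesSize, metadataSize uint64
	for _, entry := range stream.Entries {
		entriesSize += uint64(len(entry.Line))
		metadataSize += uint64(structuredMetadataSize(entry.StructuredMetadata))
	}
	return entriesSize, metadataSize
}

func main() {
	s := Stream{Entries: []Entry{
		{Line: "GET /index.html 200", StructuredMetadata: []LabelAdapter{{Name: "trace_id", Value: "abc123"}}},
		{Line: "GET /favicon.ico 404"},
	}}
	entries, metadata := calculateStreamSizes(s)
	fmt.Printf("entries=%d bytes, structured metadata=%d bytes\n", entries, metadata)
}
```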

pkg/kafka/encoding.go

Lines changed: 4 additions & 2 deletions
```diff
@@ -197,14 +197,16 @@ func sovPush(x uint64) (n int) {
 
 // EncodeStreamMetadata encodes the stream metadata into a Kafka record
 // using the tenantID as the key and partition as the target partition
-func EncodeStreamMetadata(partition int32, topic string, tenantID string, streamHash uint64) (*kgo.Record, error) {
+func EncodeStreamMetadata(partition int32, topic, tenantID string, streamHash, entriesSize, structuredMetadataSize uint64) (*kgo.Record, error) {
 	// Validate stream hash
 	if streamHash == 0 {
 		return nil, fmt.Errorf("invalid stream hash '%d'", streamHash)
 	}
 
 	metadata := logproto.StreamMetadata{
-		StreamHash: streamHash,
+		StreamHash:             streamHash,
+		EntriesSize:            entriesSize,
+		StructuredMetadataSize: structuredMetadataSize,
 	}
 
 	// Encode the metadata into a byte slice
```
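As a hedged sketch of what the new signature produces, the fragment below builds a metadata record the same general way: tenant ID as the record key, serialized metadata as the value, and an explicit topic and partition. The local `streamMetadata` type and the use of JSON are illustrative assumptions; the real code marshals the `logproto.StreamMetadata` protobuf message.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

// streamMetadata is a stand-in for logproto.StreamMetadata.
type streamMetadata struct {
	StreamHash             uint64 `json:"streamHash"`
	EntriesSize            uint64 `json:"entriesSize"`
	StructuredMetadataSize uint64 `json:"structuredMetadataSize"`
}

// encodeStreamMetadata mirrors the shape of EncodeStreamMetadata: the tenant
// ID becomes the record key and the encoded metadata becomes the value.
func encodeStreamMetadata(partition int32, topic, tenantID string, streamHash, entriesSize, structuredMetadataSize uint64) (*kgo.Record, error) {
	if streamHash == 0 {
		return nil, fmt.Errorf("invalid stream hash '%d'", streamHash)
	}
	value, err := json.Marshal(streamMetadata{
		StreamHash:             streamHash,
		EntriesSize:            entriesSize,
		StructuredMetadataSize: structuredMetadataSize,
	})
	if err != nil {
		return nil, err
	}
	return &kgo.Record{
		Key:       []byte(tenantID),
		Value:     value,
		Topic:     topic,
		Partition: partition,
	}, nil
}

func main() {
	rec, err := encodeStreamMetadata(1, "loki.metadata", "tenant-1", 12345, 1024, 512)
	if err != nil {
		panic(err)
	}
	fmt.Printf("topic=%s partition=%d key=%s value=%s\n", rec.Topic, rec.Partition, rec.Key, rec.Value)
}
```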

pkg/kafka/encoding_test.go

Lines changed: 37 additions & 19 deletions
```diff
@@ -153,35 +153,51 @@ func generateRandomString(length int) string {
 
 func TestEncodeDecodeStreamMetadata(t *testing.T) {
 	tests := []struct {
-		name      string
-		hash      uint64
-		partition int32
-		topic     string
-		tenantID  string
-		expectErr bool
+		name                   string
+		hash                   uint64
+		partition              int32
+		topic                  string
+		tenantID               string
+		entriesSize            uint64
+		structuredMetadataSize uint64
+		expectErr              bool
 	}{
 		{
-			name:      "Valid metadata",
-			hash:      12345,
-			partition: 1,
-			topic:     "logs",
-			tenantID:  "tenant-1",
-			expectErr: false,
+			name:                   "Valid metadata",
+			hash:                   12345,
+			partition:              1,
+			topic:                  "logs",
+			tenantID:               "tenant-1",
+			entriesSize:            1024,
+			structuredMetadataSize: 512,
+			expectErr:              false,
 		},
 		{
-			name:      "Zero hash - should error",
-			hash:      0,
-			partition: 3,
-			topic:     "traces",
-			tenantID:  "tenant-3",
-			expectErr: true,
+			name:                   "Valid metadata with zero sizes",
+			hash:                   67890,
+			partition:              2,
+			topic:                  "metrics",
+			tenantID:               "tenant-2",
+			entriesSize:            0,
+			structuredMetadataSize: 0,
+			expectErr:              false,
+		},
+		{
+			name:                   "Zero hash - should error",
+			hash:                   0,
+			partition:              3,
+			topic:                  "traces",
+			tenantID:               "tenant-3",
+			entriesSize:            2048,
+			structuredMetadataSize: 1024,
+			expectErr:              true,
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Encode metadata
-			record, err := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.hash)
+			record, err := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.hash, tt.entriesSize, tt.structuredMetadataSize)
 			if tt.expectErr {
 				require.Error(t, err)
 				require.Nil(t, record)
@@ -201,6 +217,8 @@ func TestEncodeDecodeStreamMetadata(t *testing.T) {
 
 			// Verify decoded values
 			require.Equal(t, tt.hash, metadata.StreamHash)
+			require.Equal(t, tt.entriesSize, metadata.EntriesSize)
+			require.Equal(t, tt.structuredMetadataSize, metadata.StructuredMetadataSize)
 		})
 	}
 
```

pkg/limits/frontend/frontend.go

Lines changed: 6 additions & 1 deletion
```diff
@@ -11,9 +11,11 @@ import (
 	"flag"
 	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/limiter"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/dskit/user"
@@ -34,11 +36,13 @@ const (
 type Config struct {
 	ClientConfig     limits_client.Config  `yaml:"client_config"`
 	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
+	RecheckPeriod    time.Duration         `yaml:"recheck_period"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
 	cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
+	f.DurationVar(&cfg.RecheckPeriod, "ingest-limits-frontend.recheck-period", 10*time.Second, "The period to recheck per tenant ingestion rate limit configuration.")
 }
 
 func (cfg *Config) Validate() error {
@@ -71,7 +75,8 @@ func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, l
 
 	factory := limits_client.NewPoolFactory(cfg.ClientConfig)
 	pool := limits_client.NewPool(ringName, cfg.ClientConfig.PoolConfig, limitsRing, factory, logger)
-	limitsSrv := NewRingIngestLimitsService(limitsRing, pool, limits, logger, reg)
+	rateLimiter := limiter.NewRateLimiter(newIngestionRateStrategy(limits), cfg.RecheckPeriod)
+	limitsSrv := NewRingIngestLimitsService(limitsRing, pool, limits, rateLimiter, logger, reg)
 
 	f := &Frontend{
 		cfg: cfg,
```
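For context on the new `rateLimiter` wiring, here is a hedged sketch of how dskit's `limiter.RateLimiter` is typically driven by a per-tenant strategy and a recheck period. The `staticStrategy` type and its fixed limit values are illustrative assumptions standing in for `newIngestionRateStrategy`, which in this commit reads per-tenant limits from configuration; the dskit API usage is the sketch author's understanding rather than something shown in this diff.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/dskit/limiter"
)

// staticStrategy returns the same limit and burst for every tenant. The real
// strategy would resolve these per tenant from runtime configuration, and the
// recheck period controls how often the limiter re-reads them.
type staticStrategy struct {
	limit float64 // bytes per second
	burst int     // bytes
}

func (s staticStrategy) Limit(_ string) float64 { return s.limit }
func (s staticStrategy) Burst(_ string) int     { return s.burst }

func main() {
	// 10s mirrors the default of -ingest-limits-frontend.recheck-period.
	rl := limiter.NewRateLimiter(staticStrategy{limit: 1024, burst: 2048}, 10*time.Second)

	now := time.Now()
	fmt.Println(rl.AllowN(now, "tenant-1", 512))  // within the burst: true
	fmt.Println(rl.AllowN(now, "tenant-1", 4096)) // exceeds the burst: false
}
```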
