@@ -2,8 +2,10 @@ package distributor

 import (
 	"context"
+	"encoding/binary"
 	"flag"
 	"fmt"
+	"hash/fnv"
 	"math"
 	"net/http"
 	"runtime/pprof"
@@ -44,9 +46,11 @@ import (
 	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
 	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
 	"github.com/grafana/loki/v3/pkg/ingester"
-	"github.com/grafana/loki/v3/pkg/ingester/client"
+	ingester_client "github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/kafka"
 	kafka_client "github.com/grafana/loki/v3/pkg/kafka/client"
+	limits_frontend "github.com/grafana/loki/v3/pkg/limits/frontend"
+	limits_frontend_client "github.com/grafana/loki/v3/pkg/limits/frontend/client"
 	"github.com/grafana/loki/v3/pkg/loghttp/push"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
@@ -96,9 +100,11 @@ type Config struct {

 	OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`

-	KafkaEnabled    bool         `yaml:"kafka_writes_enabled"`
-	IngesterEnabled bool         `yaml:"ingester_writes_enabled"`
-	KafkaConfig     kafka.Config `yaml:"-"`
+	KafkaEnabled        bool `yaml:"kafka_writes_enabled"`
+	IngesterEnabled     bool `yaml:"ingester_writes_enabled"`
+	IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
+
+	KafkaConfig kafka.Config `yaml:"-"`

 	// TODO: cleanup config
 	TenantTopic TenantTopicConfig `yaml:"tenant_topic" category:"experimental"`
@@ -114,6 +120,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
 	fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
 	fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
+	fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
 }

 func (cfg *Config) Validate() error {
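
For reference, the new setting can be enabled either with the -distributor.ingest-limits-enabled flag registered above or with ingest_limits_enabled in YAML. A minimal, stand-alone sketch of the flag wiring; the config struct and main function below are illustrative stand-ins, not Loki's actual types:

package main

import (
	"flag"
	"fmt"
)

// config mirrors only the three write-path toggles from the diff; everything
// else in this sketch is illustrative.
type config struct {
	KafkaEnabled        bool
	IngesterEnabled     bool
	IngestLimitsEnabled bool
}

func (c *config) registerFlags(fs *flag.FlagSet) {
	fs.BoolVar(&c.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
	fs.BoolVar(&c.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
	fs.BoolVar(&c.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
}

func main() {
	var cfg config
	fs := flag.NewFlagSet("distributor", flag.ExitOnError)
	cfg.registerFlags(fs)
	// Limits checking stays off unless explicitly enabled.
	_ = fs.Parse([]string{"-distributor.ingest-limits-enabled=true"})
	fmt.Printf("%+v\n", cfg)
}
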
@@ -143,12 +150,12 @@ type Distributor struct {
 	cfg              Config
 	ingesterCfg      ingester.Config
 	logger           log.Logger
-	clientCfg        client.Config
+	clientCfg        ingester_client.Config
 	tenantConfigs    *runtime.TenantConfigs
 	tenantsRetention *retention.TenantsRetention
 	ingestersRing    ring.ReadRing
 	validator        *Validator
-	pool             *ring_client.Pool
+	ingesterClients  *ring_client.Pool
 	tee              Tee

 	rateStore RateStore
@@ -184,10 +191,20 @@ type Distributor struct {
 	ingesterTasks  chan pushIngesterTask
 	ingesterTaskWg sync.WaitGroup

+	// Will succeed the usage tracker in the future.
+	limitsFrontendRing ring.ReadRing
+	limitsFrontends    *ring_client.Pool
+
 	// kafka
 	kafkaWriter   KafkaProducer
 	partitionRing ring.PartitionRingReader

+	// The number of partitions for the stream metadata topic. Unlike stream
+	// records, where entries are sharded over just the active partitions,
+	// stream metadata is sharded over all partitions, and all partitions
+	// are consumed.
+	numMetadataPartitions int
+
 	// kafka metrics
 	kafkaAppends         *prometheus.CounterVec
 	kafkaWriteBytesTotal prometheus.Counter
@@ -199,7 +216,7 @@ type Distributor struct {
 func New(
 	cfg Config,
 	ingesterCfg ingester.Config,
-	clientCfg client.Config,
+	clientCfg ingester_client.Config,
 	configs *runtime.TenantConfigs,
 	ingestersRing ring.ReadRing,
 	partitionRing ring.PartitionRingReader,
@@ -208,26 +225,31 @@ func New(
 	metricsNamespace string,
 	tee Tee,
 	usageTracker push.UsageTracker,
+	limitsFrontendCfg limits_frontend_client.Config,
+	limitsFrontendRing ring.ReadRing,
+	numMetadataPartitions int,
 	logger log.Logger,
 ) (*Distributor, error) {
-	factory := cfg.factory
-	if factory == nil {
-		factory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
-			return client.New(clientCfg, addr)
+	ingesterClientFactory := cfg.factory
+	if ingesterClientFactory == nil {
+		ingesterClientFactory = ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
+			return ingester_client.New(clientCfg, addr)
 		})
 	}

-	internalFactory := func(addr string) (ring_client.PoolClient, error) {
+	internalIngesterClientFactory := func(addr string) (ring_client.PoolClient, error) {
 		internalCfg := clientCfg
 		internalCfg.Internal = true
-		return client.New(internalCfg, addr)
+		return ingester_client.New(internalCfg, addr)
 	}

 	validator, err := NewValidator(overrides, usageTracker)
 	if err != nil {
 		return nil, err
 	}

+	limitsFrontendClientFactory := limits_frontend_client.NewPoolFactory(limitsFrontendCfg)
+
 	// Create the configured ingestion rate limit strategy (local or global).
 	var ingestionRateStrategy limiter.RateLimiterStrategy
 	var distributorsLifecycler *ring.BasicLifecycler
@@ -274,7 +296,7 @@ func New(
 		tenantsRetention:      retention.NewTenantsRetention(overrides),
 		ingestersRing:         ingestersRing,
 		validator:             validator,
-		pool:                  clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, logger, metricsNamespace),
+		ingesterClients:       clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, ingesterClientFactory, logger, metricsNamespace),
 		labelCache:            labelCache,
 		shardTracker:          NewShardTracker(),
 		healthyInstancesCount: atomic.NewUint32(0),
@@ -335,6 +357,15 @@ func New(
 		writeFailuresManager: writefailures.NewManager(logger, registerer, cfg.WriteFailuresLogging, configs, "distributor"),
 		kafkaWriter:          kafkaWriter,
 		partitionRing:        partitionRing,
+		limitsFrontendRing:   limitsFrontendRing,
+		limitsFrontends: limits_frontend_client.NewPool(
+			limits_frontend.RingName,
+			limitsFrontendCfg.PoolConfig,
+			limitsFrontendRing,
+			limitsFrontendClientFactory,
+			logger,
+		),
+		numMetadataPartitions: numMetadataPartitions,
 	}

 	if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
@@ -366,7 +397,7 @@ func New(
 			"rate-store",
 			clientCfg.PoolConfig,
 			ingestersRing,
-			ring_client.PoolAddrFunc(internalFactory),
+			ring_client.PoolAddrFunc(internalIngesterClientFactory),
 			logger,
 			metricsNamespace,
 		),
@@ -375,7 +406,7 @@ func New(
 	)
 	d.rateStore = rs

-	servs = append(servs, d.pool, rs)
+	servs = append(servs, d.ingesterClients, rs)
 	d.subservices, err = services.NewManager(servs...)
 	if err != nil {
 		return nil, errors.Wrap(err, "services manager")
@@ -417,8 +448,9 @@ func (d *Distributor) stopping(_ error) error {
 }

 type KeyedStream struct {
-	HashKey uint32
-	Stream  logproto.Stream
+	HashKey        uint32
+	HashKeyNoShard uint64
+	Stream         logproto.Stream
 }

 // TODO taken from Cortex, see if we can refactor out an usable interface.
@@ -474,6 +506,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 		return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg)
 	}

+	if d.cfg.IngestLimitsEnabled {
+		exceedsLimits, err := d.exceedsLimits(ctx, tenantID, req.Streams)
+		if err != nil {
+			level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
+		} else if len(exceedsLimits.RejectedStreams) > 0 {
+			level.Error(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
+		} else {
+			level.Debug(d.logger).Log("msg", "request accepted", "tenant", tenantID)
+		}
+	}
+
 	// First we flatten out the request into a list of samples.
 	// We use the heuristic of 1 sample per TS to size the array.
 	// We also work out the hash value at the same time.
@@ -494,8 +537,9 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 			return
 		}
 		streams = append(streams, KeyedStream{
-			HashKey: lokiring.TokenFor(tenantID, stream.Labels),
-			Stream:  stream,
+			HashKey:        lokiring.TokenFor(tenantID, stream.Labels),
+			HashKeyNoShard: stream.Hash,
+			Stream:         stream,
 		})
 	}

@@ -932,7 +976,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
 	shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)

 	if shardCount <= 1 {
-		return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}}
+		return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream}}
 	}

 	d.streamShardCount.Inc()
@@ -976,8 +1020,9 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
 		shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard)

 		derivedStreams = append(derivedStreams, KeyedStream{
-			HashKey: lokiring.TokenFor(tenantID, shard.Labels),
-			Stream:  shard,
+			HashKey:        lokiring.TokenFor(tenantID, shard.Labels),
+			HashKeyNoShard: stream.Hash,
+			Stream:         shard,
 		})

 		if shardStreamsCfg.LoggingEnabled {
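
Taken together, the KeyedStream changes keep one invariant: every shard derived from a stream gets its own HashKey (its labels include the shard), while HashKeyNoShard stays the hash of the unsharded parent stream. A hedged sketch of that relationship; tokenFor and the shard label format below are simplified stand-ins rather than Loki's real implementations:

package main

import (
	"fmt"
	"hash/fnv"
)

type keyedStream struct {
	HashKey        uint32 // per-shard ring token
	HashKeyNoShard uint64 // hash of the unsharded, parent stream
}

// tokenFor is a stand-in for lokiring.TokenFor: tenant + labels -> ring token.
func tokenFor(tenantID, labels string) uint32 {
	h := fnv.New32()
	_, _ = h.Write([]byte(tenantID))
	_, _ = h.Write([]byte(labels))
	return h.Sum32()
}

func main() {
	const tenantID = "tenant-a"
	const parentLabels = `{app="api"}`
	parentHash := uint64(tokenFor(tenantID, parentLabels)) // stand-in for stream.Hash

	for shard := 0; shard < 3; shard++ {
		// The shard label name here is illustrative only.
		shardLabels := fmt.Sprintf(`{__stream_shard__="%d", app="api"}`, shard)
		ks := keyedStream{
			HashKey:        tokenFor(tenantID, shardLabels), // differs per shard
			HashKeyNoShard: parentHash,                      // same for every shard
		}
		fmt.Printf("shard=%d HashKey=%d HashKeyNoShard=%d\n", shard, ks.HashKey, ks.HashKeyNoShard)
	}
}
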
@@ -1107,9 +1152,65 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
 	}
 }

+func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []logproto.Stream) (*logproto.ExceedsLimitsResponse, error) {
+	// We use an FNV-1 of all stream hashes in the request to load balance requests
+	// to limits-frontends instances.
+	h := fnv.New32()
+
+	// The distributor sends the hashes of all streams in the request to the
+	// limits-frontend. The limits-frontend is responsible for deciding if
+	// the request would exceed the tenants limits, and if so, which streams
+	// from the request caused it to exceed its limits.
+	streamHashes := make([]*logproto.StreamMetadata, 0, len(streams))
+	for _, stream := range streams {
+		// Add the stream hash to FNV-1.
+		buf := make([]byte, binary.MaxVarintLen64)
+		binary.PutUvarint(buf, stream.Hash)
+		_, _ = h.Write(buf)
+		// Add the stream hash to the request. This is sent to limits-frontend.
+		streamHashes = append(streamHashes, &logproto.StreamMetadata{
+			StreamHash: stream.Hash,
+		})
+	}
+
+	req := logproto.ExceedsLimitsRequest{
+		Tenant:  tenantID,
+		Streams: streamHashes,
+	}
+
+	// Get the limits-frontend instances from the ring.
+	var descs [5]ring.InstanceDesc
+	rs, err := d.limitsFrontendRing.Get(h.Sum32(), limits_frontend_client.LimitsRead, descs[0:], nil, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
+	}
+
+	var lastErr error
+	// Send the request to the limits-frontend to see if it exceeds the tenant
+	// limits. If the RPC fails, failover to the next instance in the ring.
+	for _, instance := range rs.Instances {
+		c, err := d.limitsFrontends.GetClientFor(instance.Addr)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+
+		client := c.(logproto.IngestLimitsFrontendClient)
+		resp, err := client.ExceedsLimits(ctx, &req)
+		if err != nil {
+			lastErr = err
+			continue
+		}
+
+		return resp, nil
+	}
+
+	return nil, lastErr
+}
+
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error {
-	c, err := d.pool.GetClientFor(ingester.Addr)
+	c, err := d.ingesterClients.GetClientFor(ingester.Addr)
 	if err != nil {
 		return err
 	}
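
For reference, the load-balancing key used in exceedsLimits can be isolated: each stream hash is varint-encoded and folded into a single FNV-1 sum, so the same set of streams tends to map to the same limits-frontend instance while different requests still spread across the ring. A minimal sketch of just that derivation, using made-up stream hashes:

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// limitsRingKey mirrors the hashing in exceedsLimits: each stream hash is
// varint-encoded into a fixed-size buffer and folded into an FNV-1 32-bit sum.
func limitsRingKey(streamHashes []uint64) uint32 {
	h := fnv.New32()
	for _, sh := range streamHashes {
		buf := make([]byte, binary.MaxVarintLen64)
		binary.PutUvarint(buf, sh)
		_, _ = h.Write(buf)
	}
	return h.Sum32()
}

func main() {
	// Hypothetical stream hashes; in the distributor these are stream.Hash values.
	key := limitsRingKey([]uint64{0x1d3f, 0x9a42, 0x77e1})
	fmt.Printf("ring key: %d\n", key)
}
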
@@ -1150,20 +1251,48 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 	if len(stream.Stream.Entries) == 0 {
 		return nil
 	}
-	partitionID, err := subring.ActivePartitionForKey(stream.HashKey)
+
+	// The distributor writes stream records to one of the active partitions
+	// in the partition ring. The number of active partitions is equal to the
+	// number of ingesters.
+	streamPartitionID, err := subring.ActivePartitionForKey(stream.HashKey)
 	if err != nil {
 		d.kafkaAppends.WithLabelValues("kafka", "fail").Inc()
 		return fmt.Errorf("failed to find active partition for stream: %w", err)
 	}
-
 	startTime := time.Now()
-
-	records, err := kafka.Encode(partitionID, tenant, stream.Stream, d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes)
+	records, err := kafka.Encode(
+		streamPartitionID,
+		tenant,
+		stream.Stream,
+		d.cfg.KafkaConfig.ProducerMaxRecordSizeBytes,
+	)
 	if err != nil {
-		d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
+		d.kafkaAppends.WithLabelValues(
+			fmt.Sprintf("partition_%d", streamPartitionID),
+			"fail",
+		).Inc()
 		return fmt.Errorf("failed to marshal write request to records: %w", err)
 	}

+	// However, unlike stream records, the distributor writes stream metadata
+	// records to one of a fixed number of partitions, the size of which is
+	// determined ahead of time. It does not use a ring. The reason for this
+	// is that we want to be able to scale components that consume metadata
+	// records independent of ingesters.
+	metadataPartitionID := int32(stream.HashKeyNoShard % uint64(d.numMetadataPartitions))
+	metadata, err := kafka.EncodeStreamMetadata(
+		metadataPartitionID,
+		d.cfg.KafkaConfig.Topic,
+		tenant,
+		stream.HashKeyNoShard,
+	)
+	if err != nil {
+		return fmt.Errorf("failed to marshal metadata: %w", err)
+	}
+
+	records = append(records, metadata)
+
 	d.kafkaRecordsPerRequest.Observe(float64(len(records)))

 	produceResults := d.kafkaWriter.ProduceSync(ctx, records)
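
The comment in this hunk describes the second write path: stream metadata is keyed by the unsharded stream hash modulo a fixed partition count, so all shards of a stream land on the same metadata partition and metadata consumers can scale independently of ingesters. A small sketch of that mapping, with hypothetical hash and partition-count values:

package main

import "fmt"

// metadataPartitionFor mirrors the modulo used in sendStreamToKafka: the
// unsharded stream hash modulo the fixed number of metadata partitions.
func metadataPartitionFor(hashKeyNoShard uint64, numMetadataPartitions int) int32 {
	return int32(hashKeyNoShard % uint64(numMetadataPartitions))
}

func main() {
	const numMetadataPartitions = 64 // hypothetical fixed partition count

	// Shards of the same stream share HashKeyNoShard, so they map to the same
	// metadata partition even though their stream records may not.
	hashKeyNoShard := uint64(14695981039346656037) // made-up stream hash
	fmt.Println(metadataPartitionFor(hashKeyNoShard, numMetadataPartitions))
}
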
@@ -1176,10 +1305,10 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 	var finalErr error
 	for _, result := range produceResults {
 		if result.Err != nil {
-			d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc()
+			d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "fail").Inc()
 			finalErr = result.Err
 		} else {
-			d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "success").Inc()
+			d.kafkaAppends.WithLabelValues(fmt.Sprintf("partition_%d", streamPartitionID), "success").Inc()
 		}
 	}
