@@ -506,17 +506,6 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
506
506
return & logproto.PushResponse {}, httpgrpc .Errorf (http .StatusUnprocessableEntity , validation .MissingStreamsErrorMsg )
507
507
}
508
508
509
- if d .cfg .IngestLimitsEnabled {
510
- exceedsLimits , err := d .exceedsLimits (ctx , tenantID , req .Streams )
511
- if err != nil {
512
- level .Error (d .logger ).Log ("msg" , "failed to check if request exceeds limits, request has been accepted" , "err" , err )
513
- } else if len (exceedsLimits .RejectedStreams ) > 0 {
514
- level .Error (d .logger ).Log ("msg" , "request exceeded limits" , "tenant" , tenantID )
515
- } else {
516
- level .Debug (d .logger ).Log ("msg" , "request accepted" , "tenant" , tenantID )
517
- }
518
- }
519
-
520
509
// First we flatten out the request into a list of samples.
521
510
// We use the heuristic of 1 sample per TS to size the array.
522
511
// We also work out the hash value at the same time.
@@ -706,6 +695,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
706
695
return & logproto.PushResponse {}, validationErr
707
696
}
708
697
698
+ if d .cfg .IngestLimitsEnabled {
699
+ exceedsLimits , err := d .exceedsLimits (ctx , tenantID , streams )
700
+ if err != nil {
701
+ level .Error (d .logger ).Log ("msg" , "failed to check if request exceeds limits, request has been accepted" , "err" , err )
702
+ } else if len (exceedsLimits .RejectedStreams ) > 0 {
703
+ level .Error (d .logger ).Log ("msg" , "request exceeded limits" , "tenant" , tenantID )
704
+ } else {
705
+ level .Debug (d .logger ).Log ("msg" , "request accepted" , "tenant" , tenantID )
706
+ }
707
+ }
708
+
709
709
if ! d .ingestionRateLimiter .AllowN (now , tenantID , validationContext .validationMetrics .aggregatedPushStats .lineSize ) {
710
710
d .trackDiscardedData (ctx , req , validationContext , tenantID , validationContext .validationMetrics , validation .RateLimited , streamResolver )
711
711
@@ -1152,7 +1152,7 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
1152
1152
}
1153
1153
}
1154
1154
1155
- func (d * Distributor ) exceedsLimits (ctx context.Context , tenantID string , streams []logproto. Stream ) (* logproto.ExceedsLimitsResponse , error ) {
1155
+ func (d * Distributor ) exceedsLimits (ctx context.Context , tenantID string , streams []KeyedStream ) (* logproto.ExceedsLimitsResponse , error ) {
1156
1156
// We use an FNV-1 of all stream hashes in the request to load balance requests
1157
1157
// to limits-frontends instances.
1158
1158
h := fnv .New32 ()
@@ -1165,11 +1165,11 @@ func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, stream
1165
1165
for _ , stream := range streams {
1166
1166
// Add the stream hash to FNV-1.
1167
1167
buf := make ([]byte , binary .MaxVarintLen64 )
1168
- binary .PutUvarint (buf , stream .Hash )
1168
+ binary .PutUvarint (buf , stream .HashKeyNoShard )
1169
1169
_ , _ = h .Write (buf )
1170
1170
// Add the stream hash to the request. This is sent to limits-frontend.
1171
1171
streamHashes = append (streamHashes , & logproto.StreamMetadata {
1172
- StreamHash : stream .Hash ,
1172
+ StreamHash : stream .HashKeyNoShard ,
1173
1173
})
1174
1174
}
1175
1175
0 commit comments