Skip to content

Commit a06a562

Browse files
fix: zeroed stream hash when checking limits
1 parent c790ad8 commit a06a562

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

‎pkg/distributor/distributor.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -506,17 +506,6 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
506506
return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg)
507507
}
508508

509-
if d.cfg.IngestLimitsEnabled {
510-
exceedsLimits, err := d.exceedsLimits(ctx, tenantID, req.Streams)
511-
if err != nil {
512-
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
513-
} else if len(exceedsLimits.RejectedStreams) > 0 {
514-
level.Error(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
515-
} else {
516-
level.Debug(d.logger).Log("msg", "request accepted", "tenant", tenantID)
517-
}
518-
}
519-
520509
// First we flatten out the request into a list of samples.
521510
// We use the heuristic of 1 sample per TS to size the array.
522511
// We also work out the hash value at the same time.
@@ -693,6 +682,17 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
693682
}
694683
}()
695684

685+
if d.cfg.IngestLimitsEnabled {
686+
exceedsLimits, err := d.exceedsLimits(ctx, tenantID, streams)
687+
if err != nil {
688+
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
689+
} else if len(exceedsLimits.RejectedStreams) > 0 {
690+
level.Error(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
691+
} else {
692+
level.Debug(d.logger).Log("msg", "request accepted", "tenant", tenantID)
693+
}
694+
}
695+
696696
var validationErr error
697697
if validationErrors.Err() != nil {
698698
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "%s", validationErrors.Error())
@@ -1152,7 +1152,7 @@ func (d *Distributor) sendStreams(task pushIngesterTask) {
11521152
}
11531153
}
11541154

1155-
func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []logproto.Stream) (*logproto.ExceedsLimitsResponse, error) {
1155+
func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, streams []KeyedStream) (*logproto.ExceedsLimitsResponse, error) {
11561156
// We use an FNV-1 of all stream hashes in the request to load balance requests
11571157
// to limits-frontends instances.
11581158
h := fnv.New32()
@@ -1165,11 +1165,11 @@ func (d *Distributor) exceedsLimits(ctx context.Context, tenantID string, stream
11651165
for _, stream := range streams {
11661166
// Add the stream hash to FNV-1.
11671167
buf := make([]byte, binary.MaxVarintLen64)
1168-
binary.PutUvarint(buf, stream.Hash)
1168+
binary.PutUvarint(buf, stream.HashKeyNoShard)
11691169
_, _ = h.Write(buf)
11701170
// Add the stream hash to the request. This is sent to limits-frontend.
11711171
streamHashes = append(streamHashes, &logproto.StreamMetadata{
1172-
StreamHash: stream.Hash,
1172+
StreamHash: stream.HashKeyNoShard,
11731173
})
11741174
}
11751175

0 commit comments

Comments
 (0)