Skip to content

Commit 3d6163a

Browse files
authored
fix: Fix blocked ingestion returned error when 260 (#16387)
**What this PR does / why we need it**: Modify how we handle blocked ingestion errors: * If it is 200, don't expose any error * Otherwise, expose the error as the configured statusCode, but only if no other validation error occur. I'm calling this a secondTierErr Also modifying the enforced labels error message to log present labels.
1 parent 073c94c commit 3d6163a

File tree

3 files changed

+19
-6
lines changed

3 files changed

+19
-6
lines changed

‎pkg/distributor/distributor.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
516516
}
517517
}
518518

519+
var ingestionBlockedError error
520+
519521
func() {
520522
sp := opentracing.SpanFromContext(ctx)
521523
if sp != nil {
@@ -547,7 +549,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
547549
}
548550

549551
if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID, policy); missing {
550-
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
552+
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID, stream.Labels)
551553
d.writeFailuresManager.Log(tenantID, err)
552554
validationErrors.Add(err)
553555
discardedBytes := util.EntriesTotalSize(stream.Entries)
@@ -560,14 +562,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
560562
discardedBytes := util.EntriesTotalSize(stream.Entries)
561563
d.validator.reportDiscardedData(reason, validationContext, retentionHours, policy, discardedBytes, len(stream.Entries))
562564

563-
// If the status code is 200, return success.
565+
// If the status code is 200, return no error.
564566
// Note that we still log the error and increment the metrics.
565567
if statusCode == http.StatusOK {
566-
// do not add error to validationErrors.
567568
continue
568569
}
569570

570-
validationErrors.Add(err)
571+
// return an error but do not add it to validationErrors
572+
// otherwise client will get a 400 and will log it.
573+
ingestionBlockedError = httpgrpc.Errorf(statusCode, "%s", err.Error())
571574
continue
572575
}
573576

@@ -647,6 +650,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
647650
var validationErr error
648651
if validationErrors.Err() != nil {
649652
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "%s", validationErrors.Error())
653+
} else if ingestionBlockedError != nil {
654+
// Any validation error takes precedence over the status code and error message for blocked ingestion.
655+
validationErr = ingestionBlockedError
650656
}
651657

652658
// Return early if none of the streams contained entries

‎pkg/distributor/distributor_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
475475
// enforced labels configured, but all labels are missing.
476476
_, err := distributors[0].Push(ctx, req)
477477
require.Error(t, err)
478-
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
478+
expectedErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test", "{foo=\"bar\"}")
479479
require.EqualError(t, err, expectedErr.Error())
480480

481481
// Verify metrics for discarded samples due to missing enforced labels
@@ -1677,6 +1677,13 @@ func TestDistributor_PushIngestionBlocked(t *testing.T) {
16771677
expectError: false,
16781678
expectedStatusCode: http.StatusOK,
16791679
},
1680+
{
1681+
name: "blocked with status code 260",
1682+
blockUntil: time.Now().Add(1 * time.Hour),
1683+
blockStatusCode: 260,
1684+
expectError: true,
1685+
expectedStatusCode: 260,
1686+
},
16801687
} {
16811688
t.Run(tc.name, func(t *testing.T) {
16821689
limits := &validation.Limits{}

‎pkg/validation/validate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const (
7474
BlockedIngestionPolicy = "blocked_ingestion_policy"
7575
BlockedIngestionPolicyErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
7676
MissingEnforcedLabels = "missing_enforced_labels"
77-
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
77+
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s for stream %s"
7878
)
7979

8080
type ErrStreamRateLimit struct {

0 commit comments

Comments
 (0)