Skip to content

Commit 25c0620

Browse files
authored
feat: Do not enforce labels vs agg metric stream (#16696)
**What this PR does / why we need it**: Modify our push logic to not enforce labels if the current stream is the especial aggregated metric stream
1 parent 9a99859 commit 25c0620

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

‎pkg/distributor/distributor.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -548,13 +548,15 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
548548
continue
549549
}
550550

551-
if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID, policy); missing {
552-
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID, stream.Labels)
553-
d.writeFailuresManager.Log(tenantID, err)
554-
validationErrors.Add(err)
555-
discardedBytes := util.EntriesTotalSize(stream.Entries)
556-
d.validator.reportDiscardedDataWithTracker(ctx, validation.MissingEnforcedLabels, validationContext, lbs, retentionHours, policy, discardedBytes, len(stream.Entries))
557-
continue
551+
if !d.validator.IsAggregatedMetricStream(lbs) {
552+
if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID, policy); missing {
553+
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID, stream.Labels)
554+
d.writeFailuresManager.Log(tenantID, err)
555+
validationErrors.Add(err)
556+
discardedBytes := util.EntriesTotalSize(stream.Entries)
557+
d.validator.reportDiscardedDataWithTracker(ctx, validation.MissingEnforcedLabels, validationContext, lbs, retentionHours, policy, discardedBytes, len(stream.Entries))
558+
continue
559+
}
558560
}
559561

560562
if block, statusCode, reason, err := d.validator.ShouldBlockIngestion(validationContext, now, policy); block {

‎pkg/distributor/distributor_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,18 @@ func Test_PushWithEnforcedLabels(t *testing.T) {
507507
// Metrics should remain unchanged
508508
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
509509
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
510+
511+
// enforced labels are configured but the stream is an aggregated metric, so no errors.
512+
limits.EnforcedLabels = []string{"app", "env"}
513+
distributors, _ = prepare(t, 1, 3, limits, nil)
514+
515+
req = makeWriteRequestWithLabels(100, 100, []string{`{__aggregated_metric__="foo"}`}, false, false, false)
516+
_, err = distributors[0].Push(ctx, req)
517+
require.NoError(t, err)
518+
519+
// Metrics should remain unchanged
520+
assert.Equal(t, float64(10000), testutil.ToFloat64(validation.DiscardedBytes))
521+
assert.Equal(t, float64(100), testutil.ToFloat64(validation.DiscardedSamples))
510522
}
511523

512524
func TestDistributorPushConcurrently(t *testing.T) {

‎pkg/distributor/validator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,11 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
143143
return nil
144144
}
145145

146-
// Validate labels returns an error if the labels are invalid
146+
func (v Validator) IsAggregatedMetricStream(ls labels.Labels) bool {
147+
return ls.Has(push.AggregatedMetricLabel)
148+
}
149+
150+
// Validate labels returns an error if the labels are invalid and if the stream is an aggregated metric stream
147151
func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours, policy string) error {
148152
if len(ls) == 0 {
149153
// TODO: is this one correct?
@@ -152,7 +156,7 @@ func (v Validator) ValidateLabels(vCtx validationContext, ls labels.Labels, stre
152156
}
153157

154158
// Skip validation for aggregated metric streams, as we create those for internal use
155-
if ls.Has(push.AggregatedMetricLabel) {
159+
if v.IsAggregatedMetricStream(ls) {
156160
return nil
157161
}
158162

0 commit comments

Comments
 (0)