Skip to content

Commit e9d0c3e

Browse files
authored
feat: Add a metric ingestion time SM sanitization (#15222)
Signed-off-by: Callum Styan <callumstyan@gmail.com>
1 parent 658fb24 commit e9d0c3e

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

‎pkg/distributor/distributor.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@ type Distributor struct {
167167
RequestParserWrapper push.RequestParserWrapper
168168

169169
// metrics
170-
ingesterAppends *prometheus.CounterVec
171-
ingesterAppendTimeouts *prometheus.CounterVec
172-
replicationFactor prometheus.Gauge
173-
streamShardCount prometheus.Counter
170+
ingesterAppends *prometheus.CounterVec
171+
ingesterAppendTimeouts *prometheus.CounterVec
172+
replicationFactor prometheus.Gauge
173+
streamShardCount prometheus.Counter
174+
tenantPushSanitizedStructuredMetadata *prometheus.CounterVec
174175

175176
usageTracker push.UsageTracker
176177
ingesterTasks chan pushIngesterTask
@@ -284,6 +285,11 @@ func New(
284285
Name: "stream_sharding_count",
285286
Help: "Total number of times the distributor has sharded streams",
286287
}),
288+
tenantPushSanitizedStructuredMetadata: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
289+
Namespace: constants.Loki,
290+
Name: "distributor_push_structured_metadata_sanitized_total",
291+
Help: "The total number of times we've had to sanitize structured metadata (names or values) at ingestion time per tenant.",
292+
}, []string{"tenant"}),
287293
kafkaAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
288294
Namespace: constants.Loki,
289295
Name: "distributor_kafka_appends_total",
@@ -527,11 +533,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
527533
continue
528534
}
529535

536+
var normalized string
530537
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
531538
for i := range entry.StructuredMetadata {
532-
structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
539+
normalized = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
540+
if normalized != structuredMetadata[i].Name {
541+
structuredMetadata[i].Name = normalized
542+
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
543+
}
533544
if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) {
534545
structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value)
546+
d.tenantPushSanitizedStructuredMetadata.WithLabelValues(tenantID).Inc()
535547
}
536548
}
537549
if shouldDiscoverLevels {

‎pkg/distributor/distributor_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"time"
1414
"unicode/utf8"
1515

16+
"github.com/prometheus/client_golang/prometheus/testutil"
17+
1618
otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
1719

1820
"github.com/grafana/loki/pkg/push"
@@ -1961,22 +1963,27 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
19611963
for _, tc := range []struct {
19621964
req *logproto.PushRequest
19631965
expectedResponse *logproto.PushResponse
1966+
numSanitizations float64
19641967
}{
19651968
{
19661969
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false),
19671970
success,
1971+
0,
19681972
},
19691973
{
19701974
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false),
19711975
success,
1976+
10,
19721977
},
19731978
{
19741979
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true),
19751980
success,
1981+
10,
19761982
},
19771983
{
19781984
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true),
19791985
success,
1986+
20,
19801987
},
19811988
} {
19821989
distributors, _ := prepare(t, 1, 5, limits, nil)
@@ -1988,5 +1995,6 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
19881995
response, err := distributors[0].Push(ctx, &request)
19891996
require.NoError(t, err)
19901997
assert.Equal(t, tc.expectedResponse, response)
1998+
assert.Equal(t, tc.numSanitizations, testutil.ToFloat64(distributors[0].tenantPushSanitizedStructuredMetadata.WithLabelValues("test")))
19911999
}
19922000
}

0 commit comments

Comments
 (0)