Skip to content

Commit be4f17e

Browse files
authored
feat: sanitize structured metadata during ingestion in the distributor (#15141)
Signed-off-by: Callum Styan <callumstyan@gmail.com>
1 parent 990f71c commit be4f17e

File tree

3 files changed

+149
-17
lines changed

3 files changed

+149
-17
lines changed

‎pkg/distributor/distributor.go

+17
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import (
1212
"strings"
1313
"sync"
1414
"time"
15+
"unicode/utf8"
16+
17+
otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
1518

1619
"github.com/go-kit/log"
1720
"github.com/go-kit/log/level"
@@ -66,6 +69,14 @@ const (
6669
var (
6770
maxLabelCacheSize = 100000
6871
rfStats = analytics.NewInt("distributor_replication_factor")
72+
73+
// removeInvalidUtf replaces utf8.RuneError (the Unicode replacement character) with a space, because Prometheus rejects the replacement character.
74+
removeInvalidUtf = func(r rune) rune {
75+
if r == utf8.RuneError {
76+
return 32 // rune value for space
77+
}
78+
return r
79+
}
6980
)
7081

7182
// Config for a Distributor.
@@ -517,6 +528,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
517528
}
518529

519530
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
531+
for i := range entry.StructuredMetadata {
532+
structuredMetadata[i].Name = otlptranslate.NormalizeLabel(structuredMetadata[i].Name)
533+
if strings.ContainsRune(structuredMetadata[i].Value, utf8.RuneError) {
534+
structuredMetadata[i].Value = strings.Map(removeInvalidUtf, structuredMetadata[i].Value)
535+
}
536+
}
520537
if shouldDiscoverLevels {
521538
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
522539
if ok {

‎pkg/distributor/distributor_test.go

+128-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import (
1111
"sync"
1212
"testing"
1313
"time"
14+
"unicode/utf8"
15+
16+
otlptranslate "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
17+
18+
"github.com/grafana/loki/pkg/push"
1419

1520
"github.com/c2h5oh/datasize"
1621
"github.com/go-kit/log"
@@ -46,6 +51,13 @@ import (
4651
"github.com/grafana/loki/v3/pkg/validation"
4752
)
4853

54+
const (
55+
smValidName = "valid_name"
56+
smInvalidName = "invalid-name"
57+
smValidValue = "valid-value私"
58+
smInvalidValue = "valid-value�"
59+
)
60+
4961
var (
5062
success = &logproto.PushResponse{}
5163
ctx = user.InjectOrgID(context.Background(), "test")
@@ -428,7 +440,7 @@ func TestDistributorPushConcurrently(t *testing.T) {
428440
[]string{
429441
fmt.Sprintf(`{app="foo-%d"}`, n),
430442
fmt.Sprintf(`{instance="bar-%d"}`, n),
431-
},
443+
}, false, false, false,
432444
)
433445
response, err := distributors[n%len(distributors)].Push(ctx, request)
434446
assert.NoError(t, err)
@@ -1226,17 +1238,58 @@ func Benchmark_Push(b *testing.B) {
12261238
limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
12271239
limits.CreationGracePeriod = model.Duration(24 * time.Hour)
12281240
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
1229-
request := makeWriteRequest(100000, 100)
1230-
12311241
b.ResetTimer()
12321242
b.ReportAllocs()
12331243

1234-
for n := 0; n < b.N; n++ {
1235-
_, err := distributors[0].Push(ctx, request)
1236-
if err != nil {
1237-
require.NoError(b, err)
1244+
b.Run("no structured metadata", func(b *testing.B) {
1245+
for n := 0; n < b.N; n++ {
1246+
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, false, false, false)
1247+
_, err := distributors[0].Push(ctx, request)
1248+
if err != nil {
1249+
require.NoError(b, err)
1250+
}
12381251
}
1239-
}
1252+
})
1253+
1254+
b.Run("all valid structured metadata", func(b *testing.B) {
1255+
for n := 0; n < b.N; n++ {
1256+
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, false)
1257+
_, err := distributors[0].Push(ctx, request)
1258+
if err != nil {
1259+
require.NoError(b, err)
1260+
}
1261+
}
1262+
})
1263+
1264+
b.Run("structured metadata with invalid names", func(b *testing.B) {
1265+
for n := 0; n < b.N; n++ {
1266+
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, false)
1267+
_, err := distributors[0].Push(ctx, request)
1268+
if err != nil {
1269+
require.NoError(b, err)
1270+
}
1271+
}
1272+
})
1273+
1274+
b.Run("structured metadata with invalid values", func(b *testing.B) {
1275+
for n := 0; n < b.N; n++ {
1276+
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, false, true)
1277+
_, err := distributors[0].Push(ctx, request)
1278+
if err != nil {
1279+
require.NoError(b, err)
1280+
}
1281+
}
1282+
})
1283+
1284+
b.Run("structured metadata with invalid names and values", func(b *testing.B) {
1285+
for n := 0; n < b.N; n++ {
1286+
request := makeWriteRequestWithLabels(100000, 100, []string{`{foo="bar"}`}, true, true, true)
1287+
_, err := distributors[0].Push(ctx, request)
1288+
if err != nil {
1289+
require.NoError(b, err)
1290+
}
1291+
}
1292+
})
12401293
}
12411294

12421295
func TestShardCalculation(t *testing.T) {
@@ -1696,7 +1749,7 @@ func makeWriteRequestWithLabelsWithLevel(lines, size int, labels []string, level
16961749
}
16971750
}
16981751

1699-
func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.PushRequest {
1752+
func makeWriteRequestWithLabels(lines, size int, labels []string, addStructuredMetadata, invalidName, invalidValue bool) *logproto.PushRequest {
17001753
streams := make([]logproto.Stream, len(labels))
17011754
for i := 0; i < len(labels); i++ {
17021755
stream := logproto.Stream{Labels: labels[i]}
@@ -1705,11 +1758,24 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push
17051758
// Construct the log line, honoring the input size
17061759
line := strconv.Itoa(j) + strings.Repeat("0", size)
17071760
line = line[:size]
1708-
1709-
stream.Entries = append(stream.Entries, logproto.Entry{
1761+
entry := logproto.Entry{
17101762
Timestamp: time.Now().Add(time.Duration(j) * time.Millisecond),
17111763
Line: line,
1712-
})
1764+
}
1765+
if addStructuredMetadata {
1766+
name := smValidName
1767+
value := smValidValue
1768+
if invalidName {
1769+
name = smInvalidName
1770+
}
1771+
if invalidValue {
1772+
value = smInvalidValue
1773+
}
1774+
entry.StructuredMetadata = push.LabelsAdapter{
1775+
{Name: name, Value: value},
1776+
}
1777+
}
1778+
stream.Entries = append(stream.Entries, entry)
17131779
}
17141780

17151781
streams[i] = stream
@@ -1721,7 +1787,7 @@ func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.Push
17211787
}
17221788

17231789
func makeWriteRequest(lines, size int) *logproto.PushRequest {
1724-
return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`})
1790+
return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}, false, false, false)
17251791
}
17261792

17271793
type mockKafkaWriter struct {
@@ -1777,6 +1843,19 @@ func (i *mockIngester) Push(_ context.Context, in *logproto.PushRequest, _ ...gr
17771843

17781844
i.mu.Lock()
17791845
defer i.mu.Unlock()
1846+
for _, s := range in.Streams {
1847+
for _, e := range s.Entries {
1848+
for _, sm := range e.StructuredMetadata {
1849+
if strings.ContainsRune(sm.Value, utf8.RuneError) {
1850+
return nil, fmt.Errorf("sm value was not sanitized before being pushed to ignester, invalid utf 8 rune %d", utf8.RuneError)
1851+
}
1852+
if sm.Name != otlptranslate.NormalizeLabel(sm.Name) {
1853+
return nil, fmt.Errorf("sm name was not sanitized before being sent to ingester, contained characters %s", sm.Name)
1854+
1855+
}
1856+
}
1857+
}
1858+
}
17801859

17811860
i.pushed = append(i.pushed, in)
17821861
return nil, nil
@@ -1875,3 +1954,39 @@ func TestDistributorTee(t *testing.T) {
18751954
require.Equal(t, "test", tee.tenant)
18761955
}
18771956
}
1957+
1958+
func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
1959+
limits := &validation.Limits{}
1960+
flagext.DefaultValues(limits)
1961+
for _, tc := range []struct {
1962+
req *logproto.PushRequest
1963+
expectedResponse *logproto.PushResponse
1964+
}{
1965+
{
1966+
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, false),
1967+
success,
1968+
},
1969+
{
1970+
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, false),
1971+
success,
1972+
},
1973+
{
1974+
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, false, true),
1975+
success,
1976+
},
1977+
{
1978+
makeWriteRequestWithLabels(10, 10, []string{`{foo="bar"}`}, true, true, true),
1979+
success,
1980+
},
1981+
} {
1982+
distributors, _ := prepare(t, 1, 5, limits, nil)
1983+
1984+
var request logproto.PushRequest
1985+
request.Streams = append(request.Streams, tc.req.Streams[0])
1986+
1987+
// Any error would come from the ingester mock, which rejects structured metadata (SM) that has not been sanitized.
1988+
response, err := distributors[0].Push(ctx, &request)
1989+
require.NoError(t, err)
1990+
assert.Equal(t, tc.expectedResponse, response)
1991+
}
1992+
}

‎pkg/distributor/level_detection_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func Test_DetectLogLevels(t *testing.T) {
3131
limits, ingester := setup(false)
3232
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })
3333

34-
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
34+
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
3535
_, err := distributors[0].Push(ctx, writeReq)
3636
require.NoError(t, err)
3737
topVal := ingester.Peek()
@@ -43,7 +43,7 @@ func Test_DetectLogLevels(t *testing.T) {
4343
limits, ingester := setup(true)
4444
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })
4545

46-
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
46+
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
4747
_, err := distributors[0].Push(ctx, writeReq)
4848
require.NoError(t, err)
4949
topVal := ingester.Peek()
@@ -80,7 +80,7 @@ func Test_DetectLogLevels(t *testing.T) {
8080
limits, ingester := setup(true)
8181
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })
8282

83-
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`})
83+
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}, false, false, false)
8484
_, err := distributors[0].Push(ctx, writeReq)
8585
require.NoError(t, err)
8686
topVal := ingester.Peek()
@@ -95,7 +95,7 @@ func Test_DetectLogLevels(t *testing.T) {
9595
limits, ingester := setup(true)
9696
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })
9797

98-
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
98+
writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}, false, false, false)
9999
writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{
100100
{
101101
Name: "severity",

0 commit comments

Comments
 (0)