Skip to content

Commit da361dd

Browse files
committed
feat: Detect generic fields and put them into structured metadata
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent 49eed79 commit da361dd

File tree

8 files changed

+230
-44
lines changed

8 files changed

+230
-44
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3329,6 +3329,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
33293329
# CLI flag: -validation.increment-duplicate-timestamps
33303330
[increment_duplicate_timestamp: <boolean> | default = false]
33313331

3332+
# Detect fields from stream labels, structured metadata, or json/logfmt
3333+
# formatted log line and put them into structured metadata of the log entry.
3334+
[discover_generic_fields: <map of string to list of strings>]
3335+
33323336
# If no service_name label exists, Loki maps a single label from the configured
33333337
# list to service_name. If none of the configured labels exist in the stream,
33343338
# label is set to unknown_service. Empty list disables setting the label.

‎pkg/distributor/distributor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
456456
validationContext := d.validator.getValidationContextForTime(now, tenantID)
457457
levelDetector := newFieldDetector(validationContext)
458458
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()
459+
shouldDiscoverGenericFields := levelDetector.shouldDiscoverGenericFields()
459460

460461
shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
461462
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
@@ -540,6 +541,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
540541
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
541542
}
542543
}
544+
if shouldDiscoverGenericFields {
545+
for field, hints := range levelDetector.validationContext.discoverGenericFields {
546+
extracted, ok := levelDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry)
547+
if ok {
548+
entry.StructuredMetadata = append(entry.StructuredMetadata, extracted)
549+
}
550+
}
551+
}
543552
stream.Entries[n] = entry
544553

545554
// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect

‎pkg/distributor/field_detection.go

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,46 +31,43 @@ var (
3131
errorAbbrv = []byte("err")
3232
critical = []byte("critical")
3333
fatal = []byte("fatal")
34+
35+
defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}
3436
)
3537

36-
func allowedLabelsForLevel(allowedFields []string) map[string]struct{} {
38+
// allowedLabelsForLevel returns the field names that are considered when
// looking for a log level. If allowedFields is empty, the package-level
// defaultAllowedLevelFields list is returned; otherwise allowedFields is
// returned as-is (no copy is made).
func allowedLabelsForLevel(allowedFields []string) []string {
3739
if len(allowedFields) == 0 {
38-
return map[string]struct{}{
39-
"level": {}, "LEVEL": {}, "Level": {},
40-
"severity": {}, "SEVERITY": {}, "Severity": {},
41-
"lvl": {}, "LVL": {}, "Lvl": {},
42-
}
43-
}
44-
allowedFieldsMap := make(map[string]struct{}, len(allowedFields))
45-
for _, field := range allowedFields {
46-
allowedFieldsMap[field] = struct{}{}
40+
return defaultAllowedLevelFields
4741
}
48-
return allowedFieldsMap
42+
return allowedFields
4943
}
5044

5145
// FieldDetector extracts fields (log level and configured generic fields)
// from stream labels, structured metadata, and log lines. It carries the
// validation context it was built from and the list of field names that are
// accepted as log-level fields.
type FieldDetector struct {
52-
validationContext validationContext
53-
allowedLabels map[string]struct{}
46+
validationContext validationContext
47+
allowedLevelLabels []string
5448
}
5549

5650
// newFieldDetector builds a FieldDetector for the given validation context.
// The level field names are resolved once here via allowedLabelsForLevel from
// validationContext.logLevelFields.
func newFieldDetector(validationContext validationContext) *FieldDetector {
57-
logLevelFields := validationContext.logLevelFields
5851
return &FieldDetector{
59-
validationContext: validationContext,
60-
allowedLabels: allowedLabelsForLevel(logLevelFields),
52+
validationContext: validationContext,
53+
allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields),
6154
}
6255
}
6356

6457
// shouldDiscoverLogLevels reports whether log level discovery is enabled:
// structured metadata must be allowed and discoverLogLevels must be set in the
// validation context.
func (l *FieldDetector) shouldDiscoverLogLevels() bool {
6558
return l.validationContext.allowStructuredMetadata && l.validationContext.discoverLogLevels
6659
}
6760

61+
// shouldDiscoverGenericFields reports whether generic field discovery is
// enabled: structured metadata must be allowed and at least one generic field
// must be configured in the validation context.
func (l *FieldDetector) shouldDiscoverGenericFields() bool {
62+
return l.validationContext.allowStructuredMetadata && len(l.validationContext.discoverGenericFields) > 0
63+
}
64+
6865
func (l *FieldDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
69-
levelFromLabel, hasLevelLabel := l.hasAnyLevelLabels(labels)
66+
levelFromLabel, hasLevelLabel := labelsContainAny(labels, l.allowedLevelLabels)
7067
var logLevel string
7168
if hasLevelLabel {
7269
logLevel = levelFromLabel
73-
} else if levelFromMetadata, ok := l.hasAnyLevelLabels(structuredMetadata); ok {
70+
} else if levelFromMetadata, ok := labelsContainAny(structuredMetadata, l.allowedLevelLabels); ok {
7471
logLevel = levelFromMetadata
7572
} else {
7673
logLevel = l.detectLogLevelFromLogEntry(entry, structuredMetadata)
@@ -85,10 +82,27 @@ func (l *FieldDetector) extractLogLevel(labels labels.Labels, structuredMetadata
8582
}, true
8683
}
8784

88-
func (l *FieldDetector) hasAnyLevelLabels(labels labels.Labels) (string, bool) {
89-
for lbl := range l.allowedLabels {
90-
if labels.Has(lbl) {
91-
return labels.Get(lbl), true
85+
// extractGenericField looks for a value for the field called name, using
// hints as the candidate source names. Sources are tried in this order:
// stream labels, structured metadata, then the log line itself (via
// detectGenericFieldFromLogEntry). The first source that contains one of the
// hints wins.
//
// It returns a LabelAdapter with Name set to name and Value set to the found
// value, and true. If the resulting value is empty, it returns a zero
// LabelAdapter and false.
func (l *FieldDetector) extractGenericField(name string, hints []string, labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
86+
87+
var value string
88+
if v, ok := labelsContainAny(labels, hints); ok {
89+
value = v
90+
} else if v, ok := labelsContainAny(structuredMetadata, hints); ok {
91+
value = v
92+
} else {
93+
value = l.detectGenericFieldFromLogEntry(entry, hints)
94+
}
95+
96+
// An empty value is treated the same as "not found".
if value == "" {
97+
return logproto.LabelAdapter{}, false
98+
}
99+
return logproto.LabelAdapter{Name: name, Value: value}, true
100+
}
101+
102+
func labelsContainAny(labels labels.Labels, names []string) (string, bool) {
103+
for _, name := range names {
104+
if labels.Has(name) {
105+
return labels.Get(name), true
92106
}
93107
}
94108
return "", false
@@ -123,13 +137,24 @@ func (l *FieldDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structu
123137
return l.extractLogLevelFromLogLine(entry.Line)
124138
}
125139

140+
// detectGenericFieldFromLogEntry tries to read a value for one of the hints
// from the log line of entry. Lines recognised by isJSON are handed to
// getValueUsingJSONParser, lines recognised by isLogFmt are handed to
// getValueUsingLogfmtParser. For any other line, or when the parser finds
// nothing, the empty string is returned.
func (l *FieldDetector) detectGenericFieldFromLogEntry(entry logproto.Entry, hints []string) string {
141+
// Byte view of the line that shares the string's memory (no copy).
lineBytes := unsafe.Slice(unsafe.StringData(entry.Line), len(entry.Line))
142+
var v []byte
143+
if isJSON(entry.Line) {
144+
v = l.getValueUsingJSONParser(lineBytes, hints)
145+
} else if isLogFmt(lineBytes) {
146+
v = l.getValueUsingLogfmtParser(lineBytes, hints)
147+
}
148+
// string(v) copies the bytes into a new string.
return string(v)
149+
}
150+
126151
func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
127-
logSlice := unsafe.Slice(unsafe.StringData(log), len(log))
152+
lineBytes := unsafe.Slice(unsafe.StringData(log), len(log))
128153
var v []byte
129154
if isJSON(log) {
130-
v = l.getValueUsingJSONParser(logSlice)
131-
} else if isLogFmt(logSlice) {
132-
v = l.getValueUsingLogfmtParser(logSlice)
155+
v = l.getValueUsingJSONParser(lineBytes, l.allowedLevelLabels)
156+
} else if isLogFmt(lineBytes) {
157+
v = l.getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels)
133158
} else {
134159
return detectLevelFromLogLine(log)
135160
}
@@ -154,18 +179,21 @@ func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
154179
}
155180
}
156181

157-
func (l *FieldDetector) getValueUsingLogfmtParser(line []byte) []byte {
182+
// getValueUsingLogfmtParser scans the logfmt key/value pairs of line and
// returns the value of the first key, in line order, that equals any of the
// hints under strings.EqualFold (case-insensitive). It returns nil when no key
// matches.
//
// NOTE(review): getValueUsingJSONParser iterates the hints in order and passes
// each one to jsonparser.Get, so the priority between several hints (and
// presumably case sensitivity) differs between the two parsers — confirm this
// is intended.
func (l *FieldDetector) getValueUsingLogfmtParser(line []byte, hints []string) []byte {
158183
d := logfmt.NewDecoder(line)
159184
for !d.EOL() && d.ScanKeyval() {
160-
if _, ok := l.allowedLabels[string(d.Key())]; ok {
161-
return d.Value()
185+
// String view of the key bytes without copying; only used for the
// comparisons in the loop below.
k := unsafe.String(unsafe.SliceData(d.Key()), len(d.Key()))
186+
for _, hint := range hints {
187+
if strings.EqualFold(k, hint) {
188+
return d.Value()
189+
}
162190
}
163191
}
164192
return nil
165193
}
166194

167-
func (l *FieldDetector) getValueUsingJSONParser(log []byte) []byte {
168-
for allowedLabel := range l.allowedLabels {
195+
func (l *FieldDetector) getValueUsingJSONParser(log []byte, hints []string) []byte {
196+
for _, allowedLabel := range hints {
169197
l, _, _, err := jsonparser.Get(log, allowedLabel)
170198
if err == nil {
171199
return l

‎pkg/distributor/field_detection_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/grafana/dskit/flagext"
88
ring_client "github.com/grafana/dskit/ring/client"
9+
"github.com/prometheus/prometheus/model/labels"
910
"github.com/stretchr/testify/require"
1011
"go.opentelemetry.io/collector/pdata/plog"
1112

@@ -434,3 +435,133 @@ func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) {
434435
require.Equal(b, constants.LogLevelInfo, level)
435436
}
436437
}
438+
439+
// Test_DetectGenericFields_Enabled checks shouldDiscoverGenericFields: it must
// be false when the configured field map is empty or when structured metadata
// is not allowed, and true only when both conditions are satisfied.
func Test_DetectGenericFields_Enabled(t *testing.T) {
440+
t.Run("disabled if map is empty", func(t *testing.T) {
441+
detector := newFieldDetector(
442+
validationContext{
443+
discoverGenericFields: make(map[string][]string, 0),
444+
allowStructuredMetadata: true,
445+
})
446+
require.False(t, detector.shouldDiscoverGenericFields())
447+
})
448+
t.Run("disabled if structured metadata is not allowed", func(t *testing.T) {
449+
detector := newFieldDetector(
450+
validationContext{
451+
discoverGenericFields: map[string][]string{"trace_id": []string{"trace_id", "TRACE_ID"}},
452+
allowStructuredMetadata: false,
453+
})
454+
require.False(t, detector.shouldDiscoverGenericFields())
455+
})
456+
t.Run("enabled if structured metadata is allowed and map is not empty", func(t *testing.T) {
457+
detector := newFieldDetector(
458+
validationContext{
459+
discoverGenericFields: map[string][]string{"trace_id": []string{"trace_id", "TRACE_ID"}},
460+
allowStructuredMetadata: true,
461+
})
462+
require.True(t, detector.shouldDiscoverGenericFields())
463+
})
464+
}
465+
466+
// Test_DetectGenericFields is a table-driven test for extractGenericField. It
// configures two generic fields ("trace_id" and "org_id", the latter with
// several hints) and covers: no match, a match in stream labels, a match in
// structured metadata, a match in a logfmt line, and a match in a JSON line.
//
// The configured fields are iterated from a map, whose iteration order is not
// specified in Go, so results are compared with require.ElementsMatch rather
// than in order.
func Test_DetectGenericFields(t *testing.T) {
467+
468+
detector := newFieldDetector(
469+
validationContext{
470+
discoverGenericFields: map[string][]string{
471+
"trace_id": []string{"trace_id"},
472+
"org_id": []string{"org_id", "user_id", "tenant_id"},
473+
},
474+
allowStructuredMetadata: true,
475+
})
476+
477+
for _, tc := range []struct {
478+
name string
479+
labels labels.Labels
480+
entry logproto.Entry
481+
expected push.LabelsAdapter
482+
}{
483+
{
484+
name: "no match",
485+
labels: labels.Labels{
486+
{Name: "env", Value: "prod"},
487+
},
488+
entry: push.Entry{
489+
Line: "log line does not match",
490+
StructuredMetadata: push.LabelsAdapter{},
491+
},
492+
expected: push.LabelsAdapter{},
493+
},
494+
{
495+
name: "stream label matches",
496+
labels: labels.Labels{
497+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
498+
{Name: "tenant_id", Value: "fake"},
499+
},
500+
entry: push.Entry{
501+
Line: "log line does not match",
502+
StructuredMetadata: push.LabelsAdapter{},
503+
},
504+
expected: push.LabelsAdapter{
505+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
506+
{Name: "org_id", Value: "fake"},
507+
},
508+
},
509+
{
510+
name: "metadata matches",
511+
labels: labels.Labels{
512+
{Name: "env", Value: "prod"},
513+
},
514+
entry: push.Entry{
515+
Line: "log line does not match",
516+
StructuredMetadata: push.LabelsAdapter{
517+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
518+
{Name: "user_id", Value: "fake"},
519+
},
520+
},
521+
expected: push.LabelsAdapter{
522+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
523+
{Name: "org_id", Value: "fake"},
524+
},
525+
},
526+
{
527+
name: "logline (logfmt) matches",
528+
labels: labels.Labels{
529+
{Name: "env", Value: "prod"},
530+
},
531+
entry: push.Entry{
532+
Line: `msg="this log line matches" trace_id="8c5f2ecbade6f01d" org_id=fake duration=1h`,
533+
StructuredMetadata: push.LabelsAdapter{},
534+
},
535+
expected: push.LabelsAdapter{
536+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
537+
{Name: "org_id", Value: "fake"},
538+
},
539+
},
540+
{
541+
name: "logline (json) matches",
542+
labels: labels.Labels{
543+
{Name: "env", Value: "prod"},
544+
},
545+
entry: push.Entry{
546+
Line: `{"msg": "this log line matches", "trace_id": "8c5f2ecbade6f01d", "org_id": "fake", "duration": "1s"}`,
547+
StructuredMetadata: push.LabelsAdapter{},
548+
},
549+
expected: push.LabelsAdapter{
550+
{Name: "trace_id", Value: "8c5f2ecbade6f01d"},
551+
{Name: "org_id", Value: "fake"},
552+
},
553+
},
554+
} {
555+
t.Run(tc.name, func(t *testing.T) {
556+
extracted := push.LabelsAdapter{}
557+
metadata := logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata)
558+
for name, hints := range detector.validationContext.discoverGenericFields {
559+
field, ok := detector.extractGenericField(name, hints, tc.labels, metadata, tc.entry)
560+
if ok {
561+
extracted = append(extracted, field)
562+
}
563+
}
564+
require.ElementsMatch(t, tc.expected, extracted)
565+
})
566+
}
567+
}

‎pkg/distributor/limits.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type Limits interface {
2323

2424
IncrementDuplicateTimestamps(userID string) bool
2525
DiscoverServiceName(userID string) []string
26+
DiscoverGenericFields(userID string) map[string][]string
2627
DiscoverLogLevels(userID string) bool
2728
LogLevelFields(userID string) []string
2829

‎pkg/distributor/validator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type validationContext struct {
4545

4646
incrementDuplicateTimestamps bool
4747
discoverServiceName []string
48+
discoverGenericFields map[string][]string
4849
discoverLogLevels bool
4950
logLevelFields []string
5051

@@ -73,6 +74,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
7374
discoverServiceName: v.DiscoverServiceName(userID),
7475
discoverLogLevels: v.DiscoverLogLevels(userID),
7576
logLevelFields: v.LogLevelFields(userID),
77+
discoverGenericFields: v.DiscoverGenericFields(userID),
7678
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
7779
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
7880
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),

‎pkg/validation/limits.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,12 @@ type Limits struct {
8484
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
8585
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
8686
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
87-
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`
88-
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`
89-
LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"`
87+
88+
// Metadata field extraction
89+
DiscoverGenericFields map[string][]string `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
90+
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`
91+
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`
92+
LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"`
9093

9194
// Ingester enforced limits.
9295
UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"`
@@ -428,7 +431,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
428431
)
429432

430433
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
431-
432434
f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint")
433435

434436
f.BoolVar(&l.AllowStructuredMetadata, "validation.allow-structured-metadata", true, "Allow user to send structured metadata (non-indexed labels) in push payload.")
@@ -996,6 +998,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
996998
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
997999
}
9981000

1001+
// DiscoverGenericFields returns the discover_generic_fields setting that
// applies to userID. The distributor iterates the map as target field name to
// list of hints, i.e. candidate source names for that field.
func (o *Overrides) DiscoverGenericFields(userID string) map[string][]string {
1002+
return o.getOverridesForUser(userID).DiscoverGenericFields
1003+
}
1004+
9991005
// DiscoverServiceName returns the discover_service_name setting that applies
// to userID: the list of labels from which a service_name label may be derived
// when none exists on the stream.
func (o *Overrides) DiscoverServiceName(userID string) []string {
10001006
return o.getOverridesForUser(userID).DiscoverServiceName
10011007
}

0 commit comments

Comments
 (0)