Skip to content

Commit 86dc47b

Browse files
authored
feat: update log level discovery from json to detect nested fields (#16026)
1 parent 76183fa commit 86dc47b

File tree

6 files changed

+192
-15
lines changed

6 files changed

+192
-15
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3432,6 +3432,12 @@ discover_generic_fields:
34323432
# CLI flag: -validation.log-level-fields
34333433
[log_level_fields: <list of strings> | default = [level LEVEL Level Severity severity SEVERITY lvl LVL Lvl]]
34343434

3435+
# Maximum depth to search for log level fields in JSON logs. A value of 0 or
3436+
# less means unlimited depth. Default is 2 which searches the first 2 levels of
3437+
# the JSON object.
3438+
# CLI flag: -validation.log-level-from-json-max-depth
3439+
[log_level_from_json_max_depth: <int> | default = 2]
3440+
34353441
# When true an ingester takes into account only the streams that it owns
34363442
# according to the ring while applying the stream limit.
34373443
# CLI flag: -ingester.use-owned-stream-count

‎pkg/distributor/field_detection.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package distributor
22

33
import (
44
"bytes"
5+
"errors"
56
"strconv"
67
"strings"
78
"unicode"
@@ -34,25 +35,50 @@ var (
3435
critical = []byte("critical")
3536
fatal = []byte("fatal")
3637

37-
defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}
38+
defaultAllowedLevelFields = []string{
39+
"level",
40+
"LEVEL",
41+
"Level",
42+
"log.level",
43+
"severity",
44+
"SEVERITY",
45+
"Severity",
46+
"SeverityText",
47+
"lvl",
48+
"LVL",
49+
"Lvl",
50+
}
51+
52+
errKeyFound = errors.New("key found")
3853
)
3954

4055
func allowedLabelsForLevel(allowedFields []string) []string {
4156
if len(allowedFields) == 0 {
4257
return defaultAllowedLevelFields
4358
}
59+
4460
return allowedFields
4561
}
4662

4763
type FieldDetector struct {
48-
validationContext validationContext
49-
allowedLevelLabels []string
64+
validationContext validationContext
65+
allowedLevelLabelsMap map[string]struct{}
66+
allowedLevelLabels []string
67+
logLevelFromJSONMaxDepth int
5068
}
5169

5270
func newFieldDetector(validationContext validationContext) *FieldDetector {
71+
allowedLevelLabels := allowedLabelsForLevel(validationContext.logLevelFields)
72+
allowedLevelLabelsMap := make(map[string]struct{}, len(allowedLevelLabels))
73+
for _, field := range allowedLevelLabels {
74+
allowedLevelLabelsMap[field] = struct{}{}
75+
}
76+
5377
return &FieldDetector{
54-
validationContext: validationContext,
55-
allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields),
78+
validationContext: validationContext,
79+
allowedLevelLabelsMap: allowedLevelLabelsMap,
80+
allowedLevelLabels: allowedLevelLabels,
81+
logLevelFromJSONMaxDepth: validationContext.logLevelFromJSONMaxDepth,
5682
}
5783
}
5884

@@ -154,7 +180,7 @@ func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
154180
lineBytes := unsafe.Slice(unsafe.StringData(log), len(log))
155181
var v []byte
156182
if isJSON(log) {
157-
v = getValueUsingJSONParser(lineBytes, l.allowedLevelLabels)
183+
v = getLevelUsingJSONParser(lineBytes, l.allowedLevelLabelsMap, l.logLevelFromJSONMaxDepth)
158184
} else if isLogFmt(lineBytes) {
159185
v = getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels)
160186
} else {
@@ -219,6 +245,37 @@ func getValueUsingJSONParser(line []byte, hints []string) []byte {
219245
return res
220246
}
221247

248+
func getLevelUsingJSONParser(line []byte, allowedLevelFields map[string]struct{}, maxDepth int) []byte {
249+
var result []byte
250+
var detectLevel func([]byte, int) error
251+
detectLevel = func(data []byte, depth int) error {
252+
// maxDepth <= 0 means no limit
253+
if maxDepth > 0 && depth >= maxDepth {
254+
return nil
255+
}
256+
257+
return jsonparser.ObjectEach(data, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
258+
switch dataType {
259+
case jsonparser.String:
260+
if _, ok := allowedLevelFields[unsafe.String(unsafe.SliceData(key), len(key))]; ok {
261+
result = value
262+
// ErrKeyFound is used to stop parsing once we find the desired key
263+
return errKeyFound
264+
}
265+
case jsonparser.Object:
266+
if err := detectLevel(value, depth+1); err != nil {
267+
return err
268+
}
269+
}
270+
271+
return nil
272+
})
273+
}
274+
275+
_ = detectLevel(line, 0)
276+
return result
277+
}
278+
222279
func isLogFmt(line []byte) bool {
223280
equalIndex := bytes.Index(line, []byte("="))
224281
if len(line) == 0 || equalIndex == -1 {

‎pkg/distributor/field_detection_test.go

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,18 +406,28 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) {
406406
}
407407

408408
func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) {
409-
logLine := `{"msg": "something" , "level": "error", "id": "1"}`
410-
409+
tests := map[string]string{
410+
"level field at start": `{"level": "error", "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
411+
"level field in middle": `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "level": "error", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
412+
"level field at end": `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9", "level": "error"}`,
413+
"no level field": `{"field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
414+
"nested level field": `{"metadata": {"level": "error"}, "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
415+
"deeply nested level field": `{"a": {"b": {"c": {"level": "error"}}}, "field1": "value1", "field2": "value2", "field3": "value3", "field4": "value4", "field5": "value5", "field6": "value6", "field7": "value7", "field8": "value8", "field9": "value9"}`,
416+
}
411417
ld := newFieldDetector(
412418
validationContext{
413419
discoverLogLevels: true,
414420
allowStructuredMetadata: true,
415421
logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"},
416422
})
417423

418-
for i := 0; i < b.N; i++ {
419-
level := ld.extractLogLevelFromLogLine(logLine)
420-
require.Equal(b, constants.LogLevelError, level)
424+
for name, logLine := range tests {
425+
b.Run(name, func(b *testing.B) {
426+
b.ResetTimer()
427+
for i := 0; i < b.N; i++ {
428+
_ = ld.extractLogLevelFromLogLine(logLine)
429+
}
430+
})
421431
}
422432
}
423433

@@ -605,3 +615,98 @@ func Test_DetectGenericFields(t *testing.T) {
605615
})
606616
}
607617
}
618+
619+
func TestGetLevelUsingJsonParser(t *testing.T) {
620+
tests := []struct {
621+
name string
622+
json string
623+
allowedLevelFields map[string]struct{}
624+
maxDepth int
625+
want string
626+
}{
627+
{
628+
name: "simple top level field",
629+
json: `{"level": "error"}`,
630+
allowedLevelFields: map[string]struct{}{"level": {}},
631+
want: "error",
632+
},
633+
{
634+
name: "nested field one level deep",
635+
json: `{"a": {"level": "info"}}`,
636+
allowedLevelFields: map[string]struct{}{"level": {}},
637+
want: "info",
638+
},
639+
{
640+
name: "deeply nested field",
641+
json: `{"a": {"b": {"c": {"level": "warn"}}}}`,
642+
allowedLevelFields: map[string]struct{}{"level": {}},
643+
want: "warn",
644+
},
645+
{
646+
name: "multiple allowed fields picks first",
647+
json: `{"severity": "error", "level": "info"}`,
648+
allowedLevelFields: map[string]struct{}{"level": {}, "severity": {}},
649+
want: "error",
650+
},
651+
{
652+
name: "multiple nested fields picks first",
653+
json: `{"a": {"level": "error"}, "b": {"level": "info"}}`,
654+
allowedLevelFields: map[string]struct{}{"level": {}},
655+
want: "error",
656+
},
657+
{
658+
name: "array values are ignored",
659+
json: `{"arr": [{"level": "debug"}], "level": "info"}`,
660+
allowedLevelFields: map[string]struct{}{"level": {}},
661+
want: "info",
662+
},
663+
{
664+
name: "non-string values are ignored",
665+
json: `{"level": 123, "severity": "warn"}`,
666+
allowedLevelFields: map[string]struct{}{"level": {}, "severity": {}},
667+
want: "warn",
668+
},
669+
{
670+
name: "empty when no match",
671+
json: `{"foo": "bar"}`,
672+
allowedLevelFields: map[string]struct{}{"level": {}},
673+
want: "",
674+
},
675+
{
676+
name: "empty for invalid json",
677+
json: `{"foo": "bar"`,
678+
allowedLevelFields: map[string]struct{}{"level": {}},
679+
want: "",
680+
},
681+
{
682+
name: "custom field names",
683+
json: `{"custom_level": "error", "log_severity": "warn"}`,
684+
allowedLevelFields: map[string]struct{}{"custom_level": {}, "log_severity": {}},
685+
want: "error",
686+
},
687+
// Adding depth-specific test cases
688+
{
689+
name: "depth limited - only top level",
690+
json: `{"a": {"level": "debug"}, "level": "info"}`,
691+
allowedLevelFields: map[string]struct{}{"level": {}},
692+
maxDepth: 1,
693+
want: "info",
694+
},
695+
{
696+
name: "depth limited - no match",
697+
json: `{"a": {"level": "debug"}}`,
698+
allowedLevelFields: map[string]struct{}{"level": {}},
699+
maxDepth: 1,
700+
want: "",
701+
},
702+
}
703+
704+
for _, tt := range tests {
705+
t.Run(tt.name, func(t *testing.T) {
706+
got := getLevelUsingJSONParser([]byte(tt.json), tt.allowedLevelFields, tt.maxDepth)
707+
if string(got) != tt.want {
708+
t.Errorf("getLevelUsingJsonParser() = %v, want %v", string(got), tt.want)
709+
}
710+
})
711+
}
712+
}

‎pkg/distributor/limits.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type Limits interface {
2626
DiscoverGenericFields(userID string) map[string][]string
2727
DiscoverLogLevels(userID string) bool
2828
LogLevelFields(userID string) []string
29+
LogLevelFromJSONMaxDepth(userID string) int
2930

3031
ShardStreams(userID string) shardstreams.Config
3132
IngestionRateStrategy() string

‎pkg/distributor/validator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type validationContext struct {
4848
discoverGenericFields map[string][]string
4949
discoverLogLevels bool
5050
logLevelFields []string
51+
logLevelFromJSONMaxDepth int
5152

5253
allowStructuredMetadata bool
5354
maxStructuredMetadataSize int
@@ -79,6 +80,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
7980
discoverServiceName: v.DiscoverServiceName(userID),
8081
discoverLogLevels: v.DiscoverLogLevels(userID),
8182
logLevelFields: v.LogLevelFields(userID),
83+
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
8284
discoverGenericFields: v.DiscoverGenericFields(userID),
8385
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
8486
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),

‎pkg/validation/limits.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,11 @@ type Limits struct {
8686
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
8787

8888
// Metadata field extraction
89-
DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
90-
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`
91-
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`
92-
LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"`
89+
DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
90+
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`
91+
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`
92+
LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"`
93+
LogLevelFromJSONMaxDepth int `yaml:"log_level_from_json_max_depth" json:"log_level_from_json_max_depth"`
9394

9495
// Ingester enforced limits.
9596
UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"`
@@ -297,6 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
297298
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")
298299
l.LogLevelFields = []string{"level", "LEVEL", "Level", "Severity", "severity", "SEVERITY", "lvl", "LVL", "Lvl"}
299300
f.Var((*dskit_flagext.StringSlice)(&l.LogLevelFields), "validation.log-level-fields", "Field name to use for log levels. If not set, log level would be detected based on pre-defined labels as mentioned above.")
301+
f.IntVar(&l.LogLevelFromJSONMaxDepth, "validation.log-level-from-json-max-depth", 2, "Maximum depth to search for log level fields in JSON logs. A value of 0 or less means unlimited depth. Default is 2 which searches the first 2 levels of the JSON object.")
300302

301303
_ = l.RejectOldSamplesMaxAge.Set("7d")
302304
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
@@ -1016,6 +1018,10 @@ func (o *Overrides) LogLevelFields(userID string) []string {
10161018
return o.getOverridesForUser(userID).LogLevelFields
10171019
}
10181020

1021+
func (o *Overrides) LogLevelFromJSONMaxDepth(userID string) int {
1022+
return o.getOverridesForUser(userID).LogLevelFromJSONMaxDepth
1023+
}
1024+
10191025
// VolumeEnabled returns whether volume endpoints are enabled for a user.
10201026
func (o *Overrides) VolumeEnabled(userID string) bool {
10211027
return o.getOverridesForUser(userID).VolumeEnabled

0 commit comments

Comments
 (0)