Skip to content

Commit dd12872

Browse files
feat: Mark truncated log lines with identifier (backport k261) (#18279)
Signed-off-by: Christian Haudum <christian.haudum@gmail.com> Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
1 parent 2adce4f commit dd12872

File tree

6 files changed

+75
-39
lines changed

6 files changed

+75
-39
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3720,6 +3720,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
37203720
# CLI flag: -distributor.max-line-size-truncate
37213721
[max_line_size_truncate: <boolean> | default = false]
37223722

3723+
# Identifier that is added at the end of a truncated log line.
3724+
# CLI flag: -distributor.max-line-size-truncate-identifier
3725+
[max_line_size_truncate_identifier: <string> | default = ""]
3726+
37233727
# Alter the log line timestamp during ingestion when the timestamp is the same
37243728
# as the previous entry for the same stream. When enabled, if a log line in a
37253729
# push request has the same timestamp as the previous line for the same stream,

‎pkg/distributor/distributor.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,13 +1110,20 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
11101110
return
11111111
}
11121112

1113+
suffix := vContext.maxLineSizeTruncateIdentifier
1114+
11131115
var truncatedSamples, truncatedBytes int
11141116
for i, e := range stream.Entries {
11151117
if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize {
1116-
stream.Entries[i].Line = e.Line[:maxSize]
1118+
truncateTo := maxSize - len(suffix)
1119+
if truncateTo <= 0 {
1120+
continue
1121+
}
1122+
1123+
stream.Entries[i].Line = e.Line[:truncateTo] + suffix
11171124

11181125
truncatedSamples++
1119-
truncatedBytes += len(e.Line) - maxSize
1126+
truncatedBytes += len(e.Line) - truncateTo
11201127
}
11211128
}
11221129

‎pkg/distributor/distributor_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,20 @@ func Test_TruncateLogLines(t *testing.T) {
725725
topVal := ingester.Peek()
726726
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
727727
})
728+
729+
t.Run("it truncates lines and adds suffix if configured", func(t *testing.T) {
730+
limits, ingester := setup()
731+
limits.MaxLineSize = 8
732+
limits.MaxLineSizeTruncateIdentifier = "[...]"
733+
734+
distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })
735+
736+
_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
737+
require.NoError(t, err)
738+
topVal := ingester.Peek()
739+
require.Len(t, topVal.Streams[0].Entries[0].Line, int(limits.MaxLineSize))
740+
require.Equal(t, "000[...]", topVal.Streams[0].Entries[0].Line)
741+
})
728742
}
729743

730744
func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {

‎pkg/distributor/limits.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ type Limits interface {
1313
retention.Limits
1414
MaxLineSize(userID string) int
1515
MaxLineSizeTruncate(userID string) bool
16+
MaxLineSizeTruncateIdentifier(userID string) string
17+
1618
MaxLabelNamesPerSeries(userID string) int
1719
MaxLabelNameLength(userID string) int
1820
MaxLabelValueLength(userID string) int

‎pkg/distributor/validator.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ type validationContext struct {
3737
rejectOldSampleMaxAge int64
3838
creationGracePeriod int64
3939

40-
maxLineSize int
41-
maxLineSizeTruncate bool
40+
maxLineSize int
41+
maxLineSizeTruncate bool
42+
maxLineSizeTruncateIdentifier string
4243

4344
maxLabelNamesPerSeries int
4445
maxLabelNameLength int
@@ -68,28 +69,29 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
6869
retentionHours := util.RetentionHours(v.RetentionPeriod(userID))
6970

7071
return validationContext{
71-
userID: userID,
72-
rejectOldSample: v.RejectOldSamples(userID),
73-
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
74-
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
75-
maxLineSize: v.MaxLineSize(userID),
76-
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
77-
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
78-
maxLabelNameLength: v.MaxLabelNameLength(userID),
79-
maxLabelValueLength: v.MaxLabelValueLength(userID),
80-
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
81-
discoverServiceName: v.DiscoverServiceName(userID),
82-
discoverLogLevels: v.DiscoverLogLevels(userID),
83-
logLevelFields: v.LogLevelFields(userID),
84-
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
85-
discoverGenericFields: v.DiscoverGenericFields(userID),
86-
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
87-
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
88-
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
89-
blockIngestionUntil: v.BlockIngestionUntil(userID),
90-
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
91-
enforcedLabels: v.EnforcedLabels(userID),
92-
validationMetrics: newValidationMetrics(retentionHours),
72+
userID: userID,
73+
rejectOldSample: v.RejectOldSamples(userID),
74+
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
75+
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
76+
maxLineSize: v.MaxLineSize(userID),
77+
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
78+
maxLineSizeTruncateIdentifier: v.MaxLineSizeTruncateIdentifier(userID),
79+
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
80+
maxLabelNameLength: v.MaxLabelNameLength(userID),
81+
maxLabelValueLength: v.MaxLabelValueLength(userID),
82+
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
83+
discoverServiceName: v.DiscoverServiceName(userID),
84+
discoverLogLevels: v.DiscoverLogLevels(userID),
85+
logLevelFields: v.LogLevelFields(userID),
86+
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
87+
discoverGenericFields: v.DiscoverGenericFields(userID),
88+
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
89+
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
90+
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
91+
blockIngestionUntil: v.BlockIngestionUntil(userID),
92+
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
93+
enforcedLabels: v.EnforcedLabels(userID),
94+
validationMetrics: newValidationMetrics(retentionHours),
9395
}
9496
}
9597

‎pkg/validation/limits.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -72,19 +72,20 @@ const (
7272
// to support user-friendly duration format (e.g: "1h30m45s") in JSON value.
7373
type Limits struct {
7474
// Distributor enforced limits.
75-
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
76-
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
77-
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
78-
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
79-
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
80-
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
81-
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
82-
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
83-
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
84-
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
85-
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
86-
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
87-
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`
75+
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
76+
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
77+
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
78+
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
79+
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
80+
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
81+
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
82+
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
83+
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
84+
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
85+
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
86+
MaxLineSizeTruncateIdentifier string `yaml:"max_line_size_truncate_identifier" json:"max_line_size_truncate_identifier"`
87+
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
88+
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`
8889

8990
// LogQL engine options
9091
EnableMultiVariantQueries bool `yaml:"enable_multi_variant_queries" json:"enable_multi_variant_queries"`
@@ -283,6 +284,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
283284
_ = l.MaxLineSize.Set("256KB")
284285
f.Var(&l.MaxLineSize, "distributor.max-line-size", "Maximum line size on ingestion path. Example: 256kb. Any log line exceeding this limit will be discarded unless `distributor.max-line-size-truncate` is set which in case it is truncated instead of discarding it completely. There is no limit when unset or set to 0.")
285286
f.BoolVar(&l.MaxLineSizeTruncate, "distributor.max-line-size-truncate", false, "Whether to truncate lines that exceed max_line_size.")
287+
f.StringVar(&l.MaxLineSizeTruncateIdentifier, "distributor.max-line-size-truncate-identifier", "", "Identifier that is added at the end of a truncated log line.")
286288
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names.")
287289
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name.")
288290
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 15, "Maximum number of label names per series.")
@@ -826,6 +828,11 @@ func (o *Overrides) MaxLineSizeTruncate(userID string) bool {
826828
return o.getOverridesForUser(userID).MaxLineSizeTruncate
827829
}
828830

831+
// MaxLineSizeTruncateIdentifier returns whether lines longer than max should be truncated.
832+
func (o *Overrides) MaxLineSizeTruncateIdentifier(userID string) string {
833+
return o.getOverridesForUser(userID).MaxLineSizeTruncateIdentifier
834+
}
835+
829836
// MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query.
830837
func (o *Overrides) MaxEntriesLimitPerQuery(_ context.Context, userID string) int {
831838
return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery

0 commit comments

Comments
 (0)