Skip to content

feat: Mark truncated log lines with identifier #18262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3720,6 +3720,10 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -distributor.max-line-size-truncate
[max_line_size_truncate: <boolean> | default = false]

# Identifier that is added at the end of a truncated log line.
# CLI flag: -distributor.max-line-size-truncate-identifier
[max_line_size_truncate_identifier: <string> | default = ""]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative naming proposal:

Suggested change
[max_line_size_truncate_identifier: <string> | default = ""]
[max_line_size_truncate_suffix: <string> | default = ""]

# Alter the log line timestamp during ingestion when the timestamp is the same
# as the previous entry for the same stream. When enabled, if a log line in a
# push request has the same timestamp as the previous line for the same stream,
Expand Down
11 changes: 9 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,13 +1110,20 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
return
}

suffix := vContext.maxLineSizeTruncateIdentifier

var truncatedSamples, truncatedBytes int
for i, e := range stream.Entries {
if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize {
stream.Entries[i].Line = e.Line[:maxSize]
truncateTo := maxSize - len(suffix)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test? :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given this is hotpath, should we have the suffix logic in its own branch only when len(suffix) > 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ofc!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if truncateTo <= 0 {
continue
}

stream.Entries[i].Line = e.Line[:truncateTo] + suffix
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess this also allocs new slice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it creates a new string. There are benchmarks in the original truncation PR, which show that there is very little overhead, though.


truncatedSamples++
truncatedBytes += len(e.Line) - maxSize
truncatedBytes += len(e.Line) - truncateTo
}
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,20 @@ func Test_TruncateLogLines(t *testing.T) {
topVal := ingester.Peek()
require.Len(t, topVal.Streams[0].Entries[0].Line, 5)
})

t.Run("it truncates lines and adds suffix if configured", func(t *testing.T) {
limits, ingester := setup()
limits.MaxLineSize = 8
limits.MaxLineSizeTruncateIdentifier = "[...]"

distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.NoError(t, err)
topVal := ingester.Peek()
require.Len(t, topVal.Streams[0].Entries[0].Line, int(limits.MaxLineSize))
require.Equal(t, "000[...]", topVal.Streams[0].Entries[0].Line)
})
}

func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type Limits interface {
retention.Limits
MaxLineSize(userID string) int
MaxLineSizeTruncate(userID string) bool
MaxLineSizeTruncateIdentifier(userID string) string

MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int
MaxLabelValueLength(userID string) int
Expand Down
50 changes: 26 additions & 24 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type validationContext struct {
rejectOldSampleMaxAge int64
creationGracePeriod int64

maxLineSize int
maxLineSizeTruncate bool
maxLineSize int
maxLineSizeTruncate bool
maxLineSizeTruncateIdentifier string

maxLabelNamesPerSeries int
maxLabelNameLength int
Expand Down Expand Up @@ -68,28 +69,29 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
retentionHours := util.RetentionHours(v.RetentionPeriod(userID))

return validationContext{
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
discoverServiceName: v.DiscoverServiceName(userID),
discoverLogLevels: v.DiscoverLogLevels(userID),
logLevelFields: v.LogLevelFields(userID),
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
discoverGenericFields: v.DiscoverGenericFields(userID),
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
blockIngestionUntil: v.BlockIngestionUntil(userID),
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
enforcedLabels: v.EnforcedLabels(userID),
validationMetrics: newValidationMetrics(retentionHours),
userID: userID,
rejectOldSample: v.RejectOldSamples(userID),
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLineSizeTruncateIdentifier: v.MaxLineSizeTruncateIdentifier(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
discoverServiceName: v.DiscoverServiceName(userID),
discoverLogLevels: v.DiscoverLogLevels(userID),
logLevelFields: v.LogLevelFields(userID),
logLevelFromJSONMaxDepth: v.LogLevelFromJSONMaxDepth(userID),
discoverGenericFields: v.DiscoverGenericFields(userID),
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
blockIngestionUntil: v.BlockIngestionUntil(userID),
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
enforcedLabels: v.EnforcedLabels(userID),
validationMetrics: newValidationMetrics(retentionHours),
}
}

Expand Down
33 changes: 20 additions & 13 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,20 @@ const (
// to support user-friendly duration format (e.g: "1h30m45s") in JSON value.
type Limits struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"`
IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
MaxLineSizeTruncateIdentifier string `yaml:"max_line_size_truncate_identifier" json:"max_line_size_truncate_identifier"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`

// LogQL engine options
EnableMultiVariantQueries bool `yaml:"enable_multi_variant_queries" json:"enable_multi_variant_queries"`
Expand Down Expand Up @@ -283,6 +284,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MaxLineSize.Set("256KB")
f.Var(&l.MaxLineSize, "distributor.max-line-size", "Maximum line size on ingestion path. Example: 256kb. Any log line exceeding this limit will be discarded unless `distributor.max-line-size-truncate` is set which in case it is truncated instead of discarding it completely. There is no limit when unset or set to 0.")
f.BoolVar(&l.MaxLineSizeTruncate, "distributor.max-line-size-truncate", false, "Whether to truncate lines that exceed max_line_size.")
f.StringVar(&l.MaxLineSizeTruncateIdentifier, "distributor.max-line-size-truncate-identifier", "", "Identifier that is added at the end of a truncated log line.")
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names.")
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name.")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 15, "Maximum number of label names per series.")
Expand Down Expand Up @@ -826,6 +828,11 @@ func (o *Overrides) MaxLineSizeTruncate(userID string) bool {
return o.getOverridesForUser(userID).MaxLineSizeTruncate
}

// MaxLineSizeTruncateIdentifier returns whether lines longer than max should be truncated.
func (o *Overrides) MaxLineSizeTruncateIdentifier(userID string) string {
return o.getOverridesForUser(userID).MaxLineSizeTruncateIdentifier
}

// MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query.
func (o *Overrides) MaxEntriesLimitPerQuery(_ context.Context, userID string) int {
return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery
Expand Down
Loading