feat(distributor): Add simulated latency #16733

Merged
merged 5 commits into from
Mar 14, 2025
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3477,6 +3477,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false]

# Simulated latency to add to push requests. Used for testing. Set to 0s to
# disable.
# CLI flag: -limits.simulated-push-latency
[simulated_push_latency: <duration> | default = 0s]

# Experimental: Detect fields from stream labels, structured metadata, or
# json/logfmt formatted log line and put them into structured metadata of the
# log entry.
19 changes: 19 additions & 0 deletions pkg/distributor/distributor.go
@@ -485,11 +485,30 @@ func (p *pushTracker) doneWithResult(err error) {
	}
}

func (d *Distributor) waitSimulatedLatency(ctx context.Context, tenantID string, start time.Time) {
	latency := d.validator.Limits.SimulatedPushLatency(tenantID)
	if latency > 0 {
Contributor commented:

I believe this line can be removed as the condition is tested again when we check if wait > 0. For example, when latency is 0, then wait is negative.

However, I suppose someone could set latency to a negative number. Is that a concern?

Contributor commented:

Oh I see, you are using 0 as a special value to disable the feature.

		// All requests must wait at least the simulated latency. However,
		// we want to avoid adding additional latency on top of slow requests
		// that already took longer than the simulated latency.
		wait := latency - time.Since(start)
		if wait > 0 {
			select {
			case <-time.After(wait):
			case <-ctx.Done():
				return // The client canceled the request.
			}
		}
	}
}

func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}
	start := time.Now()
	defer d.waitSimulatedLatency(ctx, tenantID, start)
Contributor commented:

Btw it's OK to use time.Now() as the argument to a deferred call. Go evaluates arguments for deferred functions when the defer is pushed onto the call stack, not when it is executed.

Suggested change
- defer d.waitSimulatedLatency(ctx, tenantID, start)
+ defer d.waitSimulatedLatency(ctx, tenantID, time.Now())

	return d.PushWithResolver(ctx, req, newRequestScopedStreamResolver(tenantID, d.validator.Limits, d.logger))
}

2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
@@ -44,4 +44,6 @@ type Limits interface {
	PolicyEnforcedLabels(userID string, policy string) []string

	IngestionPartitionsTenantShardSize(userID string) int

	SimulatedPushLatency(userID string) time.Duration
}
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
@@ -84,6 +84,7 @@ type Limits struct {
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`

// Metadata field extraction
DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
@@ -463,6 +464,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
		false,
		"Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.",
	)
	f.DurationVar(&l.SimulatedPushLatency, "limits.simulated-push-latency", 0, "Simulated latency to add to push requests. This is used to test the performance of the write path under different latency conditions.")
}

// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -1267,3 +1269,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}

	return nil
}

func (o *Overrides) SimulatedPushLatency(userID string) time.Duration {
	return o.getOverridesForUser(userID).SimulatedPushLatency
}