Skip to content

Commit 7119f6e

Browse files
authored
feat(distributor): Add simulated latency (#16733)
1 parent 9bdd5d1 commit 7119f6e

File tree

4 files changed

+32
-0
lines changed

4 files changed

+32
-0
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3491,6 +3491,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
34913491
# CLI flag: -validation.increment-duplicate-timestamps
34923492
[increment_duplicate_timestamp: <boolean> | default = false]
34933493

3494+
# Simulated latency to add to push requests. Used for testing. Set to 0s to
3495+
# disable.
3496+
# CLI flag: -limits.simulated-push-latency
3497+
[simulated_push_latency: <duration> | default = 0s]
3498+
34943499
# Experimental: Detect fields from stream labels, structured metadata, or
34953500
# json/logfmt formatted log line and put them into structured metadata of the
34963501
# log entry.

‎pkg/distributor/distributor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,11 +485,30 @@ func (p *pushTracker) doneWithResult(err error) {
485485
}
486486
}
487487

488+
func (d *Distributor) waitSimulatedLatency(ctx context.Context, tenantID string, start time.Time) {
489+
latency := d.validator.Limits.SimulatedPushLatency(tenantID)
490+
if latency > 0 {
491+
// All requests must wait at least the simulated latency. However,
492+
// we want to avoid adding additional latency on top of slow requests
493+
// that already took longer then the simulated latency.
494+
wait := latency - time.Since(start)
495+
if wait > 0 {
496+
select {
497+
case <-time.After(wait):
498+
case <-ctx.Done():
499+
return // The client canceled the request.
500+
}
501+
}
502+
}
503+
}
504+
488505
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
489506
tenantID, err := tenant.TenantID(ctx)
490507
if err != nil {
491508
return nil, err
492509
}
510+
start := time.Now()
511+
defer d.waitSimulatedLatency(ctx, tenantID, start)
493512
return d.PushWithResolver(ctx, req, newRequestScopedStreamResolver(tenantID, d.validator.Limits, d.logger))
494513
}
495514

‎pkg/distributor/limits.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,6 @@ type Limits interface {
4444
PolicyEnforcedLabels(userID string, policy string) []string
4545

4646
IngestionPartitionsTenantShardSize(userID string) int
47+
48+
SimulatedPushLatency(userID string) time.Duration
4749
}

‎pkg/validation/limits.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ type Limits struct {
8484
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
8585
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
8686
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
87+
SimulatedPushLatency time.Duration `yaml:"simulated_push_latency" json:"simulated_push_latency" doc:"description=Simulated latency to add to push requests. Used for testing. Set to 0s to disable."`
8788

8889
// Metadata field extraction
8990
DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."`
@@ -463,6 +464,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
463464
false,
464465
"Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.",
465466
)
467+
f.DurationVar(&l.SimulatedPushLatency, "limits.simulated-push-latency", 0, "Simulated latency to add to push requests. This is used to test the performance of the write path under different latency conditions.")
466468
}
467469

468470
// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -1267,3 +1269,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}
12671269

12681270
return nil
12691271
}
1272+
1273+
func (o *Overrides) SimulatedPushLatency(userID string) time.Duration {
1274+
return o.getOverridesForUser(userID).SimulatedPushLatency
1275+
}

0 commit comments

Comments
 (0)