Skip to content

Commit 0bff2e3

Browse files
feat: add dry-run mode for checking limits in distributors
This commit adds a dry-run mode for checking limits in distributors. It allows the distributors to call the ingest limits service and check if the request exceeds limits, without rejecting requests that are exceeding limits.
1 parent 9bdd5d1 commit 0bff2e3

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,11 @@ otlp_config:
25572557
# CLI flag: -distributor.ingest-limits-enabled
25582558
[ingest_limits_enabled: <boolean> | default = false]
25592559
2560+
# Enable dry-run mode where limits are checked the ingest-limits service, but
2561+
# not enforced. Defaults to false.
2562+
# CLI flag: -distributor.ingest-limits-dry-run-enabled
2563+
[ingest_limits_dry_run_enabled: <boolean> | default = false]
2564+
25602565
tenant_topic:
25612566
# Enable the tenant topic tee, which writes logs to Kafka topics based on
25622567
# tenant IDs instead of using multitenant topics/partitions.

‎pkg/distributor/distributor.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ type Config struct {
100100

101101
OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
102102

103-
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
104-
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
105-
IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
103+
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
104+
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
105+
IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
106+
IngestLimitsDryRunEnabled bool `yaml:"ingest_limits_dry_run_enabled"`
106107

107108
KafkaConfig kafka.Config `yaml:"-"`
108109

@@ -121,6 +122,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
121122
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
122123
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
123124
fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
125+
fs.BoolVar(&cfg.IngestLimitsDryRunEnabled, "distributor.ingest-limits-dry-run-enabled", false, "Enable dry-run mode where limits are checked the ingest-limits service, but not enforced. Defaults to false.")
124126
}
125127

126128
func (cfg *Config) Validate() error {
@@ -696,12 +698,15 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
696698
}
697699

698700
if d.cfg.IngestLimitsEnabled {
699-
exceedsLimits, _, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
701+
exceedsLimits, reasons, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
700702
if err != nil {
701703
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
702-
}
703-
if exceedsLimits {
704-
level.Info(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
704+
} else if exceedsLimits {
705+
if d.cfg.IngestLimitsDryRunEnabled {
706+
level.Debug(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
707+
} else {
708+
return nil, httpgrpc.Error(http.StatusBadRequest, strings.Join(reasons, ","))
709+
}
705710
}
706711
}
707712

0 commit comments

Comments
 (0)