Skip to content

Commit d6c499b

Browse files
feat: add dry-run mode for checking limits in distributors (#16754)
1 parent 7119f6e commit d6c499b

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2557,6 +2557,11 @@ otlp_config:
25572557
# CLI flag: -distributor.ingest-limits-enabled
25582558
[ingest_limits_enabled: <boolean> | default = false]
25592559
2560+
# Enable dry-run mode where limits are checked the ingest-limits service, but
2561+
# not enforced. Defaults to false.
2562+
# CLI flag: -distributor.ingest-limits-dry-run-enabled
2563+
[ingest_limits_dry_run_enabled: <boolean> | default = false]
2564+
25602565
tenant_topic:
25612566
# Enable the tenant topic tee, which writes logs to Kafka topics based on
25622567
# tenant IDs instead of using multitenant topics/partitions.

‎pkg/distributor/distributor.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ type Config struct {
100100

101101
OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
102102

103-
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
104-
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
105-
IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
103+
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
104+
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
105+
IngestLimitsEnabled bool `yaml:"ingest_limits_enabled"`
106+
IngestLimitsDryRunEnabled bool `yaml:"ingest_limits_dry_run_enabled"`
106107

107108
KafkaConfig kafka.Config `yaml:"-"`
108109

@@ -121,6 +122,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
121122
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
122123
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
123124
fs.BoolVar(&cfg.IngestLimitsEnabled, "distributor.ingest-limits-enabled", false, "Enable checking limits against the ingest-limits service. Defaults to false.")
125+
fs.BoolVar(&cfg.IngestLimitsDryRunEnabled, "distributor.ingest-limits-dry-run-enabled", false, "Enable dry-run mode where limits are checked the ingest-limits service, but not enforced. Defaults to false.")
124126
}
125127

126128
func (cfg *Config) Validate() error {
@@ -715,12 +717,15 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
715717
}
716718

717719
if d.cfg.IngestLimitsEnabled {
718-
exceedsLimits, _, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
720+
exceedsLimits, reasons, err := d.exceedsLimits(ctx, tenantID, streams, d.doExceedsLimitsRPC)
719721
if err != nil {
720722
level.Error(d.logger).Log("msg", "failed to check if request exceeds limits, request has been accepted", "err", err)
721-
}
722-
if exceedsLimits {
723-
level.Info(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
723+
} else if exceedsLimits {
724+
if d.cfg.IngestLimitsDryRunEnabled {
725+
level.Debug(d.logger).Log("msg", "request exceeded limits", "tenant", tenantID)
726+
} else {
727+
return nil, httpgrpc.Error(http.StatusBadRequest, strings.Join(reasons, ","))
728+
}
724729
}
725730
}
726731

0 commit comments

Comments
 (0)