Skip to content

Commit c258419

Browse files
authored
feat(kafka): tenant topics (#15977)
1 parent f163e44 commit c258419

File tree

6 files changed

+449
-16
lines changed

6 files changed

+449
-16
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,6 +2368,28 @@ otlp_config:
23682368
# Enable writes to Ingesters during Push requests. Defaults to true.
23692369
# CLI flag: -distributor.ingester-writes-enabled
23702370
[ingester_writes_enabled: <boolean> | default = true]
2371+
2372+
tenant_topic:
2373+
# Enable the tenant topic tee, which writes logs to Kafka topics based on
2374+
# tenant IDs instead of using multitenant topics/partitions.
2375+
# CLI flag: -distributor.tenant-topic-tee.enabled
2376+
[enabled: <boolean> | default = false]
2377+
2378+
# Prefix to prepend to tenant IDs to form the final Kafka topic name
2379+
# CLI flag: -distributor.tenant-topic-tee.topic-prefix
2380+
[topic_prefix: <string> | default = "loki.tenant"]
2381+
2382+
# Maximum number of bytes that can be buffered before producing to Kafka
2383+
# CLI flag: -distributor.tenant-topic-tee.max-buffered-bytes
2384+
[max_buffered_bytes: <int> | default = 100MiB]
2385+
2386+
# Maximum size of a single Kafka record in bytes
2387+
# CLI flag: -distributor.tenant-topic-tee.max-record-size-bytes
2388+
[max_record_size_bytes: <int> | default = 15MiB249KiB]
2389+
2390+
# Topic strategy to use. Valid values are 'simple' or 'automatic'
2391+
# CLI flag: -distributor.tenant-topic-tee.strategy
2392+
[strategy: <string> | default = "simple"]
23712393
```
23722394

23732395
### etcd

‎pkg/distributor/distributor.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ type Config struct {
9999
KafkaEnabled bool `yaml:"kafka_writes_enabled"`
100100
IngesterEnabled bool `yaml:"ingester_writes_enabled"`
101101
KafkaConfig kafka.Config `yaml:"-"`
102+
103+
// TODO: cleanup config
104+
TenantTopic TenantTopicConfig `yaml:"tenant_topic" category:"experimental"`
102105
}
103106

104107
// RegisterFlags registers distributor-related flags.
@@ -107,6 +110,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
107110
cfg.DistributorRing.RegisterFlags(fs)
108111
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
109112
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
113+
cfg.TenantTopic.RegisterFlags(fs)
110114
fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
111115
fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
112116
fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
@@ -116,6 +120,9 @@ func (cfg *Config) Validate() error {
116120
if !cfg.KafkaEnabled && !cfg.IngesterEnabled {
117121
return fmt.Errorf("at least one of kafka and ingestor writes must be enabled")
118122
}
123+
if err := cfg.TenantTopic.Validate(); err != nil {
124+
return errors.Wrap(err, "validating tenant topic config")
125+
}
119126
return nil
120127
}
121128

@@ -246,6 +253,16 @@ func New(
246253
}
247254
kafkaWriter = kafka_client.NewProducer(kafkaClient, cfg.KafkaConfig.ProducerMaxBufferedBytes,
248255
prometheus.WrapRegistererWithPrefix("loki_", registerer))
256+
257+
// TODO: cleanup/make independent of whether we write kafka as primary?
258+
if cfg.TenantTopic.Enabled {
259+
w, err := NewTenantTopicWriter(cfg.TenantTopic, kafkaClient, overrides, registerer, logger)
260+
if err != nil {
261+
return nil, fmt.Errorf("failed to start tenant topic tee: %w", err)
262+
}
263+
264+
tee = WrapTee(tee, w)
265+
}
249266
}
250267

251268
d := &Distributor{

‎pkg/distributor/ratestore.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,6 @@ func (s *rateStore) aggregateByShard(ctx context.Context, streamRates map[string
257257
return rates
258258
}
259259

260-
func max(a, b int64) int64 {
261-
if a > b {
262-
return a
263-
}
264-
return b
265-
}
266-
267260
func (s *rateStore) getRates(ctx context.Context, clients []ingesterClient) map[string]map[uint64]*logproto.StreamRate {
268261
if s.debug {
269262
if sp := opentracing.SpanFromContext(ctx); sp != nil {

0 commit comments

Comments
 (0)