Skip to content

Commit 11e74bc

Browse files
committed
prefix alignment
1 parent 7303f9e commit 11e74bc

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

‎pkg/distributor/tenant_topic_tee.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (cfg *TenantTopicConfig) Validate() error {
9090
// RegisterFlags adds the flags required to configure this flag set.
9191
func (cfg *TenantTopicConfig) RegisterFlags(f *flag.FlagSet) {
9292
f.BoolVar(&cfg.Enabled, "distributor.tenant-topic-tee.enabled", false, "Enable the tenant topic tee")
93-
f.StringVar(&cfg.TopicPrefix, "distributor.tenant-topic-tee.topic-prefix", "loki.tenant.", "Prefix to prepend to tenant IDs to form the final Kafka topic name")
93+
f.StringVar(&cfg.TopicPrefix, "distributor.tenant-topic-tee.topic-prefix", "loki.tenant", "Prefix to prepend to tenant IDs to form the final Kafka topic name")
9494
cfg.MaxBufferedBytes = 100 << 20 // 100MB
9595
f.Var(&cfg.MaxBufferedBytes, "distributor.tenant-topic-tee.max-buffered-bytes", "Maximum number of bytes that can be buffered before producing to Kafka")
9696
f.DurationVar(&cfg.BatchTimeout, "distributor.tenant-topic-tee.batch-timeout", 10*time.Second, "Maximum amount of time to wait before sending a batch to Kafka")
@@ -120,7 +120,7 @@ func NewSimplePartitionResolver(topicPrefix string) *SimplePartitionResolver {
120120

121121
// Resolve implements PartitionResolver
122122
func (r *SimplePartitionResolver) Resolve(_ context.Context, tenant string, totalPartitions uint32, stream KeyedStream) (string, int32, error) {
123-
topic := fmt.Sprintf("%s%s", r.topicPrefix, tenant)
123+
topic := fmt.Sprintf("%s.%s", r.topicPrefix, tenant)
124124
partition := int32(stream.HashKey % totalPartitions)
125125
return topic, partition, nil
126126
}

0 commit comments

Comments
 (0)