Skip to content

Commit 52cb0af

Browse files
authored
chore(dataobj): Use kgo Balancer for dataobj-consumer (#16146)
1 parent 1fa952d commit 52cb0af

File tree

3 files changed

+34
-19
lines changed

3 files changed

+34
-19
lines changed

‎pkg/dataobj/consumer/partition_processor.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type partitionProcessor struct {
3737
builderOnce sync.Once
3838
builderCfg dataobj.BuilderConfig
3939
bucket objstore.Bucket
40-
flushBuffer *bytes.Buffer
40+
bufPool *sync.Pool
4141

4242
// Metrics
4343
metrics *partitionOffsetMetrics
@@ -50,7 +50,7 @@ type partitionProcessor struct {
5050
logger log.Logger
5151
}
5252

53-
func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer) *partitionProcessor {
53+
func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer, bufPool *sync.Pool) *partitionProcessor {
5454
ctx, cancel := context.WithCancel(ctx)
5555
decoder, err := kafka.NewDecoder()
5656
if err != nil {
@@ -94,6 +94,7 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
9494
metrics: metrics,
9595
uploader: uploader,
9696
metastoreManager: metastoreManager,
97+
bufPool: bufPool,
9798
}
9899
}
99100

@@ -158,7 +159,6 @@ func (p *partitionProcessor) initBuilder() error {
158159
return
159160
}
160161
p.builder = builder
161-
p.flushBuffer = bytes.NewBuffer(make([]byte, 0, p.builderCfg.TargetObjectSize))
162162
})
163163
return initErr
164164
}
@@ -194,22 +194,29 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {
194194
return
195195
}
196196

197-
flushedDataobjStats, err := p.builder.Flush(p.flushBuffer)
198-
if err != nil {
199-
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
200-
return
201-
}
197+
func() {
198+
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
199+
defer p.bufPool.Put(flushBuffer)
202200

203-
objectPath, err := p.uploader.Upload(p.ctx, p.flushBuffer)
204-
if err != nil {
205-
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
206-
return
207-
}
201+
flushBuffer.Reset()
208202

209-
if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
210-
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
211-
return
212-
}
203+
flushedDataobjStats, err := p.builder.Flush(flushBuffer)
204+
if err != nil {
205+
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
206+
return
207+
}
208+
209+
objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
210+
if err != nil {
211+
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
212+
return
213+
}
214+
215+
if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
216+
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
217+
return
218+
}
219+
}()
213220

214221
if err := p.commitRecords(record); err != nil {
215222
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)

‎pkg/dataobj/consumer/service.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package consumer
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"strconv"
@@ -39,6 +40,8 @@ type Service struct {
3940
// Partition management
4041
partitionMtx sync.RWMutex
4142
partitionHandlers map[string]map[int32]*partitionProcessor
43+
44+
bufPool *sync.Pool
4245
}
4346

4447
func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.Bucket, instanceID string, partitionRing ring.PartitionRingReader, reg prometheus.Registerer, logger log.Logger) *Service {
@@ -49,6 +52,11 @@ func New(kafkaCfg kafka.Config, cfg Config, topicPrefix string, bucket objstore.
4952
codec: distributor.TenantPrefixCodec(topicPrefix),
5053
partitionHandlers: make(map[string]map[int32]*partitionProcessor),
5154
reg: reg,
55+
bufPool: &sync.Pool{
56+
New: func() interface{} {
57+
return bytes.NewBuffer(make([]byte, 0, cfg.BuilderConfig.TargetObjectSize))
58+
},
59+
},
5260
}
5361

5462
client, err := consumer.NewGroupClient(
@@ -92,7 +100,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
92100
}
93101

94102
for _, partition := range parts {
95-
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg)
103+
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool)
96104
s.partitionHandlers[topic][partition] = processor
97105
processor.start()
98106
}

‎pkg/kafka/partitionring/consumer/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func NewGroupClient(kafkaCfg kafka.Config, partitionRing ring.PartitionRingReade
3939
kgo.ConsumerGroup(groupName),
4040
kgo.ConsumeRegex(),
4141
kgo.ConsumeTopics(kafkaCfg.Topic),
42-
kgo.Balancers(NewCooperativeActiveStickyBalancer(partitionRing)),
42+
kgo.Balancers(kgo.CooperativeStickyBalancer()),
4343
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
4444
kgo.DisableAutoCommit(),
4545
kgo.RebalanceTimeout(5 * time.Minute),

0 commit comments

Comments (0)