chore: split kafka reader into reader and offset manager
ashwanthgoli committed Dec 3, 2024
commit 8b2f67c11be32a0c4814b39fff1ed4de7423f1ae
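
Context for reviewers: the old partition.Reader both consumed records and managed consumer-group offsets; this commit splits those roles into a Reader and an OffsetManager. The definitions below are a sketch inferred from the call sites visible in this diff, not copied from pkg/kafka/partition; the Record fields and the sentinel offset values in particular are assumptions.

// Sketch of the two interfaces implied by the call sites in this diff.
// Inferred, not the actual definitions in pkg/kafka/partition.
package partition

import "context"

// Record is whatever Poll returns per message (fields elided here).
type Record struct{}

// Assumed sentinels accepted by FetchPartitionOffset (Kafka convention:
// -2 = earliest, -1 = latest).
const (
	KafkaStartOffset int64 = -2
	KafkaEndOffset   int64 = -1
)

// Reader is the consumption half: position on the partition, then poll.
type Reader interface {
	Partition() int32
	SetOffsetForConsumption(offset int64)
	Poll(ctx context.Context, maxRecords int) ([]Record, error)
}

// OffsetManager is the bookkeeping half: fetch and commit the consumer
// group's offsets for the partition.
type OffsetManager interface {
	Topic() string
	Partition() int32
	FetchLastCommittedOffset(ctx context.Context) (int64, error)
	FetchPartitionOffset(ctx context.Context, position int64) (int64, error)
	Commit(ctx context.Context, offset int64) error
}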
39 changes: 21 additions & 18 deletions pkg/blockbuilder/builder/controller.go
@@ -56,15 +56,17 @@ type PartitionController interface {
 //
 // containing log data and "committed" is the consumer group
 type PartitionJobController struct {
-	stepLen int64
-	part    partition.Reader
-	backoff backoff.Config
-	decoder *kafka.Decoder
-	logger  log.Logger
+	stepLen       int64
+	reader        partition.Reader
+	offsetManager partition.OffsetManager
+	backoff       backoff.Config
+	decoder       *kafka.Decoder
+	logger        log.Logger
 }
 
 func NewPartitionJobController(
-	controller partition.Reader,
+	reader partition.Reader,
+	offsetManager partition.OffsetManager,
 	backoff backoff.Config,
 	logger log.Logger,
 ) (*PartitionJobController, error) {
@@ -73,14 +75,15 @@ func NewPartitionJobController(
 		return nil, err
 	}
 	return &PartitionJobController{
-		stepLen: 1000, // Default step length of 1000 offsets per job
-		part:    controller,
-		backoff: backoff,
-		decoder: decoder,
+		stepLen:       1000, // Default step length of 1000 offsets per job
+		reader:        reader,
+		offsetManager: offsetManager,
+		backoff:       backoff,
+		decoder:       decoder,
 		logger: log.With(logger,
 			"component", "job-controller",
-			"topic", controller.Topic(),
-			"partition", controller.Partition(),
+			"topic", offsetManager.Topic(),
+			"partition", offsetManager.Partition(),
 		),
 	}, nil
 }
@@ -90,7 +93,7 @@ func (l *PartitionJobController) HighestCommittedOffset(ctx context.Context) (int64, error) {
 		ctx,
 		l.backoff,
 		func() (int64, error) {
-			return l.part.FetchLastCommittedOffset(ctx)
+			return l.offsetManager.FetchLastCommittedOffset(ctx)
 		},
 	)
 }
@@ -100,7 +103,7 @@ func (l *PartitionJobController) HighestPartitionOffset(ctx context.Context) (int64, error) {
 		ctx,
 		l.backoff,
 		func() (int64, error) {
-			return l.part.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
+			return l.offsetManager.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
 		},
 	)
 }
@@ -110,13 +113,13 @@ func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (int64, error) {
 		ctx,
 		l.backoff,
 		func() (int64, error) {
-			return l.part.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
+			return l.offsetManager.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
 		},
 	)
 }
 
 func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
-	l.part.SetOffsetForConsumption(offsets.Min)
+	l.reader.SetOffsetForConsumption(offsets.Min)
 
 	var (
 		lastOffset = offsets.Min - 1
@@ -126,7 +129,7 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
 	for lastOffset < offsets.Max && boff.Ongoing() {
 		var records []partition.Record
-		records, err = l.part.Poll(ctx, int(offsets.Max-lastOffset))
+		records, err = l.reader.Poll(ctx, int(offsets.Max-lastOffset))
 		if err != nil {
 			boff.Wait()
 			continue
@@ -217,7 +220,7 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *types.Job, error) {
 	}
 
 	// Convert partition from int32 to int
-	job := types.NewJob(int(l.part.Partition()), offsets)
+	job := types.NewJob(int(l.reader.Partition()), offsets)
 	return true, job, nil
 }
2 changes: 1 addition & 1 deletion pkg/blockbuilder/builder/slimgester.go
@@ -383,7 +383,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
 		return false, nil
 	}
 
-	if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
+	if err = i.jobController.offsetManager.Commit(ctx, lastOffset); err != nil {
 		level.Error(logger).Log(
 			"msg", "failed to commit offset",
 			"last_offset", lastOffset,
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/strategy_test.go
@@ -140,7 +140,7 @@ func TestTimeRangePlanner_Plan(t *testing.T) {
 			require.NoError(t, err)
 
 			require.Equal(t, len(tc.expectedJobs), len(jobs))
-			require.Equal(t, tc.expectedJobs, jobs)
+			require.ElementsMatch(t, tc.expectedJobs, jobs)
 		})
 	}
 }
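
Note on the test change above: require.ElementsMatch from stretchr/testify asserts that two lists hold the same elements regardless of order, so the planner assertion no longer depends on the order in which jobs are produced. A minimal, self-contained illustration:

package scheduler_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// ElementsMatch compares contents ignoring order; Equal also requires
// identical ordering.
func TestElementsMatchIgnoresOrder(t *testing.T) {
	expected := []int{1, 2, 3}
	got := []int{3, 1, 2}

	require.ElementsMatch(t, expected, got) // passes
	// require.Equal(t, expected, got)      // would fail: order differs
}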
24 changes: 12 additions & 12 deletions pkg/kafka/partition/committer.go
@@ -25,34 +25,34 @@ type partitionCommitter struct {
 	commitFailuresTotal prometheus.Counter
 	lastCommittedOffset prometheus.Gauge
 
-	logger     log.Logger
-	reader     Reader
-	commitFreq time.Duration
+	logger        log.Logger
+	offsetManager OffsetManager
+	commitFreq    time.Duration
 
 	toCommit *atomic.Int64
 	wg       sync.WaitGroup
 	cancel   context.CancelFunc
 }
 
-func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
+func newCommitter(offsetManager OffsetManager, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
 	c := &partitionCommitter{
-		logger:     logger,
-		reader:     reader,
-		commitFreq: commitFreq,
+		logger:        logger,
+		offsetManager: offsetManager,
+		commitFreq:    commitFreq,
 		commitRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name:        "loki_ingest_storage_reader_offset_commit_requests_total",
 			Help:        "Total number of requests issued to commit the last consumed offset (includes both successful and failed requests).",
-			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
 		}),
 		commitFailuresTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
 			Name:        "loki_ingest_storage_reader_offset_commit_failures_total",
 			Help:        "Total number of failed requests to commit the last consumed offset.",
-			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
 		}),
 		commitRequestsLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
 			Name:                            "loki_ingest_storage_reader_offset_commit_request_duration_seconds",
 			Help:                            "The duration of requests to commit the last consumed offset.",
-			ConstLabels:                     prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+			ConstLabels:                     prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
 			NativeHistogramBucketFactor:     1.1,
 			NativeHistogramMaxBucketNumber:  100,
 			NativeHistogramMinResetDuration: time.Hour,
@@ -61,7 +61,7 @@ func newCommitter(reader Reader, commitFreq time.Duration, logger log.Logger, reg prometheus.Registerer) *partitionCommitter {
 		lastCommittedOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
 			Name:        "loki_ingest_storage_reader_last_committed_offset",
 			Help:        "The last consumed offset successfully committed by the partition reader. Set to -1 if not offset has been committed yet.",
-			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(reader.Partition()))},
+			ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(offsetManager.Partition()))},
 		}),
 		toCommit: atomic.NewInt64(-1),
 	}
@@ -116,7 +116,7 @@ func (c *partitionCommitter) Commit(ctx context.Context, offset int64) error {
 	startTime := time.Now()
 	c.commitRequestsTotal.Inc()
 
-	if err := c.reader.Commit(ctx, offset); err != nil {
+	if err := c.offsetManager.Commit(ctx, offset); err != nil {
 		level.Error(c.logger).Log("msg", "failed to commit offset", "err", err, "offset", offset)
 		c.commitFailuresTotal.Inc()
 		c.commitRequestsLatency.Observe(time.Since(startTime).Seconds())
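
A side benefit of depending on the narrower OffsetManager: code like partitionCommitter can be exercised against a small stub instead of a full Kafka reader. The stub below is hypothetical, written against the interface sketch near the top of this page, not an actual helper in the package:

package partition

import (
	"context"
	"sync/atomic"
)

// stubOffsetManager is a hypothetical in-memory OffsetManager for tests.
type stubOffsetManager struct {
	topic     string
	partition int32
	committed atomic.Int64
}

func (s *stubOffsetManager) Topic() string    { return s.topic }
func (s *stubOffsetManager) Partition() int32 { return s.partition }

func (s *stubOffsetManager) FetchLastCommittedOffset(ctx context.Context) (int64, error) {
	return s.committed.Load(), nil
}

func (s *stubOffsetManager) FetchPartitionOffset(ctx context.Context, position int64) (int64, error) {
	return 0, nil // a real implementation would query the broker
}

func (s *stubOffsetManager) Commit(ctx context.Context, offset int64) error {
	s.committed.Store(offset)
	return nil
}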
2 changes: 1 addition & 1 deletion pkg/kafka/partition/committer_test.go
@@ -36,7 +36,7 @@ func TestPartitionCommitter(t *testing.T) {
 	reg := prometheus.NewRegistry()
 	partitionID := int32(1)
 	consumerGroup := "test-consumer-group"
-	reader := newKafkaReader(
+	reader := newKafkaOffsetManager(
 		client,
 		kafkaCfg.Topic,
 		partitionID,