feat: add phase to log lines in kafka_consumer.go
This commit adds the phase to the log lines in kafka_consumer.go to
make it easier to understand whether the ingester is in the starting
phase or the running phase.
grobinson-grafana committed Nov 28, 2024
commit 470b3fc4a6aa0dfa7d7b2e6daa9c3a641950058d
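For context on the pattern this commit applies: with go-kit/log, scoping a logger once via log.With attaches the phase key to every line logged through it. A minimal, self-contained sketch (the constant and the exact output shape are illustrative; only the log.With(..., "phase", ...) idiom comes from this change):

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

const phaseStarting = "starting"

func main() {
	base := log.NewLogfmtLogger(os.Stdout)
	// Scope the logger once; every line logged through it carries the phase.
	logger := log.With(base, "phase", phaseStarting)
	// Prints something like:
	// level=debug phase=starting msg="consuming from offset" offset=42
	level.Debug(logger).Log("msg", "consuming from offset", "offset", 42)
}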
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
@@ -384,7 +384,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		cfg.KafkaIngestion.KafkaConfig,
 		i.ingestPartitionID,
 		cfg.LifecyclerConfig.ID,
-		NewKafkaConsumerFactory(i, logger, registerer),
+		NewKafkaConsumerFactory(i, registerer),
 		logger,
 		registerer,
 	)
4 changes: 2 additions & 2 deletions pkg/ingester/kafka_consumer.go
@@ -39,9 +39,9 @@ func newConsumerMetrics(reg prometheus.Registerer) *consumerMetrics {
 	}
 }

-func NewKafkaConsumerFactory(pusher logproto.PusherServer, logger log.Logger, reg prometheus.Registerer) partition.ConsumerFactory {
+func NewKafkaConsumerFactory(pusher logproto.PusherServer, reg prometheus.Registerer) partition.ConsumerFactory {
 	metrics := newConsumerMetrics(reg)
-	return func(committer partition.Committer) (partition.Consumer, error) {
+	return func(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
 		decoder, err := kafka.NewDecoder()
 		if err != nil {
 			return nil, err
2 changes: 1 addition & 1 deletion pkg/ingester/kafka_consumer_test.go
@@ -85,7 +85,7 @@ func TestConsumer(t *testing.T) {
 		pusher = &fakePusher{t: t}
 	)

-	consumer, err := NewKafkaConsumerFactory(pusher, log.NewLogfmtLogger(os.Stdout), prometheus.NewRegistry())(&noopCommitter{})
+	consumer, err := NewKafkaConsumerFactory(pusher, prometheus.NewRegistry())(&noopCommitter{}, log.NewLogfmtLogger(os.Stdout))
 	require.NoError(t, err)

 	records, err := kafka.Encode(0, tenantID, streamBar, 10000)
28 changes: 15 additions & 13 deletions pkg/kafka/partition/reader_service.go
@@ -24,7 +24,7 @@ const (
 	phaseRunning = "running"
 )

-type ConsumerFactory func(committer Committer) (Consumer, error)
+type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)

 type Consumer interface {
 	Start(ctx context.Context, recordsChan <-chan []Record) func()
@@ -126,27 +126,29 @@ func (s *ReaderService) starting(ctx context.Context) error {
 	s.metrics.reportOwnerOfPartition(s.reader.Partition())
 	s.metrics.reportStarting()

+	logger := log.With(s.logger, "phase", phaseStarting)
+
 	// Fetch the last committed offset to determine where to start reading
 	lastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
 	if err != nil {
 		return fmt.Errorf("fetching last committed offset: %w", err)
 	}

 	if lastCommittedOffset == int64(KafkaEndOffset) {
-		level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
+		level.Warn(logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
 	} else {
-		level.Debug(s.logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
+		level.Debug(logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
 	}

 	consumeOffset := int64(kafkaStartOffset)
 	if lastCommittedOffset >= 0 {
 		// Read from the next offset.
 		consumeOffset = lastCommittedOffset + 1
 	}
-	level.Debug(s.logger).Log("msg", "consuming from offset", "offset", consumeOffset)
+	level.Debug(logger).Log("msg", "consuming from offset", "offset", consumeOffset)
 	s.reader.SetOffsetForConsumption(consumeOffset)

-	if err = s.processConsumerLagAtStartup(ctx); err != nil {
+	if err = s.processConsumerLagAtStartup(ctx, logger); err != nil {
 		return fmt.Errorf("failed to process consumer lag at startup: %w", err)
 	}

@@ -157,7 +159,7 @@ func (s *ReaderService) running(ctx context.Context) error {
 	level.Info(s.logger).Log("msg", "reader service running")
 	s.metrics.reportRunning()

-	consumer, err := s.consumerFactory(s.committer)
+	consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning))
 	if err != nil {
 		return fmt.Errorf("creating consumer: %w", err)
 	}
@@ -172,13 +174,13 @@ func (s *ReaderService) running(ctx context.Context) error {
 	return nil
 }

-func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
+func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context, logger log.Logger) error {
 	if s.cfg.MaxConsumerLagAtStartup <= 0 {
-		level.Debug(s.logger).Log("msg", "processing consumer lag at startup is disabled")
+		level.Debug(logger).Log("msg", "processing consumer lag at startup is disabled")
 		return nil
 	}

-	consumer, err := s.consumerFactory(s.committer)
+	consumer, err := s.consumerFactory(s.committer, logger)
 	if err != nil {
 		return fmt.Errorf("failed to create consumer: %w", err)
 	}
@@ -192,13 +194,13 @@ func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
 		wait()
 	}()

-	level.Debug(s.logger).Log("msg", "processing consumer lag at startup")
-	_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, s.logger, recordsCh, time.Since)
+	level.Debug(logger).Log("msg", "processing consumer lag at startup")
+	_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, logger, recordsCh, time.Since)
 	if err != nil {
-		level.Error(s.logger).Log("msg", "failed to catch up", "err", err)
+		level.Error(logger).Log("msg", "failed to catch up", "err", err)
 		return err
 	}
-	level.Debug(s.logger).Log("msg", "processing consumer lag at startup finished")
+	level.Debug(logger).Log("msg", "processing consumer lag at startup finished")

 	return nil
 }
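The upshot of the ConsumerFactory change above is that the logger moves from a constructor dependency to a per-call argument, which is what lets starting() and running() each hand the factory a phase-scoped logger. A hedged sketch of a factory under the new shape, with stand-in Record, Committer, and consumer types (only the factory signature itself mirrors the diff):

package sketch

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Stand-ins for the real partition types; illustrative only.
type Record struct{}

type Committer interface{}

type Consumer interface {
	Start(ctx context.Context, recordsChan <-chan []Record) func()
}

// The logger is now supplied per call rather than at construction time.
type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)

type noopConsumer struct{ logger log.Logger }

func (c *noopConsumer) Start(ctx context.Context, recordsChan <-chan []Record) func() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case records, ok := <-recordsChan:
				if !ok {
					return
				}
				// The logger already carries phase=starting or phase=running,
				// so every line here is attributable to a lifecycle phase.
				level.Debug(c.logger).Log("msg", "received records", "count", len(records))
			}
		}
	}()
	// The returned func blocks until the consume loop has exited.
	return func() { <-done }
}

// NewNoopConsumerFactory matches the new ConsumerFactory signature, so the
// caller can scope the logger by phase at each call site.
func NewNoopConsumerFactory() ConsumerFactory {
	return func(_ Committer, logger log.Logger) (Consumer, error) {
		return &noopConsumer{logger: logger}, nil
	}
}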
10 changes: 5 additions & 5 deletions pkg/kafka/partition/reader_test.go
@@ -82,7 +82,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
 	consumer := newMockConsumer()

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -136,7 +136,7 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
 	var consumerStarting *mockConsumer

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		// Return two consumers to ensure we are processing requests during service `start()` and not during `run()`.
 		if consumerStarting == nil {
 			consumerStarting = newMockConsumer()
@@ -198,7 +198,7 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
 	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
 	consumer := newMockConsumer()

-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -267,7 +267,7 @@ func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
 	consumer := newMockConsumer()

 	kaf.CurrentNode()
-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}

@@ -329,7 +329,7 @@ func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
 	consumer := newMockConsumer()

 	kaf.CurrentNode()
-	consumerFactory := func(_ Committer) (Consumer, error) {
+	consumerFactory := func(_ Committer, _ log.Logger) (Consumer, error) {
 		return consumer, nil
 	}