Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: Ensure partition-reader starts up correctly
  • Loading branch information
benclive committed Nov 8, 2024
commit 6fbf23df41329a7c3eebc504444c27b5e65a7aa7
18 changes: 16 additions & 2 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@

// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
if lastCommittedOffset > 0 {
lastCommittedOffset += 1 // We want to begin to read from the next offset, but only if we've previously committed an offset.

Check warning on line 107 in pkg/kafka/partition/reader.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

increment-decrement: should replace lastCommittedOffset += 1 with lastCommittedOffset++ (revive)
}
p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
})
Expand Down Expand Up @@ -349,6 +352,8 @@
continue
}

consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx)

// Send a direct request to the Kafka backend to fetch the last produced offset.
// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
// latency.
Expand All @@ -371,6 +376,11 @@
return 0, nil
}

if consumerGroupLastCommittedOffset == lastProducedOffset {
level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset)
return 0, nil
}

// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
// lag. If we don't have it (lag is zero value), then it will not be logged.
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
Expand All @@ -380,9 +390,13 @@
if lastProducedOffset <= p.lastProcessedOffset {
break
}
if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
}

records := p.poll(ctx)
recordsChan <- records
timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
recordsChan <- p.poll(timedCtx)
cancel()
}
if boff.Err() != nil {
return 0, boff.ErrCause()
Expand Down
109 changes: 109 additions & 0 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka"
Expand Down Expand Up @@ -232,3 +233,111 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
// We expect to have processed all the records, including initial + one per iteration.
assert.Equal(t, iterations+1, recordsCount)
}

func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset part way through the records we just produced
offset := int64(1)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), partitionReader)
require.NoError(t, err)

// Wait for records to be processed
require.Eventually(t, func() bool {
return len(consumer.recordsChan) == 1 // All pending messages will be received in one batch
}, 10*time.Second, 10*time.Millisecond)

// Check we only received records from the last commit onwards, and the last committed offset is not reprocessed.
receivedRecords := <-consumer.recordsChan
require.Len(t, receivedRecords, 3) // Offsets are 0 based, so we should read offsets 2,3,4
for _, record := range receivedRecords {
assert.NotContainsf(t, record.Content, "test-0", "record %q should not contain test-0", record.Content)
assert.NotContainsf(t, record.Content, "test-1", "record %q should not contain test-1", record.Content)
}

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset to the last record produced
offset := int64(4)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = services.StartAndAwaitRunning(ctx, partitionReader)
require.NoError(t, err)

// Check we didn't receive any records: This is a sanity check. We shouldn't get this far if we deadlock during startup.
require.Len(t, consumer.recordsChan, 0)

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}
Loading