18 changes: 16 additions & 2 deletions pkg/kafka/partition/reader.go
@@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error {

// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
if lastCommittedOffset > 0 {
lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
Contributor: How does this compare with the comment on Line 379:

    lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.

Contributor: If Kafka returns the next empty offset, why do we need to add 1?

Contributor (Author): That's the last produced offset, vs last committed offset. Apparently they behave slightly differently!
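To make the distinction the thread is pointing at concrete: in this reader's commit scheme the committed offset records the last record the group finished processing, while the end offset the broker reports is the next offset that will be written. Below is a minimal sketch of the resulting arithmetic using the franz-go admin client; the function and variable names are illustrative, and per-partition error fields are not checked.

```go
// Sketch only: shows why the two offsets are adjusted in opposite directions.
// Assumes an already-configured *kgo.Client.
package offsetsketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func startAndEndOffsets(ctx context.Context, kafkaClient *kgo.Client, group, topic string, partition int32) (startAt, lastProduced int64, err error) {
	adm := kadm.NewClient(kafkaClient)

	// In this reader's commit scheme, the committed offset is the last record the
	// group finished processing, so consumption should resume one past it.
	committed, err := adm.FetchOffsetsForTopics(ctx, group, topic)
	if err != nil {
		return 0, 0, err
	}
	if o, ok := committed.Lookup(topic, partition); ok && o.At > 0 {
		startAt = o.At + 1
	}

	// The listed end offset is the offset the next record will be written to
	// ("the next empty offset"), so the newest record actually on the partition
	// is one before it.
	ends, err := adm.ListEndOffsets(ctx, topic)
	if err != nil {
		return 0, 0, err
	}
	if o, ok := ends.Lookup(topic, partition); ok {
		lastProduced = o.Offset - 1
	}
	return startAt, lastProduced, nil
}
```

Hence the +1 when resuming from a commit and the -1 when deriving the newest written record from the broker's end offset: those are the two adjustments the reviewers were comparing.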

}
p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
})
@@ -349,6 +352,8 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
continue
}

consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx)

// Send a direct request to the Kafka backend to fetch the last produced offset.
// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
// latency.
@@ -371,6 +376,11 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
return 0, nil
}

if consumerGroupLastCommittedOffset == lastProducedOffset {
level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset)
return 0, nil
}

// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
// lag. If we don't have it (lag is zero value), then it will not be logged.
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
@@ -380,9 +390,13 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
if lastProducedOffset <= p.lastProcessedOffset {
break
}
if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
}

- records := p.poll(ctx)
- recordsChan <- records
+ timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ recordsChan <- p.poll(timedCtx)
+ cancel()
}
if boff.Err() != nil {
return 0, boff.ErrCause()
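Taken together, the reader.go changes above let startup finish even when the partition is idle: the reader returns early if the group's committed offset already matches the last produced offset, and each poll is bounded by a 30-second context so an empty fetch cannot block the loop. A condensed, hypothetical sketch of that flow follows; the interface and names are illustrative, not the actual Loki types.

```go
// Hypothetical, condensed view of the startup flow implemented in the diff above.
package readersketch

import (
	"context"
	"time"
)

type partitionReader interface {
	fetchLastCommittedOffset(ctx context.Context) int64            // last offset this group processed
	fetchLastProducedOffset(ctx context.Context) (int64, error)    // newest offset written to the partition
	pollAndProcess(ctx context.Context) (lastProcessedOffset int64)
}

// catchUpUntilLagHonored returns early if the group is already up to date and
// bounds every poll so an idle partition cannot stall startup.
func catchUpUntilLagHonored(ctx context.Context, r partitionReader) error {
	for ctx.Err() == nil {
		committed := r.fetchLastCommittedOffset(ctx)
		produced, err := r.fetchLastProducedOffset(ctx)
		if err != nil {
			return err
		}
		if committed == produced {
			return nil // nothing new to replay; skip the poll entirely
		}

		pollCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
		lastProcessed := r.pollAndProcess(pollCtx)
		cancel()

		if lastProcessed >= produced {
			return nil // lag honored
		}
	}
	return ctx.Err()
}
```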
109 changes: 109 additions & 0 deletions pkg/kafka/partition/reader_test.go
@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka"
@@ -232,3 +233,111 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
// We expect to have processed all the records, including initial + one per iteration.
assert.Equal(t, iterations+1, recordsCount)
}

func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset part way through the records we just produced
offset := int64(1)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), partitionReader)
require.NoError(t, err)

// Wait for records to be processed
require.Eventually(t, func() bool {
return len(consumer.recordsChan) == 1 // All pending messages will be received in one batch
}, 10*time.Second, 10*time.Millisecond)

// Check we only received records from after the last commit, and that the last committed offset itself is not reprocessed.
receivedRecords := <-consumer.recordsChan
require.Len(t, receivedRecords, 3) // Offsets are 0 based, so we should read offsets 2,3,4
for _, record := range receivedRecords {
assert.NotContainsf(t, record.Content, "test-0", "record %q should not contain test-0", record.Content)
assert.NotContainsf(t, record.Content, "test-1", "record %q should not contain test-1", record.Content)
}

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}

func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
consumer := newMockConsumer()

kaf.CurrentNode()
consumerFactory := func(_ Committer) (Consumer, error) {
return consumer, nil
}

// Produce some records
producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
stream := logproto.Stream{
Labels: labels.FromStrings("foo", "bar").String(),
}
for i := 0; i < 5; i++ {
stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

producer.ProduceSync(context.Background(), records...)
}

// Set our offset to the last record produced
offset := int64(4)
kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
require.NoError(t, err)
admClient := kadm.NewClient(kafkaClient)
toCommit := kadm.Offsets{}
toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
require.NoError(t, err)
require.NoError(t, resp.Error())

// Start reading
partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = services.StartAndAwaitRunning(ctx, partitionReader)
require.NoError(t, err)

// Check we didn't receive any records: This is a sanity check. We shouldn't get this far if we deadlock during startup.
require.Len(t, consumer.recordsChan, 0)

err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
require.NoError(t, err)
}