
Commit b2f3d2e

fix: Ensure partition-reader starts up correctly (#14845)
1 parent 4bfa380 commit b2f3d2e

File tree

2 files changed (+125, -2 lines changed)

  pkg/kafka/partition/reader.go (+16, -2)
  pkg/kafka/partition/reader_test.go (+109)

pkg/kafka/partition/reader.go (+16, -2)
@@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error {
 
 	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
 	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
+	if lastCommittedOffset > 0 {
+		lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
+	}
 	p.client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
 		p.kafkaCfg.Topic: {p.partitionID: kgo.NewOffset().At(lastCommittedOffset)},
 	})
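The increment matters because this reader commits the offset of the last record it processed, while kgo.NewOffset().At(o) begins consumption at o itself; without the bump, a restarted reader would consume the committed record a second time. A minimal standalone sketch of the same resume logic, assuming franz-go's kgo package (the package and helper names here are illustrative, not part of the patch):

package partitionsketch

import "github.com/twmb/franz-go/pkg/kgo"

// resumeFrom converts the consumer group's committed offset (the last record
// already processed) into the position consumption should restart from,
// mirroring the guarded increment in the patch above.
func resumeFrom(lastCommitted int64) kgo.Offset {
	if lastCommitted > 0 {
		lastCommitted++ // begin at the record after the one we committed
	}
	return kgo.NewOffset().At(lastCommitted)
}

// addPartition registers a single partition for direct consumption at the
// computed resume position.
func addPartition(c *kgo.Client, topic string, partition int32, lastCommitted int64) {
	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {partition: resumeFrom(lastCommitted)},
	})
}

The > 0 guard leaves the no-previous-commit case untouched: whatever value fetchLastCommittedOffset returns is passed straight through to kgo.NewOffset().At, exactly as before the patch.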
@@ -349,6 +352,8 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 			continue
 		}
 
+		consumerGroupLastCommittedOffset := p.fetchLastCommittedOffset(ctx)
+
 		// Send a direct request to the Kafka backend to fetch the last produced offset.
 		// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
 		// latency.
@@ -371,6 +376,11 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 			return 0, nil
 		}
 
+		if consumerGroupLastCommittedOffset == lastProducedOffset {
+			level.Info(logger).Log("msg", "partition reader found no records to consume because it is already up-to-date", "last_committed_offset", consumerGroupLastCommittedOffset, "last_produced_offset", lastProducedOffset)
+			return 0, nil
+		}
+
 		// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
 		// lag. If we don't have it (lag is zero value), then it will not be logged.
 		level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
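This early return is the heart of the startup fix: when the group's committed offset equals the last produced offset there is nothing left to replay, and entering the consume loop below would mean waiting for records that may never arrive. As a worked example, after producing five records at offsets 0 through 4, the last produced offset is 4; a group that has committed offset 4 is already up to date. A hypothetical helper stating the condition (the name is illustrative):

// alreadyUpToDate reports whether the consumer group has already committed
// the most recently produced record, i.e. there is no startup lag to honor.
func alreadyUpToDate(committedOffset, lastProducedOffset int64) bool {
	return committedOffset == lastProducedOffset
}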
@@ -380,9 +390,13 @@ func (p *Reader) processNextFetchesUntilLagHonored(ctx context.Context, maxLag t
 		if lastProducedOffset <= p.lastProcessedOffset {
 			break
 		}
+		if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
+			level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", p.lastProcessedOffset, "offset_lag", lastProducedOffset-p.lastProcessedOffset)
+		}
 
-		records := p.poll(ctx)
-		recordsChan <- records
+		timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+		recordsChan <- p.poll(timedCtx)
+		cancel()
 	}
 	if boff.Err() != nil {
 		return 0, boff.ErrCause()
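The other half of the fix bounds each poll. The removed lines passed the loop's long-lived ctx to p.poll, so an idle partition could block that call indefinitely and the loop would never re-check its exit conditions; with a 30-second budget per iteration, the loop is guaranteed to wake up and reach the early return above. A minimal sketch of the pattern, assuming a blocking fetch (fetchRecords, Record, and pollOnce are illustrative stand-ins; only the 30-second budget mirrors the patch):

package partitionsketch

import (
	"context"
	"time"
)

type Record struct{ Content []byte }

// fetchRecords stands in for a blocking Kafka fetch: it returns whatever
// arrived before ctx expires, possibly nothing at all.
func fetchRecords(ctx context.Context) []Record {
	<-ctx.Done()
	return nil
}

// pollOnce bounds a single blocking fetch so the caller's loop keeps making
// progress even when the partition is idle.
func pollOnce(ctx context.Context, out chan<- []Record) {
	timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	out <- fetchRecords(timedCtx)
}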

pkg/kafka/partition/reader_test.go (+109)
@@ -14,6 +14,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 
 	"github.com/grafana/loki/v3/pkg/kafka"
@@ -232,3 +233,111 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
 	// We expect to have processed all the records, including initial + one per iteration.
 	assert.Equal(t, iterations+1, recordsCount)
 }
+
+func TestPartitionReader_StartsAtNextOffset(t *testing.T) {
+	kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
+	consumer := newMockConsumer()
+
+	kaf.CurrentNode()
+	consumerFactory := func(_ Committer) (Consumer, error) {
+		return consumer, nil
+	}
+
+	// Produce some records
+	producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+	stream := logproto.Stream{
+		Labels: labels.FromStrings("foo", "bar").String(),
+	}
+	for i := 0; i < 5; i++ {
+		stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
+		records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
+		require.NoError(t, err)
+		require.Len(t, records, 1)
+
+		producer.ProduceSync(context.Background(), records...)
+	}
+
+	// Set our offset part way through the records we just produced
+	offset := int64(1)
+	kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
+	require.NoError(t, err)
+	admClient := kadm.NewClient(kafkaClient)
+	toCommit := kadm.Offsets{}
+	toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
+	resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
+	require.NoError(t, err)
+	require.NoError(t, resp.Error())
+
+	// Start reading
+	partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+	err = services.StartAndAwaitRunning(context.Background(), partitionReader)
+	require.NoError(t, err)
+
+	// Wait for records to be processed
+	require.Eventually(t, func() bool {
+		return len(consumer.recordsChan) == 1 // All pending messages will be received in one batch
+	}, 10*time.Second, 10*time.Millisecond)
+
+	// Check we only received records from the last commit onwards, and the last committed offset is not reprocessed.
+	receivedRecords := <-consumer.recordsChan
+	require.Len(t, receivedRecords, 3) // Offsets are 0 based, so we should read offsets 2,3,4
+	for _, record := range receivedRecords {
+		assert.NotContainsf(t, record.Content, "test-0", "record %q should not contain test-0", record.Content)
+		assert.NotContainsf(t, record.Content, "test-1", "record %q should not contain test-1", record.Content)
+	}
+
+	err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
+	require.NoError(t, err)
+}
+
+func TestPartitionReader_StartsUpIfNoNewRecordsAreAvailable(t *testing.T) {
+	kaf, kafkaCfg := testkafka.CreateCluster(t, 1, "test")
+	consumer := newMockConsumer()
+
+	kaf.CurrentNode()
+	consumerFactory := func(_ Committer) (Consumer, error) {
+		return consumer, nil
+	}
+
+	// Produce some records
+	producer, err := client.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+	stream := logproto.Stream{
+		Labels: labels.FromStrings("foo", "bar").String(),
+	}
+	for i := 0; i < 5; i++ {
+		stream.Entries = []logproto.Entry{{Timestamp: time.Now(), Line: fmt.Sprintf("test-%d", i)}}
+		records, err := kafka.Encode(0, "test-tenant", stream, 10<<20)
+		require.NoError(t, err)
+		require.Len(t, records, 1)
+
+		producer.ProduceSync(context.Background(), records...)
+	}
+
+	// Set our offset to the last record produced
+	offset := int64(4)
+	kafkaClient, err := client.NewReaderClient(kafkaCfg, nil, log.NewNopLogger())
+	require.NoError(t, err)
+	admClient := kadm.NewClient(kafkaClient)
+	toCommit := kadm.Offsets{}
+	toCommit.AddOffset(kafkaCfg.Topic, 0, offset, -1)
+	resp, err := admClient.CommitOffsets(context.Background(), "test-consumer-group", toCommit)
+	require.NoError(t, err)
+	require.NoError(t, resp.Error())
+
+	// Start reading
+	partitionReader, err := NewReader(kafkaCfg, 0, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
+	require.NoError(t, err)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	err = services.StartAndAwaitRunning(ctx, partitionReader)
+	require.NoError(t, err)
+
+	// Check we didn't receive any records: This is a sanity check. We shouldn't get this far if we deadlock during startup.
+	require.Len(t, consumer.recordsChan, 0)
+
+	err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
+	require.NoError(t, err)
+}
