Skip to content

Commit 161a192

Browse files
fix: Use separate variable to track the consume offset (#15095)
1 parent c27d116 commit 161a192

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

‎pkg/kafka/partition/reader_service.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,16 @@ func (s *ReaderService) starting(ctx context.Context) error {
143143

144144
if lastCommittedOffset == int64(KafkaEndOffset) {
145145
level.Warn(s.logger).Log("msg", fmt.Sprintf("no committed offset found, starting from %d", kafkaStartOffset))
146-
lastCommittedOffset = int64(KafkaStartOffset)
146+
} else {
147+
level.Debug(s.logger).Log("msg", "last committed offset", "offset", lastCommittedOffset)
147148
}
148149

150+
consumeOffset := int64(kafkaStartOffset)
149151
if lastCommittedOffset >= 0 {
150-
lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
152+
// Read from the next offset.
153+
consumeOffset = lastCommittedOffset + 1
151154
}
152-
153-
s.reader.SetOffsetForConsumption(lastCommittedOffset)
155+
s.reader.SetOffsetForConsumption(consumeOffset)
154156

155157
if targetLag, maxLag := s.cfg.TargetConsumerLagAtStartup, s.cfg.MaxConsumerLagAtStartup; targetLag > 0 && maxLag > 0 {
156158
consumer, err := s.consumerFactory(s.committer)

0 commit comments

Comments
 (0)