Skip to content

Commit 12386a2

Browse files
feat: remove target lag and keep just maximum lag (#15120)
1 parent 65eda52 commit 12386a2

File tree

4 files changed

+67
-135
lines changed

4 files changed

+67
-135
lines changed

‎docs/sources/shared/configuration.md‎

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -839,17 +839,10 @@ kafka_config:
839839
# CLI flag: -kafka.producer-max-buffered-bytes
840840
[producer_max_buffered_bytes: <int> | default = 1073741824]
841841

842-
# The best-effort maximum lag a consumer tries to achieve at startup. Set both
843-
# -kafka.target-consumer-lag-at-startup and -kafka.max-consumer-lag-at-startup
844-
# to 0 to disable waiting for maximum consumer lag being honored at startup.
845-
# CLI flag: -kafka.target-consumer-lag-at-startup
846-
[target_consumer_lag_at_startup: <duration> | default = 2s]
847-
848842
# The guaranteed maximum lag before a consumer is considered to have caught up
849843
# reading from a partition at startup, becomes ACTIVE in the hash ring and
850-
# passes the readiness check. Set both -kafka.target-consumer-lag-at-startup
851-
# and -kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum
852-
# consumer lag being honored at startup.
844+
# passes the readiness check. Set -kafka.max-consumer-lag-at-startup to 0 to
845+
# disable waiting for maximum consumer lag being honored at startup.
853846
# CLI flag: -kafka.max-consumer-lag-at-startup
854847
[max_consumer_lag_at_startup: <duration> | default = 15s]
855848

‎pkg/kafka/config.go‎

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ const (
3131
var (
3232
ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured")
3333
ErrMissingKafkaTopic = errors.New("the Kafka topic has not been configured")
34-
ErrInconsistentConsumerLagAtStartup = errors.New("the target and max consumer lag at startup must be either both set to 0 or to a value greater than 0")
35-
ErrInvalidMaxConsumerLagAtStartup = errors.New("the configured max consumer lag at startup must greater or equal than the configured target consumer lag")
3634
ErrInconsistentSASLUsernameAndPassword = errors.New("both sasl username and password must be set")
3735
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
3836
)
@@ -59,8 +57,7 @@ type Config struct {
5957
ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
6058
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`
6159

62-
TargetConsumerLagAtStartup time.Duration `yaml:"target_consumer_lag_at_startup"`
63-
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
60+
MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
6461
}
6562

6663
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -88,8 +85,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
8885
f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
8986
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")
9087

91-
consumerLagUsage := fmt.Sprintf("Set both -%s and -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".target-consumer-lag-at-startup", prefix+".max-consumer-lag-at-startup")
92-
f.DurationVar(&cfg.TargetConsumerLagAtStartup, prefix+".target-consumer-lag-at-startup", 2*time.Second, "The best-effort maximum lag a consumer tries to achieve at startup. "+consumerLagUsage)
88+
consumerLagUsage := fmt.Sprintf("Set -%s to 0 to disable waiting for maximum consumer lag being honored at startup.", prefix+".max-consumer-lag-at-startup")
9389
f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. "+consumerLagUsage)
9490
}
9591

@@ -103,13 +99,6 @@ func (cfg *Config) Validate() error {
10399
if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit {
104100
return ErrInvalidProducerMaxRecordSizeBytes
105101
}
106-
if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) {
107-
return ErrInconsistentConsumerLagAtStartup
108-
}
109-
if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup {
110-
return ErrInvalidMaxConsumerLagAtStartup
111-
}
112-
113102
if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") {
114103
return ErrInconsistentSASLUsernameAndPassword
115104
}

‎pkg/kafka/partition/reader_service.go‎

Lines changed: 62 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@ import (
1010
"github.com/go-kit/log/level"
1111
"github.com/grafana/dskit/backoff"
1212
"github.com/grafana/dskit/services"
13-
"github.com/pkg/errors"
1413
"github.com/prometheus/client_golang/prometheus"
1514
"github.com/prometheus/client_golang/prometheus/promauto"
1615

1716
"github.com/grafana/loki/v3/pkg/kafka"
1817
)
1918

20-
var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")
21-
2219
const (
2320
kafkaStartOffset = -2
2421
kafkaEndOffset = -1
@@ -65,7 +62,6 @@ type ReaderService struct {
6562
}
6663

6764
type ReaderConfig struct {
68-
TargetConsumerLagAtStartup time.Duration
6965
MaxConsumerLagAtStartup time.Duration
7066
ConsumerGroupOffsetCommitFreq time.Duration
7167
}
@@ -92,7 +88,6 @@ func NewReaderService(
9288
}
9389
return newReaderService(
9490
ReaderConfig{
95-
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup,
9691
MaxConsumerLagAtStartup: kafkaCfg.MaxConsumerLagAtStartup,
9792
ConsumerGroupOffsetCommitFreq: kafkaCfg.ConsumerGroupOffsetCommitInterval,
9893
},
@@ -151,33 +146,8 @@ func (s *ReaderService) starting(ctx context.Context) error {
151146
level.Debug(s.logger).Log("msg", "consuming from offset", "offset", consumeOffset)
152147
s.reader.SetOffsetForConsumption(consumeOffset)
153148

154-
return s.processConsumerLag(ctx)
155-
}
156-
157-
func (s *ReaderService) processConsumerLag(ctx context.Context) error {
158-
targetLag := s.cfg.TargetConsumerLagAtStartup
159-
maxLag := s.cfg.MaxConsumerLagAtStartup
160-
161-
if targetLag > 0 && maxLag > 0 {
162-
consumer, err := s.consumerFactory(s.committer)
163-
if err != nil {
164-
return fmt.Errorf("failed to create consumer: %w", err)
165-
}
166-
167-
cancelCtx, cancel := context.WithCancel(ctx)
168-
recordsChan := make(chan []Record)
169-
wait := consumer.Start(cancelCtx, recordsChan)
170-
defer func() {
171-
close(recordsChan)
172-
cancel()
173-
wait()
174-
}()
175-
176-
err = s.processNextFetchesUntilTargetOrMaxLagHonored(ctx, maxLag, targetLag, recordsChan)
177-
if err != nil {
178-
level.Error(s.logger).Log("msg", "failed to catch up to max lag", "err", err)
179-
return err
180-
}
149+
if err = s.processConsumerLagAtStartup(ctx); err != nil {
150+
return fmt.Errorf("failed to process consumer lag at startup: %w", err)
181151
}
182152

183153
return nil
@@ -202,89 +172,65 @@ func (s *ReaderService) running(ctx context.Context) error {
202172
return nil
203173
}
204174

205-
// processNextFetchesUntilTargetOrMaxLagHonored process records from Kafka until at least the maxLag is honored.
206-
// This function does a best-effort to get lag below targetLag, but it's not guaranteed that it will be
207-
// reached once this function successfully returns (only maxLag is guaranteed).
208-
func (s *ReaderService) processNextFetchesUntilTargetOrMaxLagHonored(ctx context.Context, targetLag, maxLag time.Duration, recordsChan chan<- []Record) error {
209-
logger := log.With(s.logger, "target_lag", targetLag, "max_lag", maxLag)
210-
level.Info(logger).Log("msg", "partition reader is starting to consume partition until target and max consumer lag is honored")
211-
212-
attempts := []func() (time.Duration, error){
213-
// First process fetches until at least the max lag is honored.
214-
func() (time.Duration, error) {
215-
return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
216-
},
217-
218-
// If the target lag hasn't been reached with the first attempt (which stops once at least the max lag
219-
// is honored) then we try to reach the (lower) target lag within a fixed time (best-effort).
220-
// The timeout is equal to the max lag. This is done because we expect at least a 2x replay speed
221-
// from Kafka (which means at most it takes 1s to ingest 2s of data): assuming new data is continuously
222-
// written to the partition, we give the reader maxLag time to replay the backlog + ingest the new data
223-
// written in the meanwhile.
224-
func() (time.Duration, error) {
225-
timedCtx, cancel := context.WithTimeoutCause(ctx, maxLag, errWaitTargetLagDeadlineExceeded)
226-
defer cancel()
227-
228-
return s.processNextFetchesUntilLagHonored(timedCtx, targetLag, logger, recordsChan, time.Since)
229-
},
175+
func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
176+
if s.cfg.MaxConsumerLagAtStartup <= 0 {
177+
level.Debug(s.logger).Log("msg", "processing consumer lag at startup is disabled")
178+
return nil
179+
}
230180

231-
// If the target lag hasn't been reached with the previous attempt then we'll move on. However,
232-
// we still need to guarantee that in the meanwhile the lag didn't increase and max lag is still honored.
233-
func() (time.Duration, error) {
234-
return s.processNextFetchesUntilLagHonored(ctx, maxLag, logger, recordsChan, time.Since)
235-
},
181+
consumer, err := s.consumerFactory(s.committer)
182+
if err != nil {
183+
return fmt.Errorf("failed to create consumer: %w", err)
236184
}
237185

238-
var currLag time.Duration
239-
for _, attempt := range attempts {
240-
var err error
186+
cancelCtx, cancel := context.WithCancel(ctx)
187+
recordsCh := make(chan []Record)
188+
wait := consumer.Start(cancelCtx, recordsCh)
189+
defer func() {
190+
close(recordsCh)
191+
cancel()
192+
wait()
193+
}()
241194

242-
currLag, err = attempt()
243-
if errors.Is(err, errWaitTargetLagDeadlineExceeded) {
244-
continue
245-
}
246-
if err != nil {
247-
return err
248-
}
249-
if currLag <= targetLag {
250-
level.Info(logger).Log(
251-
"msg", "partition reader consumed partition and current lag is lower or equal to configured target consumer lag",
252-
"last_consumed_offset", s.committer.lastCommittedOffset,
253-
"current_lag", currLag,
254-
)
255-
return nil
256-
}
195+
level.Debug(s.logger).Log("msg", "processing consumer lag at startup")
196+
_, err = s.fetchUntilLagSatisfied(ctx, s.cfg.MaxConsumerLagAtStartup, s.logger, recordsCh, time.Since)
197+
if err != nil {
198+
level.Error(s.logger).Log("msg", "failed to catch up", "err", err)
199+
return err
257200
}
201+
level.Debug(s.logger).Log("msg", "processing consumer lag at startup finished")
258202

259-
level.Warn(logger).Log(
260-
"msg", "partition reader consumed partition and current lag is lower than configured max consumer lag but higher than target consumer lag",
261-
"last_consumed_offset", s.committer.lastCommittedOffset,
262-
"current_lag", currLag,
263-
)
264203
return nil
265204
}
266205

267-
func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, maxLag time.Duration, logger log.Logger, recordsChan chan<- []Record, timeSince func(time.Time) time.Duration) (time.Duration, error) {
268-
boff := backoff.New(ctx, backoff.Config{
206+
func (s *ReaderService) fetchUntilLagSatisfied(
207+
ctx context.Context,
208+
targetLag time.Duration,
209+
logger log.Logger,
210+
recordsCh chan<- []Record,
211+
timeSince func(time.Time) time.Duration,
212+
) (time.Duration, error) {
213+
b := backoff.New(ctx, backoff.Config{
269214
MinBackoff: 100 * time.Millisecond,
270215
MaxBackoff: time.Second,
271-
MaxRetries: 0, // Retry forever (unless context is canceled / deadline exceeded).
216+
// Retry forever (unless context is canceled / deadline exceeded).
217+
MaxRetries: 0,
272218
})
273-
currLag := time.Duration(0)
219+
currentLag := time.Duration(0)
274220

275-
for boff.Ongoing() {
221+
for b.Ongoing() {
276222
// Send a direct request to the Kafka backend to fetch the partition start offset.
277223
partitionStartOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaStartOffset)
278224
if err != nil {
279225
level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
280-
boff.Wait()
226+
b.Wait()
281227
continue
282228
}
283229

284230
consumerGroupLastCommittedOffset, err := s.reader.FetchLastCommittedOffset(ctx)
285231
if err != nil {
286232
level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err)
287-
boff.Wait()
233+
b.Wait()
288234
continue
289235
}
290236

@@ -295,17 +241,22 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
295241
lastProducedOffset, err := s.reader.FetchPartitionOffset(ctx, kafkaEndOffset)
296242
if err != nil {
297243
level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
298-
boff.Wait()
244+
b.Wait()
299245
continue
300246
}
301-
lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.
247+
248+
// Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.
249+
lastProducedOffset = lastProducedOffset - 1
302250

303251
level.Debug(logger).Log(
304-
"msg", "fetched latest offset information",
305-
"partition_start_offset", partitionStartOffset,
306-
"last_produced_offset", lastProducedOffset,
307-
"last_committed_offset", consumerGroupLastCommittedOffset,
308-
)
252+
"msg",
253+
"fetched latest offset information",
254+
"partition_start_offset",
255+
partitionStartOffset,
256+
"last_produced_offset",
257+
lastProducedOffset,
258+
"last_committed_offset",
259+
consumerGroupLastCommittedOffset)
309260

310261
// Ensure there are some records to consume. For example, if the partition has been inactive for a long
311262
// time and all its records have been deleted, the partition start offset may be > 0 but there are no
@@ -322,15 +273,15 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
322273

323274
// This message is NOT expected to be logged with a very high rate. In this log we display the last measured
324275
// lag. If we don't have it (lag is zero value), then it will not be logged.
325-
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
276+
level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is consuming records to honor target and max consumer lag", "partition_start_offset", partitionStartOffset, "last_produced_offset", lastProducedOffset, "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
326277

327-
for boff.Ongoing() {
278+
for b.Ongoing() {
328279
// Continue reading until we reached the desired offset.
329280
if lastProducedOffset <= s.lastProcessedOffset {
330281
break
331282
}
332283
if time.Since(lastProducedOffsetRequestedAt) > time.Minute {
333-
level.Info(loggerWithCurrentLagIfSet(logger, currLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
284+
level.Info(loggerWithCurrentLagIfSet(logger, currentLag)).Log("msg", "partition reader is still consuming records...", "last_processed_offset", s.lastProcessedOffset, "offset_lag", lastProducedOffset-s.lastProcessedOffset)
334285
}
335286

336287
timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
@@ -342,23 +293,23 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
342293
continue
343294
}
344295
if len(records) > 0 {
345-
recordsChan <- records
296+
recordsCh <- records
346297
s.lastProcessedOffset = records[len(records)-1].Offset
347298
}
348299
}
349300

350-
if boff.Err() != nil {
351-
return 0, boff.ErrCause()
301+
if b.Err() != nil {
302+
return 0, b.ErrCause()
352303
}
353304

354305
// If it took less than the max desired lag to replay the partition
355306
// then we can stop here, otherwise we'll have to redo it.
356-
if currLag = timeSince(lastProducedOffsetRequestedAt); currLag <= maxLag {
357-
return currLag, nil
307+
if currentLag = timeSince(lastProducedOffsetRequestedAt); currentLag <= targetLag {
308+
return currentLag, nil
358309
}
359310
}
360311

361-
return 0, boff.ErrCause()
312+
return 0, b.ErrCause()
362313
}
363314

364315
func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
@@ -399,10 +350,10 @@ func (s *serviceMetrics) reportRunning() {
399350
s.phase.WithLabelValues(phaseRunning).Set(1)
400351
}
401352

402-
func loggerWithCurrentLagIfSet(logger log.Logger, currLag time.Duration) log.Logger {
403-
if currLag <= 0 {
353+
func loggerWithCurrentLagIfSet(logger log.Logger, currentLag time.Duration) log.Logger {
354+
if currentLag <= 0 {
404355
return logger
405356
}
406357

407-
return log.With(logger, "current_lag", currLag)
358+
return log.With(logger, "current_lag", currentLag)
408359
}

‎pkg/kafka/partition/reader_test.go‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
168168
producer.ProduceSync(context.Background(), records...)
169169

170170
// Enable the catch up logic so starting the reader will read any existing records.
171-
kafkaCfg.TargetConsumerLagAtStartup = time.Second * 1
172171
kafkaCfg.MaxConsumerLagAtStartup = time.Second * 2
173172

174173
err = services.StartAndAwaitRunning(context.Background(), partitionReader)
@@ -246,7 +245,7 @@ func TestPartitionReader_ProcessCommits(t *testing.T) {
246245
return targetLag - 1
247246
}
248247

249-
_, err = readerSvc.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
248+
_, err = readerSvc.fetchUntilLagSatisfied(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
250249
assert.NoError(t, err)
251250

252251
// Wait to process all the records

0 commit comments

Comments
 (0)