
Commit 13ea254

fix(block-builder): return from Process call early if max offset is reached (#15073)
1 parent 1ea49e3 commit 13ea254

5 files changed: +56 -23 lines

pkg/blockbuilder/controller.go
Lines changed: 23 additions & 5 deletions

@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/dskit/backoff"
@@ -68,11 +70,13 @@ type PartitionJobController struct {
 	part    partition.Reader
 	backoff backoff.Config
 	decoder *kafka.Decoder
+	logger  log.Logger
 }
 
 func NewPartitionJobController(
 	controller partition.Reader,
 	backoff backoff.Config,
+	logger log.Logger,
 ) (*PartitionJobController, error) {
 	decoder, err := kafka.NewDecoder()
 	if err != nil {
@@ -83,6 +87,11 @@ func NewPartitionJobController(
 		part:    controller,
 		backoff: backoff,
 		decoder: decoder,
+		logger: log.With(logger,
+			"component", "job-controller",
+			"topic", controller.Topic(),
+			"partition", controller.Partition(),
+		),
 	}, nil
 }
 
@@ -125,9 +134,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
 		err error
 	)
 
-	for boff.Ongoing() {
+	for lastOffset < offsets.Max && boff.Ongoing() {
 		var records []partition.Record
-		records, err = l.part.Poll(ctx)
+		records, err = l.part.Poll(ctx, int(offsets.Max-lastOffset))
 		if err != nil {
 			boff.Wait()
 			continue
@@ -143,11 +152,11 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
 
 		converted := make([]AppendInput, 0, len(records))
 		for _, record := range records {
-			offset := records[len(records)-1].Offset
-			if offset >= offsets.Max {
+			if record.Offset >= offsets.Max {
+				level.Debug(l.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
 				break
 			}
-			lastOffset = offset
+			lastOffset = record.Offset
 
 			stream, labels, err := l.decoder.Decode(record.Content)
 			if err != nil {
@@ -163,7 +172,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
 				labelsStr: stream.Labels,
 				entries:   stream.Entries,
 			})
+		}
 
+		if len(converted) > 0 {
 			select {
 			case ch <- converted:
 			case <-ctx.Done():
@@ -198,7 +209,14 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)
 	if err != nil {
 		return false, Job{}, err
 	}
+
+	if highestOffset < committedOffset {
+		level.Error(l.logger).Log("msg", "partition highest offset is less than committed offset", "highest", highestOffset, "committed", committedOffset)
+		return false, Job{}, fmt.Errorf("partition highest offset is less than committed offset")
+	}
+
 	if highestOffset == committedOffset {
+		level.Info(l.logger).Log("msg", "no pending records to process")
 		return false, Job{}, nil
 	}
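The heart of this change is in Process: the loop now exits once the job's max offset is reached, and each Poll asks for at most the number of records still outstanding instead of polling until the backoff gives up. Below is a minimal, self-contained sketch of that bounded-polling pattern; record, poller, and consumeRange are toy stand-ins rather than Loki's types, and the real loop additionally applies backoff, decodes records, and forwards them on a channel.

package main

import (
	"context"
	"fmt"
)

type record struct {
	Offset  int64
	Content []byte
}

// poller is a toy stand-in for partition.Reader: it serves consecutive
// offsets and honours the max-records cap passed to Poll.
type poller struct{ next, end int64 }

func (p *poller) Poll(_ context.Context, max int) ([]record, error) {
	var out []record
	for len(out) < max && p.next < p.end {
		out = append(out, record{Offset: p.next, Content: []byte("entry")})
		p.next++
	}
	return out, nil
}

// consumeRange reads the half-open offset range [minOff, maxOff): it exits as
// soon as the last in-range offset has been consumed and never requests more
// records than the job still needs.
func consumeRange(ctx context.Context, p *poller, minOff, maxOff int64) (int64, error) {
	lastOffset := minOff - 1
	for lastOffset+1 < maxOff {
		records, err := p.Poll(ctx, int(maxOff-(lastOffset+1)))
		if err != nil {
			return lastOffset, err
		}
		if len(records) == 0 {
			break // nothing left upstream
		}
		for _, r := range records {
			if r.Offset >= maxOff {
				break // past the job boundary; leave it for the next job
			}
			lastOffset = r.Offset
			// ... decode and forward the record here ...
		}
	}
	return lastOffset, nil
}

func main() {
	p := &poller{next: 100, end: 1_000}
	last, err := consumeRange(context.Background(), p, 100, 110)
	fmt.Println(last, err) // 109 <nil>: the job stops at its max offset
}

With the range 100 to 110, the first Poll asks for at most 10 records and the loop stops right after offset 109 rather than polling the partition indefinitely.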

pkg/blockbuilder/slimgester.go
Lines changed: 26 additions & 13 deletions

@@ -141,7 +141,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 		default:
 			_, err := i.runOne(ctx)
 			if err != nil {
-				return err
+				level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 			}
 		}
 
@@ -157,7 +157,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 				"err", err,
 			)
 			if err != nil {
-				return err
+				level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 			}
 		}
 	}
@@ -213,6 +213,8 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
 			level.Debug(logger).Log(
 				"msg", "finished loading records",
 				"ctx_error", ctx.Err(),
+				"last_offset", lastOffset,
+				"total_records", lastOffset-job.Offsets.Min,
 			)
 			close(inputCh)
 			return nil
@@ -305,6 +307,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
 			func() (res struct{}, err error) {
 				err = i.store.PutOne(ctx, chk.From, chk.Through, *chk)
 				if err != nil {
+					level.Error(logger).Log("msg", "failed to flush chunk", "err", err)
 					i.metrics.chunksFlushFailures.Inc()
 					return
 				}
@@ -320,6 +323,10 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
 					Entries:  uint32(chk.Data.Entries()),
 				}
 				err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta})
+				if err != nil {
+					level.Error(logger).Log("msg", "failed to append chunk to index", "err", err)
+				}
+
 				return
 			},
 		); err != nil {
@@ -346,24 +353,30 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
 
 	built, err := indexer.create(ctx, nodeName, tableRanges)
 	if err != nil {
+		level.Error(logger).Log("msg", "failed to build index", "err", err)
 		return false, err
 	}
 
+	u := newUploader(i.objStore)
 	for _, db := range built {
-		u := newUploader(i.objStore)
-		if err := u.Put(ctx, db); err != nil {
-			level.Error(util_log.Logger).Log(
-				"msg", "failed to upload tsdb",
-				"path", db.id.Path(),
-			)
+		if _, err := withBackoff(ctx, i.cfg.Backoff, func() (res struct{}, err error) {
+			err = u.Put(ctx, db)
+			if err != nil {
+				level.Error(util_log.Logger).Log(
+					"msg", "failed to upload tsdb",
+					"path", db.id.Path(),
+				)
+				return
+			}
 
+			level.Debug(logger).Log(
+				"msg", "uploaded tsdb",
+				"name", db.id.Name(),
+			)
+			return
+		}); err != nil {
 			return false, err
 		}
-
-		level.Debug(logger).Log(
-			"msg", "uploaded tsdb",
-			"name", db.id.Name(),
-		)
 	}
 
 	if lastOffset <= job.Offsets.Min {
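The upload loop now retries u.Put through a withBackoff helper. That helper is not shown in this diff; the sketch below is an assumption of how such a generic retry wrapper over github.com/grafana/dskit/backoff could look, matching the withBackoff(ctx, cfg, fn) call shape in the hunk above; Loki's actual implementation may differ.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// withBackoff retries fn until it succeeds, the context is cancelled, or the
// backoff policy gives up, returning the last result and error.
func withBackoff[T any](ctx context.Context, cfg backoff.Config, fn func() (T, error)) (T, error) {
	var (
		res T
		err error
	)
	boff := backoff.New(ctx, cfg)
	for boff.Ongoing() {
		res, err = fn()
		if err == nil {
			return res, nil
		}
		boff.Wait()
	}
	if err == nil {
		err = boff.Err()
	}
	return res, err
}

func main() {
	cfg := backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 5,
	}
	attempts := 0
	_, err := withBackoff(context.Background(), cfg, func() (struct{}, error) {
		attempts++
		if attempts < 3 {
			return struct{}{}, fmt.Errorf("transient failure %d", attempts)
		}
		return struct{}{}, nil // e.g. a successful u.Put(ctx, db)
	})
	fmt.Println("attempts:", attempts, "err:", err)
}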

pkg/kafka/partition/reader.go
Lines changed: 4 additions & 3 deletions

@@ -43,7 +43,7 @@ type Reader interface {
 	ConsumerGroup() string
 	FetchLastCommittedOffset(ctx context.Context) (int64, error)
 	FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error)
-	Poll(ctx context.Context) ([]Record, error)
+	Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
 	Commit(ctx context.Context, offset int64) error
 	// Set the target offset for consumption. reads will begin from here.
 	SetOffsetForConsumption(offset int64)
@@ -257,9 +257,10 @@ func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOf
 }
 
 // Poll retrieves the next batch of records from Kafka
-func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
+// Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
+func (r *StdReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
 	start := time.Now()
-	fetches := r.client.PollFetches(ctx)
+	fetches := r.client.PollRecords(ctx, maxPollRecords)
 	r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
 
 	// Record metrics
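Poll now delegates to franz-go's PollRecords instead of PollFetches so a single poll can be capped. In kgo, a maxPollRecords value of zero or less means no cap, which is why existing callers can pass -1 and keep their previous behaviour. A rough sketch of the wrapper shape follows; the reader struct, broker address, and topic name are illustrative, not Loki's actual wiring.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

type reader struct {
	client *kgo.Client
}

// poll returns up to maxPollRecords records from the underlying client; a
// value <= 0 leaves the batch uncapped, like the old PollFetches behaviour.
func (r *reader) poll(ctx context.Context, maxPollRecords int) ([]*kgo.Record, error) {
	fetches := r.client.PollRecords(ctx, maxPollRecords)
	if errs := fetches.Errors(); len(errs) > 0 {
		return nil, errs[0].Err
	}
	return fetches.Records(), nil
}

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // illustrative broker address
		kgo.ConsumeTopics("loki-ingest"),  // illustrative topic name
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	r := &reader{client: client}
	records, err := r.poll(context.Background(), 100) // at most 100 records per poll
	fmt.Println(len(records), err)
}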

pkg/kafka/partition/reader_service.go
Lines changed: 2 additions & 2 deletions

@@ -346,7 +346,7 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
 		}
 
 		timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
-		records, err := s.reader.Poll(timedCtx)
+		records, err := s.reader.Poll(timedCtx, -1)
 		cancel()
 
 		if err != nil {
@@ -382,7 +382,7 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
 			case <-ctx.Done():
 				return
 			default:
-				res, err := s.reader.Poll(ctx)
+				res, err := s.reader.Poll(ctx, -1)
 				if err != nil {
 					level.Error(s.logger).Log("msg", "error polling records", "err", err)
 					continue
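Both call sites keep their previous unbounded behaviour by passing -1. For context, here is a minimal sketch of the startFetchLoop-style producer pattern those call sites sit in, with a stand-in Reader interface and a fake reader instead of the Kafka-backed one; error logging and buffering are simplified.

package main

import (
	"context"
	"fmt"
	"time"
)

type Record struct {
	Offset  int64
	Content []byte
}

type Reader interface {
	Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
}

// startFetchLoop polls without a cap (-1 means no per-poll record limit) and
// forwards non-empty batches on a channel until the context is cancelled.
func startFetchLoop(ctx context.Context, r Reader) <-chan []Record {
	out := make(chan []Record)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			default:
				recs, err := r.Poll(ctx, -1)
				if err != nil {
					continue // a real loop would log the error here
				}
				if len(recs) > 0 {
					out <- recs
				}
			}
		}
	}()
	return out
}

// fakeReader emits one record per poll and ignores the cap; demonstration only.
type fakeReader struct{ next int64 }

func (f *fakeReader) Poll(ctx context.Context, _ int) ([]Record, error) {
	time.Sleep(10 * time.Millisecond)
	f.next++
	return []Record{{Offset: f.next, Content: []byte("entry")}}, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	for batch := range startFetchLoop(ctx, &fakeReader{}) {
		fmt.Println("got batch ending at offset", batch[len(batch)-1].Offset)
	}
}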

pkg/loki/modules.go
Lines changed: 1 addition & 0 deletions

@@ -1822,6 +1822,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
 	controller, err := blockbuilder.NewPartitionJobController(
 		reader,
 		t.Cfg.BlockBuilder.Backoff,
+		logger,
 	)
 
 	if err != nil {
