
Commit 09d18d8

record count strategy default
1 parent 4218c7c commit 09d18d8

4 files changed: +230 -159 lines changed


pkg/blockbuilder/scheduler/scheduler.go

Lines changed: 29 additions & 7 deletions

@@ -4,7 +4,9 @@ import (
 	"context"
 	"errors"
 	"flag"
+	"fmt"
 	"strconv"
+	"strings"
 	"time"

 	"github.com/go-kit/log"
@@ -22,17 +24,28 @@ var (
 )

 type Config struct {
-	ConsumerGroup                 string        `yaml:"consumer_group"`
-	Interval                      time.Duration `yaml:"interval"`
-	TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"`
-	LookbackPeriod                int64         `yaml:"lookback_period"`
+	ConsumerGroup     string        `yaml:"consumer_group"`
+	Interval          time.Duration `yaml:"interval"`
+	LookbackPeriod    int64         `yaml:"lookback_period"`
+	Strategy          string        `yaml:"strategy"`
+	planner           Planner       `yaml:"-"` // validated planner
+	TargetRecordCount int64         `yaml:"target_record_count"`
 }

 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
-	f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.")
 	f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
 	f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. -2 consumes from the start of the partition.")
+	f.StringVar(
+		&cfg.Strategy,
+		prefix+"strategy",
+		RecordCountStrategy,
+		fmt.Sprintf(
+			"Strategy used by the planner to plan jobs. One of %s",
+			strings.Join(validStrategies, ", "),
+		),
+	)
+	f.Int64Var(&cfg.TargetRecordCount, prefix+"target-record-count", 1000, "Target record count used by the planner to plan jobs.")
 }

 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -48,6 +61,16 @@ func (cfg *Config) Validate() error {
 		return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period")
 	}

+	switch cfg.Strategy {
+	case RecordCountStrategy:
+		if cfg.TargetRecordCount <= 0 {
+			return errors.New("target record count must be a non-zero value")
+		}
+		cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount)
+	default:
+		return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
+	}
+
 	return nil
 }

@@ -66,10 +89,9 @@ type BlockScheduler struct {

 // NewScheduler creates a new scheduler instance
 func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler {
-	planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger)
 	s := &BlockScheduler{
 		cfg:          cfg,
-		planner:      planner,
+		planner:      cfg.planner,
 		offsetReader: offsetReader,
 		logger:       logger,
 		metrics:      NewMetrics(r),

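With this change the planner strategy becomes configurable and defaults to the record-count strategy. As a rough sketch (not part of the commit), the example below registers the flags with a hypothetical "block-scheduler." prefix and parses an empty argument list, showing that the defaults resolve to record-count with a target of 1000 records; it assumes it sits in a _test.go file of the scheduler package.

package scheduler

import (
	"flag"
	"fmt"
)

// Illustrative only: the flag prefix used here is an assumption, not taken from the commit.
func ExampleConfig_RegisterFlagsWithPrefix() {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.PanicOnError)
	cfg.RegisterFlagsWithPrefix("block-scheduler.", fs)

	// No command-line overrides, so the registered defaults apply.
	_ = fs.Parse(nil)

	fmt.Println(cfg.Strategy, cfg.TargetRecordCount)
	// Output: record-count 1000
}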
pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 101 additions & 0 deletions

@@ -149,3 +149,104 @@ func TestMultipleBuilders(t *testing.T) {
 		t.Error("builder1 got unexpected second job")
 	}
 }
+
+func TestConfig_Validate(t *testing.T) {
+	tests := []struct {
+		name    string
+		cfg     Config
+		wantErr string
+	}{
+		{
+			name: "valid config with record count strategy",
+			cfg: Config{
+				Interval:          time.Minute,
+				LookbackPeriod:    -1,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: 1000,
+			},
+		},
+		{
+			name: "zero interval",
+			cfg: Config{
+				Interval:          0,
+				LookbackPeriod:    -1,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: 1000,
+			},
+			wantErr: "interval must be a non-zero value",
+		},
+		{
+			name: "negative interval",
+			cfg: Config{
+				Interval:          -time.Minute,
+				LookbackPeriod:    -1,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: 1000,
+			},
+			wantErr: "interval must be a non-zero value",
+		},
+		{
+			name: "invalid lookback period",
+			cfg: Config{
+				Interval:          time.Minute,
+				LookbackPeriod:    -3,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: 1000,
+			},
+			wantErr: "only -1(latest) and -2(earliest) are valid as negative values for lookback_period",
+		},
+		{
+			name: "invalid strategy",
+			cfg: Config{
+				Interval:          time.Minute,
+				LookbackPeriod:    -1,
+				Strategy:          "invalid",
+				TargetRecordCount: 1000,
+			},
+			wantErr: "invalid strategy: invalid",
+		},
+		{
+			name: "zero target record count",
+			cfg: Config{
+				Interval:          time.Minute,
+				LookbackPeriod:    -1,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: 0,
+			},
+			wantErr: "target record count must be a non-zero value",
+		},
+		{
+			name: "negative target record count",
+			cfg: Config{
+				Interval:          time.Minute,
+				LookbackPeriod:    -1,
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: -1000,
+			},
+			wantErr: "target record count must be a non-zero value",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.cfg.Validate()
+			if tt.wantErr != "" {
+				if err == nil {
+					t.Errorf("Validate() error = nil, wantErr %v", tt.wantErr)
+					return
+				}
+				if err.Error() != tt.wantErr {
+					t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
+				}
+				return
+			}
+			if err != nil {
+				t.Errorf("Validate() error = %v, wantErr nil", err)
+			}
+			// Check that planner is set for valid configs
+			if tt.cfg.planner == nil {
+				t.Error("Validate() did not set planner for valid config")
+			}
+		})
+	}
+}

pkg/blockbuilder/scheduler/strategy.go

Lines changed: 26 additions & 86 deletions

@@ -3,7 +3,6 @@ package scheduler
 import (
 	"context"
 	"sort"
-	"time"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -14,7 +13,6 @@ import (

 // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
 type OffsetReader interface {
-	ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error)
 	GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error)
 }

@@ -24,10 +22,13 @@ type Planner interface {
 }

 const (
-	RecordCountStrategy = "record_count"
-	TimeRangeStrategy   = "time_range"
+	RecordCountStrategy = "record-count"
 )

+var validStrategies = []string{
+	RecordCountStrategy,
+}
+
 // tries to consume upto targetRecordCount records per partition
 type RecordCountPlanner struct {
 	targetRecordCount int64
@@ -52,101 +53,40 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
 		return nil, err
 	}

-	var jobs []*JobWithPriority[int]
+	jobs := make([]*JobWithPriority[int], 0, len(offsets))
 	for _, partitionOffset := range offsets {
 		// kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
 		// no additional validation is needed here
 		startOffset := partitionOffset.Commit.At + 1
-		endOffset := min(startOffset+p.targetRecordCount, partitionOffset.End.Offset)
-
-		job := NewJobWithPriority(
-			types.NewJob(int(partitionOffset.Partition), types.Offsets{
-				Min: startOffset,
-				Max: endOffset,
-			}), int(partitionOffset.End.Offset-startOffset),
-		)
-
-		jobs = append(jobs, job)
-	}
-
-	// Sort jobs by partition number to ensure consistent ordering
-	sort.Slice(jobs, func(i, j int) bool {
-		return jobs[i].Job.Partition < jobs[j].Job.Partition
-	})
-
-	return jobs, nil
-}
-
-// Targets consuming records spanning a configured period.
-// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress.
-type TimeRangePlanner struct {
-	offsetReader OffsetReader
-
-	buffer       time.Duration
-	targetPeriod time.Duration
-	now          func() time.Time
-
-	logger log.Logger
-}
-
-func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner {
-	return &TimeRangePlanner{
-		targetPeriod: interval,
-		buffer:       interval,
-		offsetReader: offsetReader,
-		now:          now,
-		logger:       logger,
-	}
-}
-
-func (p *TimeRangePlanner) Name() string {
-	return TimeRangeStrategy
-}
-
-func (p *TimeRangePlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
-	// truncate to the nearest Interval
-	consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod)
-
-	// this will return the latest offset in the partition if no records are produced after this ts.
-	consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli())
-	if err != nil {
-		level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err)
-		return nil, err
-	}
-
-	offsets, err := p.offsetReader.GroupLag(ctx)
-	if err != nil {
-		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
-		return nil, err
-	}
-
-	var jobs []*JobWithPriority[int]
-	for _, partitionOffset := range offsets {
-		startOffset := partitionOffset.Commit.At + 1
-		// TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range
-		// or have the builder consume in chunks and commit the job status back to scheduler.
-		endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset
+		endOffset := partitionOffset.End.Offset

+		// Skip if there's no lag
 		if startOffset >= endOffset {
-			level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition,
-				"commitOffset", partitionOffset.Commit.At,
-				"consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset)
 			continue
 		}

-		job := NewJobWithPriority(
-			types.NewJob(int(partitionOffset.Partition), types.Offsets{
-				Min: startOffset,
-				Max: endOffset,
-			}), int(endOffset-startOffset),
-		)
+		// Create jobs of size targetRecordCount until we reach endOffset
+		for currentStart := startOffset; currentStart < endOffset; {
+			currentEnd := min(currentStart+p.targetRecordCount, endOffset)
+
+			job := NewJobWithPriority(
+				types.NewJob(int(partitionOffset.Partition), types.Offsets{
+					Min: currentStart,
+					Max: currentEnd,
+				}), int(endOffset-currentStart), // priority is remaining records to process
+			)
+			jobs = append(jobs, job)

-		jobs = append(jobs, job)
+			currentStart = currentEnd
+		}
 	}

-	// Sort jobs by partition number to ensure consistent ordering
+	// Sort jobs by partition then priority
 	sort.Slice(jobs, func(i, j int) bool {
-		return jobs[i].Job.Partition < jobs[j].Job.Partition
+		if jobs[i].Job.Partition != jobs[j].Job.Partition {
+			return jobs[i].Job.Partition < jobs[j].Job.Partition
+		}
+		return jobs[i].Priority > jobs[j].Priority
 	})

 	return jobs, nil

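Where the previous RecordCountPlanner emitted at most one job per partition per planning pass (capped at targetRecordCount records), the new loop splits a partition's entire pending lag into consecutive jobs of at most targetRecordCount records, and prioritizes each job by the records remaining from its start offset to the end of the partition. A standalone sketch of that splitting arithmetic, using made-up offsets (committed end at 1000, partition end at 3500, target 1000) rather than real Kafka state:

package main

import "fmt"

// Mirrors the inner loop of RecordCountPlanner.Plan on example numbers only.
// Uses the Go 1.21 built-in min, as the planner itself does.
func main() {
	const targetRecordCount int64 = 1000
	startOffset, endOffset := int64(1000), int64(3500) // 2500 pending records

	for currentStart := startOffset; currentStart < endOffset; {
		currentEnd := min(currentStart+targetRecordCount, endOffset)
		priority := endOffset - currentStart // records still pending from this job's start
		fmt.Printf("job: Min=%d Max=%d priority=%d\n", currentStart, currentEnd, priority)
		currentStart = currentEnd
	}
	// Prints:
	// job: Min=1000 Max=2000 priority=2500
	// job: Min=2000 Max=3000 priority=1500
	// job: Min=3000 Max=3500 priority=500
}

Because the new sort orders jobs within a partition by descending priority, the earliest chunk of a lagging partition (the one with the most remaining records) is handed out first.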