
Commit ad322c0

feat(block-scheduler): adds service and basic planner support for scheduler (#15200)
1 parent f598389 commit ad322c0

File tree: 12 files changed, +834 -20 lines changed

docs/sources/shared/configuration.md

Lines changed: 20 additions & 0 deletions
@@ -188,6 +188,26 @@ block_builder:
     # CLI flag: -blockbuilder.backoff..backoff-retries
     [max_retries: <int> | default = 10]
 
+block_scheduler:
+  # Consumer group used by block scheduler to track the last consumed offset.
+  # CLI flag: -block-scheduler.consumer-group
+  [consumer_group: <string> | default = "block-scheduler"]
+
+  # How often the scheduler should plan jobs.
+  # CLI flag: -block-scheduler.interval
+  [interval: <duration> | default = 5m]
+
+  # Period used by the planner to calculate the start and end offset such that
+  # each job consumes records spanning the target period.
+  # CLI flag: -block-scheduler.target-records-spanning-period
+  [target_records_spanning_period: <duration> | default = 1h]
+
+  # Lookback period in milliseconds used by the scheduler to plan jobs when the
+  # consumer group has no commits. -1 consumes from the latest offset. -2
+  # consumes from the start of the partition.
+  # CLI flag: -block-scheduler.lookback-period
+  [lookback_period: <int> | default = -2]
+
 pattern_ingester:
   # Whether the pattern ingester is enabled.
   # CLI flag: -pattern-ingester.enabled
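For orientation, here is a minimal sketch of how a config block like this is typically wired to CLI flags, using the documented flag names and defaults; the struct fields and the RegisterFlags helper below are illustrative assumptions, not the exact code added in this commit.

package scheduler

import (
	"flag"
	"time"
)

// Config mirrors the documented block_scheduler settings. Field names are
// assumptions for illustration; only the flag names and defaults come from
// the documentation above.
type Config struct {
	ConsumerGroup               string
	Interval                    time.Duration
	TargetRecordsSpanningPeriod time.Duration
	LookbackPeriod              int64
}

// RegisterFlags wires the documented flags to the struct using the standard
// library flag package.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.ConsumerGroup, "block-scheduler.consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
	f.DurationVar(&cfg.Interval, "block-scheduler.interval", 5*time.Minute, "How often the scheduler should plan jobs.")
	f.DurationVar(&cfg.TargetRecordsSpanningPeriod, "block-scheduler.target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset for each job.")
	f.Int64Var(&cfg.LookbackPeriod, "block-scheduler.lookback-period", -2, "Lookback period in milliseconds used when the consumer group has no commits; -1 consumes from the latest offset, -2 from the start of the partition.")
}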
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
)

// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
	offsets, err := admClient.FetchOffsets(ctx, group)
	if err != nil {
		if !errors.Is(err, kerr.GroupIDNotFound) {
			return nil, fmt.Errorf("fetch offsets: %w", err)
		}
	}
	if err := offsets.Error(); err != nil {
		return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
	}

	startOffsets, err := admClient.ListStartOffsets(ctx, topic)
	if err != nil {
		return nil, err
	}
	endOffsets, err := admClient.ListEndOffsets(ctx, topic)
	if err != nil {
		return nil, err
	}

	resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) {
		return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic)
	})
	// If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at.
	for topic, pt := range startOffsets.Offsets() {
		for partition, startOffset := range pt {
			if _, ok := offsets.Lookup(topic, partition); ok {
				continue
			}
			fallbackOffsets, err := resolveFallbackOffsets()
			if err != nil {
				return nil, fmt.Errorf("resolve fallback offsets: %w", err)
			}
			o, ok := fallbackOffsets.Lookup(topic, partition)
			if !ok {
				return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic)
			}
			if o.Offset < startOffset.At {
				// Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition).
				// This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream.
				continue
			}
			offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{
				Topic:       o.Topic,
				Partition:   o.Partition,
				At:          o.Offset,
				LeaderEpoch: o.LeaderEpoch,
			}})
		}
	}

	descrGroup := kadm.DescribedGroup{
		// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
		// because we don't use group consumption.
		State: "Empty",
	}
	return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
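GetGroupLag is the core lag calculation the scheduler builds on. A rough usage sketch follows, assuming a configured franz-go client; the topic and group names are placeholders, not values taken from this commit.

package scheduler

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// printGroupLag dumps per-partition committed offsets and lag for one topic.
func printGroupLag(ctx context.Context, client *kgo.Client) error {
	adm := kadm.NewClient(client)
	// -2 resolves the fallback to the start of each partition when the group has no commits.
	lag, err := GetGroupLag(ctx, adm, "example-topic", "block-scheduler", -2)
	if err != nil {
		return err
	}
	for partition, l := range lag["example-topic"] {
		fmt.Printf("partition %d: committed=%d lag=%d\n", partition, l.Commit.At, l.Lag)
	}
	return nil
}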
Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

const (
	testTopic = "test"
	testGroup = "testgroup"
)

func TestKafkaGetGroupLag(t *testing.T) {
	ctx, cancel := context.WithCancelCause(context.Background())
	t.Cleanup(func() { cancel(errors.New("test done")) })

	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic)
	kafkaClient := mustKafkaClient(t, addr)
	admClient := kadm.NewClient(kafkaClient)

	const numRecords = 5

	var producedRecords []kgo.Record
	kafkaTime := time.Now().Add(-12 * time.Hour)
	for i := int64(0); i < numRecords; i++ {
		kafkaTime = kafkaTime.Add(time.Minute)

		// Produce and keep records to partition 0.
		res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`))
		rec, err := res.First()
		require.NoError(t, err)
		require.NotNil(t, rec)

		producedRecords = append(producedRecords, *rec)

		// Produce same records to partition 1 (this partition won't have any commits).
		produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`))
	}
	require.Len(t, producedRecords, numRecords)

	// Commit last produced record from partition 0.
	rec := producedRecords[len(producedRecords)-1]
	offsets := make(kadm.Offsets)
	offsets.Add(kadm.Offset{
		Topic:       rec.Topic,
		Partition:   rec.Partition,
		At:          rec.Offset + 1,
		LeaderEpoch: rec.LeaderEpoch,
	})
	err := admClient.CommitAllOffsets(ctx, testGroup, offsets)
	require.NoError(t, err)

	// Truncate partition 1 after the second-to-last record to emulate retention.
	// Note Kafka sets the partition's start offset to the requested offset. Any records within the segment before the requested offset can no longer be read.
	// Note the difference between DeleteRecords and DeleteOffsets in kadm docs.
	deleteRecOffsets := make(kadm.Offsets)
	deleteRecOffsets.Add(kadm.Offset{
		Topic:     testTopic,
		Partition: 1,
		At:        numRecords - 2,
	})
	_, err = admClient.DeleteRecords(ctx, deleteRecOffsets)
	require.NoError(t, err)

	getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 {
		l, ok := lag.Lookup(topic, part)
		require.True(t, ok)
		return l.Lag
	}

	t.Run("fallbackOffset=milliseconds", func(t *testing.T) {
		// get the timestamp of the last produced record
		rec := producedRecords[len(producedRecords)-1]
		fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("fallbackOffset=before-earliest", func(t *testing.T) {
		// get the timestamp of the third-to-last produced record (record before earliest in partition 1)
		rec := producedRecords[len(producedRecords)-3]
		fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("fallbackOffset=0", func(t *testing.T) {
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("group=unknown", func(t *testing.T) {
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0)
		require.NoError(t, err)

		// This group doesn't have any commits, so it must calc its lag from the fallback.
		require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0))
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})
}

func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
	writeClient, err := kgo.NewClient(
		kgo.SeedBrokers(addrs...),
		kgo.AllowAutoTopicCreation(),
		// We will choose the partition of each record.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	require.NoError(t, err)
	t.Cleanup(writeClient.Close)
	return writeClient
}

func produceRecords(
	ctx context.Context,
	t *testing.T,
	kafkaClient *kgo.Client,
	ts time.Time,
	userID string,
	topic string,
	part int32,
	val []byte,
) kgo.ProduceResults {
	rec := &kgo.Record{
		Timestamp: ts,
		Key:       []byte(userID),
		Value:     val,
		Topic:     topic,
		Partition: part, // samples in this batch are split between N partitions
	}
	produceResult := kafkaClient.ProduceSync(ctx, rec)
	require.NoError(t, produceResult.FirstErr())
	return produceResult
}

func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) {
	offsets := make(kadm.Offsets)
	offsets.Add(offset)
	err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets)
	require.NoError(t, err)
}

pkg/blockbuilder/scheduler/metrics.go

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
package scheduler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
	lag             *prometheus.GaugeVec
	committedOffset *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
	return &Metrics{
		lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "loki_block_scheduler_group_lag",
			Help: "How far behind the block scheduler consumer group is from the latest offset.",
		}, []string{"partition"}),
		committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "loki_block_scheduler_group_committed_offset",
			Help: "The current offset the block scheduler consumer group is at.",
		}, []string{"partition"}),
	}
}
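A sketch of how these gauges might be fed from the per-partition group lag returned by GetGroupLag; the observeLag helper below is an assumption for illustration, not code from this commit.

package scheduler

import (
	"strconv"

	"github.com/twmb/franz-go/pkg/kadm"
)

// observeLag exports one lag and one committed-offset sample per partition.
func observeLag(m *Metrics, lag map[int32]kadm.GroupMemberLag) {
	for partition, l := range lag {
		p := strconv.Itoa(int(partition))
		m.lag.WithLabelValues(p).Set(float64(l.Lag))
		m.committedOffset.WithLabelValues(p).Set(float64(l.Commit.At))
	}
}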
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
package scheduler

import (
	"context"
	"errors"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

type offsetReader struct {
	topic                string
	consumerGroup        string
	fallbackOffsetMillis int64

	adminClient *kadm.Client
}

func NewOffsetReader(topic, consumerGroup string, lookbackPeriodInMs int64, client *kgo.Client) OffsetReader {
	var fallbackOffsetMillis int64
	if lookbackPeriodInMs >= 0 {
		fallbackOffsetMillis = time.Now().UnixMilli() - lookbackPeriodInMs
	} else {
		fallbackOffsetMillis = lookbackPeriodInMs
	}

	return &offsetReader{
		topic:                topic,
		consumerGroup:        consumerGroup,
		adminClient:          kadm.NewClient(client),
		fallbackOffsetMillis: fallbackOffsetMillis,
	}
}

func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) {
	lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, r.fallbackOffsetMillis)
	if err != nil {
		return nil, err
	}

	offsets, ok := lag[r.topic]
	if !ok {
		return nil, errors.New("no lag found for the topic")
	}

	return offsets, nil
}

func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) {
	offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic)
	if err != nil {
		return nil, err
	}

	resp, ok := offsets[r.topic]
	if !ok {
		return nil, errors.New("no offsets found for the topic")
	}

	return resp, nil
}
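A usage sketch for the reader above, with a placeholder topic name. Note that a non-negative lookback is converted to a wall-clock timestamp, while -1 and -2 pass through as Kafka's "latest" and "earliest" sentinels.

package scheduler

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// exampleOffsetReader shows a plausible call pattern; the topic/group values
// are placeholders, not values taken from this commit.
func exampleOffsetReader(ctx context.Context, client *kgo.Client) {
	// -2: with no committed offsets, lag is measured from the start of each partition.
	reader := NewOffsetReader("example-topic", "block-scheduler", -2, client)

	lag, err := reader.GroupLag(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for partition, l := range lag {
		log.Printf("partition %d lag: %d", partition, l.Lag)
	}
}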

pkg/blockbuilder/scheduler/queue.go

Lines changed: 19 additions & 0 deletions
@@ -30,6 +30,25 @@ func NewJobQueue() *JobQueue {
 	}
 }
 
+func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+
+	if _, ok := q.inProgress[job.ID]; ok {
+		return types.JobStatusInProgress, true
+	}
+
+	if _, ok := q.pending[job.ID]; ok {
+		return types.JobStatusPending, true
+	}
+
+	if _, ok := q.completed[job.ID]; ok {
+		return types.JobStatusComplete, true
+	}
+
+	return -1, false
+}
+
 // Enqueue adds a new job to the pending queue
 // This is a naive implementation, intended to be refactored
 func (q *JobQueue) Enqueue(job *types.Job) error {
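A sketch of how a planner might consult Exists before enqueueing, so a job that is already pending, in progress, or complete is not added twice; the helper below and the types import path are assumptions for illustration, not part of this commit's queue.go.

package scheduler

import (
	"log"

	// Import path assumed for illustration.
	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// enqueueIfNew skips jobs the queue already tracks and enqueues the rest.
func enqueueIfNew(q *JobQueue, job *types.Job) {
	if status, ok := q.Exists(job); ok {
		log.Printf("job %v already tracked with status %v, skipping", job.ID, status)
		return
	}
	if err := q.Enqueue(job); err != nil {
		log.Printf("failed to enqueue job %v: %v", job.ID, err)
	}
}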
