feat(block-scheduler): adds service and basic planner support for scheduler #15200
Merged
Changes from 6 commits (of 9 total):
- 7af3c28 add scaffolding (ashwanthgoli)
- 58ce162 time span planner (ashwanthgoli)
- a689048 Merge branch 'main' into block-scheduler (ashwanthgoli)
- 96f97e6 tie in planner changes with the new pkg (ashwanthgoli)
- 07e7245 enqueue planned jobs (ashwanthgoli)
- acbe602 lint (ashwanthgoli)
- c9d4403 review suggestions (ashwanthgoli)
- ca094ee fix tests (ashwanthgoli)
- d3a3377 make format (ashwanthgoli)
@@ -0,0 +1,83 @@
```go
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
)

// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
	offsets, err := admClient.FetchOffsets(ctx, group)
	if err != nil {
		if !errors.Is(err, kerr.GroupIDNotFound) {
			return nil, fmt.Errorf("fetch offsets: %w", err)
		}
	}
	if err := offsets.Error(); err != nil {
		return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
	}

	startOffsets, err := admClient.ListStartOffsets(ctx, topic)
	if err != nil {
		return nil, err
	}
	endOffsets, err := admClient.ListEndOffsets(ctx, topic)
	if err != nil {
		return nil, err
	}

	resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) {
		if fallbackOffsetMillis < 0 {
			return nil, fmt.Errorf("cannot resolve fallback offset for value %v", fallbackOffsetMillis)
		}
		return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic)
	})
	// If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points at.
	for topic, pt := range startOffsets.Offsets() {
		for partition, startOffset := range pt {
			if _, ok := offsets.Lookup(topic, partition); ok {
				continue
			}
			fallbackOffsets, err := resolveFallbackOffsets()
			if err != nil {
				return nil, fmt.Errorf("resolve fallback offsets: %w", err)
			}
			o, ok := fallbackOffsets.Lookup(topic, partition)
			if !ok {
				return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic)
			}
			if o.Offset < startOffset.At {
				// Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition).
				// This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream.
				continue
			}
			offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{
				Topic:       o.Topic,
				Partition:   o.Partition,
				At:          o.Offset,
				LeaderEpoch: o.LeaderEpoch,
			}})
		}
	}

	descrGroup := kadm.DescribedGroup{
		// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
		// because we don't use group consumption.
		State: "Empty",
	}
	return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
```
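For orientation, here is a minimal usage sketch of `GetGroupLag`. It is not part of this PR: the broker address, topic, group name, and the helper name `exampleLogGroupLag` are assumptions for illustration only.

```go
// Sketch only: assumes it sits alongside GetGroupLag in the scheduler package.
package scheduler

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// exampleLogGroupLag is a hypothetical helper: it connects to a broker,
// computes the group's lag per partition, and prints it.
func exampleLogGroupLag(ctx context.Context, addr, topic, group string) error {
	client, err := kgo.NewClient(kgo.SeedBrokers(addr))
	if err != nil {
		return err
	}
	defer client.Close()

	// For partitions the group has never committed to, start lag accounting
	// from records produced in the last 24 hours. Passing -1 instead makes
	// GetGroupLag return an error as soon as a fallback is needed.
	fallback := time.Now().Add(-24 * time.Hour).UnixMilli()

	lag, err := GetGroupLag(ctx, kadm.NewClient(client), topic, group, fallback)
	if err != nil {
		return err
	}

	for partition, l := range lag[topic] {
		fmt.Printf("partition=%d committed=%d end=%d lag=%d\n",
			partition, l.Commit.At, l.End.Offset, l.Lag)
	}
	return nil
}
```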
@@ -0,0 +1,169 @@
```go
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

const (
	testTopic = "test"
	testGroup = "testgroup"
)

func TestKafkaGetGroupLag(t *testing.T) {
	ctx, cancel := context.WithCancelCause(context.Background())
	t.Cleanup(func() { cancel(errors.New("test done")) })

	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic)
	kafkaClient := mustKafkaClient(t, addr)
	admClient := kadm.NewClient(kafkaClient)

	const numRecords = 5

	var producedRecords []kgo.Record
	kafkaTime := time.Now().Add(-12 * time.Hour)
	for i := int64(0); i < numRecords; i++ {
		kafkaTime = kafkaTime.Add(time.Minute)

		// Produce and keep records to partition 0.
		res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`))
		rec, err := res.First()
		require.NoError(t, err)
		require.NotNil(t, rec)

		producedRecords = append(producedRecords, *rec)

		// Produce same records to partition 1 (this partition won't have any commits).
		produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`))
	}
	require.Len(t, producedRecords, numRecords)

	// Commit last produced record from partition 0.
	rec := producedRecords[len(producedRecords)-1]
	offsets := make(kadm.Offsets)
	offsets.Add(kadm.Offset{
		Topic:       rec.Topic,
		Partition:   rec.Partition,
		At:          rec.Offset + 1,
		LeaderEpoch: rec.LeaderEpoch,
	})
	err := admClient.CommitAllOffsets(ctx, testGroup, offsets)
	require.NoError(t, err)

	// Truncate partition 1 after second to last record to emulate the retention.
	// Note Kafka sets partition's start offset to the requested offset. Any records within the segment before the requested offset can no longer be read.
	// Note the difference between DeleteRecords and DeleteOffsets in kadm docs.
	deleteRecOffsets := make(kadm.Offsets)
	deleteRecOffsets.Add(kadm.Offset{
		Topic:     testTopic,
		Partition: 1,
		At:        numRecords - 2,
	})
	_, err = admClient.DeleteRecords(ctx, deleteRecOffsets)
	require.NoError(t, err)

	getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 {
		l, ok := lag.Lookup(topic, part)
		require.True(t, ok)
		return l.Lag
	}

	t.Run("fallbackOffset=milliseconds", func(t *testing.T) {
		// get the timestamp of the last produced record
		rec := producedRecords[len(producedRecords)-1]
		fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("fallbackOffset=before-earliest", func(t *testing.T) {
		// get the timestamp of third to last produced record (record before earliest in partition 1)
		rec := producedRecords[len(producedRecords)-3]
		fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("fallbackOffset=0", func(t *testing.T) {
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0)
		require.NoError(t, err)

		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})

	t.Run("fallbackOffset=wrong", func(t *testing.T) {
		_, err := GetGroupLag(ctx, admClient, testTopic, testGroup, -1)
		require.Error(t, err)
	})

	t.Run("group=unknown", func(t *testing.T) {
		groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0)
		require.NoError(t, err)

		// This group doesn't have any commits, so it must calc its lag from the fallback.
		require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0))
		require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
		require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
	})
}

func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
	writeClient, err := kgo.NewClient(
		kgo.SeedBrokers(addrs...),
		kgo.AllowAutoTopicCreation(),
		// We will choose the partition of each record.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	)
	require.NoError(t, err)
	t.Cleanup(writeClient.Close)
	return writeClient
}

func produceRecords(
	ctx context.Context,
	t *testing.T,
	kafkaClient *kgo.Client,
	ts time.Time,
	userID string,
	topic string,
	part int32,
	val []byte,
) kgo.ProduceResults {
	rec := &kgo.Record{
		Timestamp: ts,
		Key:       []byte(userID),
		Value:     val,
		Topic:     topic,
		Partition: part, // samples in this batch are split between N partitions
	}
	produceResult := kafkaClient.ProduceSync(ctx, rec)
	require.NoError(t, produceResult.FirstErr())
	return produceResult
}

func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) {
	offsets := make(kadm.Offsets)
	offsets.Add(offset)
	err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets)
	require.NoError(t, err)
}
```
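The `commitOffset` helper above is not exercised in the hunk shown here. As a rough illustration of how it pairs with `GetGroupLag`, a hypothetical extra test could look like the sketch below; it assumes it lives in the same test file (reusing its imports and helpers), and the group name, record values, and expected lag are illustrative assumptions.

```go
// Sketch only: a hypothetical additional test, not part of this PR.
func TestKafkaGetGroupLagAfterCommit(t *testing.T) {
	ctx, cancel := context.WithCancelCause(context.Background())
	t.Cleanup(func() { cancel(errors.New("test done")) })

	_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, testTopic)
	kafkaClient := mustKafkaClient(t, addr)

	// Produce two records to partition 0, then commit past the first one.
	res := produceRecords(ctx, t, kafkaClient, time.Now(), "1", testTopic, 0, []byte(`a`))
	first, err := res.First()
	require.NoError(t, err)
	produceRecords(ctx, t, kafkaClient, time.Now(), "1", testTopic, 0, []byte(`b`))

	commitOffset(ctx, t, kafkaClient, "another-group", kadm.Offset{
		Topic:     first.Topic,
		Partition: first.Partition,
		At:        first.Offset + 1,
	})

	// Every partition has a commit, so the fallback (-1) is never resolved.
	groupLag, err := GetGroupLag(ctx, kadm.NewClient(kafkaClient), testTopic, "another-group", -1)
	require.NoError(t, err)

	l, ok := groupLag.Lookup(testTopic, 0)
	require.True(t, ok)
	require.EqualValues(t, 1, l.Lag, "one record remains after the committed offset")
}
```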
@@ -0,0 +1,24 @@
```go
package scheduler

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
	lag             *prometheus.GaugeVec
	committedOffset *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
	return &Metrics{
		lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "loki_block_scheduler_group_lag",
			Help: "How far behind the block scheduler consumer group is from the latest offset.",
		}, []string{"partition"}),
		committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "loki_block_scheduler_group_committed_offset",
			Help: "The current offset the block scheduler consumer group is at.",
		}, []string{"partition"}),
	}
}
```
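This hunk only defines the gauges; the code that sets them lives elsewhere in the PR. A minimal sketch of how they could be fed from the offset reader's per-partition lag follows; the method name `publishLag` and its placement are assumptions, not part of the change.

```go
// Sketch only: a hypothetical companion file in the same package.
package scheduler

import (
	"strconv"

	"github.com/twmb/franz-go/pkg/kadm"
)

// publishLag is a hypothetical method name; it updates both gauges from
// kadm's per-partition lag results.
func (m *Metrics) publishLag(lag map[int32]kadm.GroupMemberLag) {
	for partition, l := range lag {
		p := strconv.Itoa(int(partition))
		// Records between the group's committed offset and the partition's end offset.
		m.lag.WithLabelValues(p).Set(float64(l.Lag))
		// Last offset the group committed for this partition.
		m.committedOffset.WithLabelValues(p).Set(float64(l.Commit.At))
	}
}
```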
@@ -0,0 +1,51 @@
```go
package scheduler

import (
	"context"
	"errors"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

type offsetReader struct {
	topic         string
	consumerGroup string
	adminClient   *kadm.Client
}

func NewOffsetReader(topic, consumerGroup string, client *kgo.Client) OffsetReader {
	return &offsetReader{
		topic:         topic,
		consumerGroup: consumerGroup,
		adminClient:   kadm.NewClient(client),
	}
}

func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) {
	lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, -1)
	if err != nil {
		return nil, err
	}

	offsets, ok := lag[r.topic]
	if !ok {
		return nil, errors.New("no lag found for the topic")
	}

	return offsets, nil
}

func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) {
	offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic)
	if err != nil {
		return nil, err
	}

	resp, ok := offsets[r.topic]
	if !ok {
		return nil, errors.New("no offsets found for the topic")
	}

	return resp, nil
}
```
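The `OffsetReader` interface returned by `NewOffsetReader` is declared elsewhere in the PR; from the two methods implemented above, its shape is presumably close to the sketch below, which is an inference rather than the committed definition. Note that `GroupLag` passes `-1` as the fallback, so it returns an error if the group has any partition without a committed offset. The topic and group names in the usage helper are illustrative assumptions.

```go
// Sketch only: inferred interface shape plus a hypothetical caller, not part of this PR.
package scheduler

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// OffsetReader as inferred from the offsetReader methods above; the actual
// declaration in the PR may differ.
type OffsetReader interface {
	GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error)
	ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error)
}

// exampleReadLag builds a reader for an assumed topic and consumer group and
// fetches the per-partition lag.
func exampleReadLag(ctx context.Context, client *kgo.Client) (map[int32]kadm.GroupMemberLag, error) {
	reader := NewOffsetReader("loki.push", "block-scheduler", client)
	return reader.GroupLag(ctx)
}
```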