Commit c519ab6

feat(scheduler): implement and register block builder rpc service (#15248)
1 parent: 17f1972

8 files changed: 78 additions, 42 deletions

‎docs/sources/shared/configuration.md

Lines changed: 4 additions & 0 deletions
@@ -192,6 +192,10 @@ block_builder:
   # CLI flag: -blockbuilder.sync-interval
   [sync_interval: <duration> | default = 30s]
 
+  # The interval at which to poll for new jobs.
+  # CLI flag: -blockbuilder.poll-interval
+  [poll_interval: <duration> | default = 30s]
+
   # Address of the scheduler in the format described here:
   # https://github.com/grpc/grpc/blob/master/doc/naming.md
   # CLI flag: -blockbuilder.scheduler-address
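
For context, a minimal sketch of how the new option sits alongside the existing block builder settings in a Loki YAML config. The surrounding values and the scheduler address are illustrative placeholders, not taken from this commit:

block_builder:
  # How often in-flight job status is synced with the scheduler.
  sync_interval: 30s
  # How often an idle worker polls the scheduler for a new job (added by this commit).
  poll_interval: 30s
  # Scheduler endpoint, in gRPC naming syntax (placeholder value).
  scheduler_address: dns:///loki-backend:9095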

‎pkg/blockbuilder/builder/builder.go

Lines changed: 23 additions & 9 deletions
@@ -42,6 +42,7 @@ type Config struct {
 	Backoff           backoff.Config `yaml:"backoff_config"`
 	WorkerParallelism int            `yaml:"worker_parallelism"`
 	SyncInterval      time.Duration  `yaml:"sync_interval"`
+	PollInterval      time.Duration  `yaml:"poll_interval"`
 
 	SchedulerAddress string `yaml:"scheduler_address"`
 	// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
@@ -58,6 +59,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
 	f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
 	f.DurationVar(&cfg.SyncInterval, prefix+"sync-interval", 30*time.Second, "The interval at which to sync job status with the scheduler.")
+	f.DurationVar(&cfg.PollInterval, prefix+"poll-interval", 30*time.Second, "The interval at which to poll for new jobs.")
 	f.IntVar(&cfg.WorkerParallelism, prefix+"worker-parallelism", 1, "The number of workers to run in parallel to process jobs.")
 	f.StringVar(&cfg.SchedulerAddress, prefix+"scheduler-address", "", "Address of the scheduler in the format described here: https://github.com/grpc/grpc/blob/master/doc/naming.md")
 
@@ -81,6 +83,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("sync interval must be greater than 0")
 	}
 
+	if cfg.PollInterval <= 0 {
+		return errors.New("poll interval must be greater than 0")
+	}
+
 	if cfg.WorkerParallelism < 1 {
 		return errors.New("worker parallelism must be greater than 0")
 	}
@@ -165,15 +171,23 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 		go func(id string) {
 			defer wg.Done()
 
+			var waitFor time.Duration
 			for {
 				select {
 				case <-ctx.Done():
 					return
-				default:
-					err := i.runOne(ctx, id)
+				case <-time.After(waitFor):
+					gotJob, err := i.runOne(ctx, id)
 					if err != nil {
 						level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 					}
+
+					// poll only when there are no jobs
+					if gotJob {
+						waitFor = 0
+					} else {
+						waitFor = i.cfg.PollInterval
+					}
 				}
 			}
 		}(fmt.Sprintf("worker-%d", j))
@@ -218,18 +232,18 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
 	return nil
 }
 
-func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
+func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
 	// assuming GetJob blocks/polls until a job is available
 	resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
 		BuilderID: workerID,
 	})
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	if !resp.OK {
 		level.Info(i.logger).Log("msg", "no available job to process")
-		return nil
+		return false, nil
 	}
 
 	job := resp.Job
@@ -262,15 +276,15 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
 			return
 		},
 	); err != nil {
-		return err
+		return true, err
 	}
 
 	i.jobsMtx.Lock()
 	delete(i.inflightJobs, job.ID)
 	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
 	i.jobsMtx.Unlock()
 
-	return err
+	return true, err
 }
 
 func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
@@ -297,7 +311,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 			"load records",
 			1,
 			func(ctx context.Context) error {
-				lastOffset, err = i.loadRecords(ctx, int32(job.Partition), job.Offsets, inputCh)
+				lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
 				return err
 			},
 			func(ctx context.Context) error {
@@ -545,7 +559,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 		}
 	}
 
-	return lastOffset, err
+	return lastOffset, boff.Err()
 }
 
 func withBackoff[T any](
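
The change to running() above replaces the busy default branch with a timer-driven poll: a worker retries immediately after it actually received a job, and backs off by poll_interval when the scheduler had nothing to hand out. A condensed, self-contained sketch of that pattern, with a placeholder runOne and interval rather than the real Loki types:

package main

import (
	"context"
	"fmt"
	"time"
)

// runOne stands in for BlockBuilder.runOne: it reports whether a job was
// actually received, so the caller can decide how long to wait before retrying.
func runOne(ctx context.Context, id string) (bool, error) {
	// ... ask the scheduler for a job and process it ...
	return false, nil // pretend the queue was empty
}

func worker(ctx context.Context, id string, pollInterval time.Duration) {
	var waitFor time.Duration // zero on the first pass and after a successful job
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(waitFor):
			gotJob, err := runOne(ctx, id)
			if err != nil {
				fmt.Println("run failed:", err)
			}
			if gotJob {
				waitFor = 0 // more work may be queued; ask again right away
			} else {
				waitFor = pollInterval // idle; poll at the configured interval
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	worker(ctx, "worker-0", 500*time.Millisecond)
}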

‎pkg/blockbuilder/builder/builder_test.go

Lines changed: 0 additions & 16 deletions
This file was deleted.

‎pkg/blockbuilder/scheduler/scheduler.go

Lines changed: 35 additions & 10 deletions
@@ -16,10 +16,10 @@ import (
 	"github.com/twmb/franz-go/pkg/kadm"
 
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
+	"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
 )
 
 var (
-	_ types.Scheduler = unimplementedScheduler{}
 	_ types.Scheduler = &BlockScheduler{}
 )
 
@@ -185,17 +185,42 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
 	return nil
 }
 
-// unimplementedScheduler provides default implementations that panic.
-type unimplementedScheduler struct{}
-
-func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
-	panic("unimplemented")
+func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
+	s.queue.MarkComplete(req.Job.Id)
+	return &proto.CompleteJobResponse{}, nil
 }
 
-func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
-	panic("unimplemented")
+func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
+	s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
+		ID:        req.Job.Id,
+		Partition: req.Job.Partition,
+		Offsets: types.Offsets{
+			Min: req.Job.Offsets.Min,
+			Max: req.Job.Offsets.Max,
+		},
+	})
+
+	return &proto.SyncJobResponse{}, nil
 }
 
-func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
-	panic("unimplemented")
+func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
+	var resp proto.GetJobResponse
+	job, ok, err := s.queue.Dequeue(req.BuilderId)
+	if err != nil {
+		return &resp, err
+	}
+
+	if ok {
+		resp.Ok = true
+		resp.Job = &proto.Job{
+			Id:        job.ID,
+			Partition: job.Partition,
+			Offsets: &proto.Offsets{
+				Min: job.Offsets.Min,
+				Max: job.Offsets.Max,
+			},
+		}
+	}
+
+	return &resp, nil
 }
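
The three RPC handlers above delegate to the scheduler's job queue via Dequeue, MarkComplete, and SyncJob. The queue type itself is not part of this diff; inferring signatures from the call sites, the dependency looks roughly like the sketch below (an assumption, not the actual Loki interface):

package scheduler // sketch only; not the real pkg/blockbuilder/scheduler file

import "github.com/grafana/loki/v3/pkg/blockbuilder/types"

// jobQueue captures what the BlockScheduler handlers above need from s.queue.
// Signatures are inferred from how the methods are called and may differ from the real queue.
type jobQueue interface {
	// Dequeue hands the next pending job to the given builder,
	// reporting whether one was available.
	Dequeue(builderID string) (*types.Job, bool, error)
	// MarkComplete records that the job with the given ID has finished.
	MarkComplete(jobID string)
	// SyncJob refreshes the scheduler's view of a job the builder is still working on.
	SyncJob(jobID, builderID string, job *types.Job)
}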

‎pkg/blockbuilder/scheduler/strategy.go

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
 		currentEnd := min(currentStart+p.targetRecordCount, endOffset)
 
 		job := NewJobWithPriority(
-			types.NewJob(int(partitionOffset.Partition), types.Offsets{
+			types.NewJob(partitionOffset.Partition, types.Offsets{
 				Min: currentStart,
 				Max: currentEnd,
 			}), int(endOffset-currentStart), // priority is remaining records to process

‎pkg/blockbuilder/types/grpc_transport.go

Lines changed: 2 additions & 2 deletions
@@ -111,7 +111,7 @@ func protoToJob(p *proto.Job) *Job {
 	}
 	return &Job{
 		ID:        p.GetId(),
-		Partition: int(p.GetPartition()),
+		Partition: p.GetPartition(),
 		Offsets: Offsets{
 			Min: p.GetOffsets().GetMin(),
 			Max: p.GetOffsets().GetMax(),
@@ -126,7 +126,7 @@ func jobToProto(j *Job) *proto.Job {
 	}
 	return &proto.Job{
 		Id:        j.ID,
-		Partition: int32(j.Partition),
+		Partition: j.Partition,
 		Offsets: &proto.Offsets{
 			Min: j.Offsets.Min,
 			Max: j.Offsets.Max,
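
With Partition now int32 end to end, protoToJob and jobToProto become plain field copies with no narrowing conversions. A round-trip check one might write inside pkg/blockbuilder/types to pin that down (a sketch under that assumption; this test is not part of the commit):

package types

import "testing"

// Sketch of a round-trip check for the converters above.
func TestJobProtoRoundTrip(t *testing.T) {
	orig := NewJob(3, Offsets{Min: 10, Max: 20})
	got := protoToJob(jobToProto(orig))
	if got.ID != orig.ID || got.Partition != orig.Partition || got.Offsets != orig.Offsets {
		t.Fatalf("round trip mismatch: got %+v, want %+v", got, orig)
	}
}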

‎pkg/blockbuilder/types/job.go

Lines changed: 3 additions & 3 deletions
@@ -6,7 +6,7 @@ import "fmt"
 type Job struct {
 	ID string
 	// Partition and offset information
-	Partition int
+	Partition int32
 	Offsets   Offsets
 }
 
@@ -26,7 +26,7 @@ type Offsets struct {
 }
 
 // NewJob creates a new job with the given partition and offsets
-func NewJob(partition int, offsets Offsets) *Job {
+func NewJob(partition int32, offsets Offsets) *Job {
 	return &Job{
 		ID:        GenerateJobID(partition, offsets),
 		Partition: partition,
@@ -35,6 +35,6 @@ func NewJob(partition int32, offsets Offsets) *Job {
 }
 
 // GenerateJobID creates a deterministic job ID from partition and offsets
-func GenerateJobID(partition int, offsets Offsets) string {
+func GenerateJobID(partition int32, offsets Offsets) string {
 	return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
 }
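
The ID format follows directly from the Sprintf above: partition, then min and max offsets. For example (illustrative values), a job covering offsets 100 to 200 on partition 2 gets the deterministic ID "job-2-100-200".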

‎pkg/loki/modules.go

Lines changed: 10 additions & 1 deletion
@@ -38,6 +38,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/analytics"
 	blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
 	blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
+	blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
 	"github.com/grafana/loki/v3/pkg/bloombuild/builder"
 	"github.com/grafana/loki/v3/pkg/bloombuild/planner"
 	bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
@@ -1862,8 +1863,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
 	if err != nil {
 		return nil, fmt.Errorf("creating kafka client: %w", err)
 	}
+
 	offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c)
-	return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
+	s, err := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
+	if err != nil {
+		return s, err
+	}
+
+	blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s)
+
+	return s, err
 }
 
 func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
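
Registering the scheduler on t.Server.GRPC is what makes the new block builder service reachable at the address builders are given via -blockbuilder.scheduler-address. A hypothetical client-side sketch of calling it; the NewBlockBuilderServiceClient constructor name follows the usual protoc-gen-go-grpc naming convention and the address is a placeholder, neither is taken from this diff:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

func main() {
	// Placeholder address; in Loki this would come from -blockbuilder.scheduler-address.
	conn, err := grpc.NewClient("dns:///loki-backend:9095",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Assumed generated constructor for the service registered above.
	client := proto.NewBlockBuilderServiceClient(conn)

	resp, err := client.GetJob(context.Background(), &proto.GetJobRequest{BuilderId: "worker-0"})
	if err != nil {
		log.Fatal(err)
	}
	if resp.Ok {
		log.Printf("got job %s for partition %d", resp.Job.Id, resp.Job.Partition)
	} else {
		log.Print("no job available")
	}
}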
