Skip to content

feat(scheduler): implement and register block builder rpc service #15248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ block_builder:
# CLI flag: -blockbuilder.sync-interval
[sync_interval: <duration> | default = 30s]

# The interval at which to poll for new jobs.
# CLI flag: -blockbuilder.poll-interval
[poll_interval: <duration> | default = 30s]

# Address of the scheduler in the format described here:
# https://github.com/grpc/grpc/blob/master/doc/naming.md
# CLI flag: -blockbuilder.scheduler-address
Expand Down
32 changes: 23 additions & 9 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
Backoff backoff.Config `yaml:"backoff_config"`
WorkerParallelism int `yaml:"worker_parallelism"`
SyncInterval time.Duration `yaml:"sync_interval"`
PollInterval time.Duration `yaml:"poll_interval"`

SchedulerAddress string `yaml:"scheduler_address"`
// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
Expand All @@ -58,6 +59,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedCodecs()))
f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
f.DurationVar(&cfg.SyncInterval, prefix+"sync-interval", 30*time.Second, "The interval at which to sync job status with the scheduler.")
f.DurationVar(&cfg.PollInterval, prefix+"poll-interval", 30*time.Second, "The interval at which to poll for new jobs.")
f.IntVar(&cfg.WorkerParallelism, prefix+"worker-parallelism", 1, "The number of workers to run in parallel to process jobs.")
f.StringVar(&cfg.SchedulerAddress, prefix+"scheduler-address", "", "Address of the scheduler in the format described here: https://github.com/grpc/grpc/blob/master/doc/naming.md")

Expand All @@ -81,6 +83,10 @@ func (cfg *Config) Validate() error {
return errors.New("sync interval must be greater than 0")
}

if cfg.PollInterval <= 0 {
return errors.New("poll interval must be greater than 0")
}

if cfg.WorkerParallelism < 1 {
return errors.New("worker parallelism must be greater than 0")
}
Expand Down Expand Up @@ -165,15 +171,23 @@ func (i *BlockBuilder) running(ctx context.Context) error {
go func(id string) {
defer wg.Done()

var waitFor time.Duration
for {
select {
case <-ctx.Done():
return
default:
err := i.runOne(ctx, id)
case <-time.After(waitFor):
gotJob, err := i.runOne(ctx, id)
if err != nil {
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}

// poll only when there are no jobs
if gotJob {
waitFor = 0
} else {
waitFor = i.cfg.PollInterval
}
}
}
}(fmt.Sprintf("worker-%d", j))
Expand Down Expand Up @@ -218,18 +232,18 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
return nil
}

func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
// assuming GetJob blocks/polls until a job is available
resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: workerID,
})
if err != nil {
return err
return false, err
}

if !resp.OK {
level.Info(i.logger).Log("msg", "no available job to process")
return nil
return false, nil
}

job := resp.Job
Expand Down Expand Up @@ -262,15 +276,15 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
return
},
); err != nil {
return err
return true, err
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID)
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

return err
return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
Expand All @@ -297,7 +311,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, int32(job.Partition), job.Offsets, inputCh)
lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
return err
},
func(ctx context.Context) error {
Expand Down Expand Up @@ -545,7 +559,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
}
}

return lastOffset, err
return lastOffset, boff.Err()
}

func withBackoff[T any](
Expand Down
16 changes: 0 additions & 16 deletions pkg/blockbuilder/builder/builder_test.go

This file was deleted.

45 changes: 35 additions & 10 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

var (
_ types.Scheduler = unimplementedScheduler{}
_ types.Scheduler = &BlockScheduler{}
)

Expand Down Expand Up @@ -185,17 +185,42 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
return nil
}

// unimplementedScheduler provides default implementations that panic.
type unimplementedScheduler struct{}

func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
panic("unimplemented")
func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
s.queue.MarkComplete(req.Job.Id)
return &proto.CompleteJobResponse{}, nil
}

func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
ID: req.Job.Id,
Partition: req.Job.Partition,
Offsets: types.Offsets{
Min: req.Job.Offsets.Min,
Max: req.Job.Offsets.Max,
},
})

return &proto.SyncJobResponse{}, nil
}

func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
func (s *BlockScheduler) GetJob(_ context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
var resp proto.GetJobResponse
job, ok, err := s.queue.Dequeue(req.BuilderId)
if err != nil {
return &resp, err
}

if ok {
resp.Ok = true
resp.Job = &proto.Job{
Id: job.ID,
Partition: job.Partition,
Offsets: &proto.Offsets{
Min: job.Offsets.Min,
Max: job.Offsets.Max,
},
}
}

return &resp, nil
}
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
currentEnd := min(currentStart+p.targetRecordCount, endOffset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
Max: currentEnd,
}), int(endOffset-currentStart), // priority is remaining records to process
Expand Down
4 changes: 2 additions & 2 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func protoToJob(p *proto.Job) *Job {
}
return &Job{
ID: p.GetId(),
Partition: int(p.GetPartition()),
Partition: p.GetPartition(),
Offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
Expand All @@ -126,7 +126,7 @@ func jobToProto(j *Job) *proto.Job {
}
return &proto.Job{
Id: j.ID,
Partition: int32(j.Partition),
Partition: j.Partition,
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "fmt"
type Job struct {
ID string
// Partition and offset information
Partition int
Partition int32
Offsets Offsets
}

Expand All @@ -26,7 +26,7 @@ type Offsets struct {
}

// NewJob creates a new job with the given partition and offsets
func NewJob(partition int, offsets Offsets) *Job {
func NewJob(partition int32, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Partition: partition,
Expand All @@ -35,6 +35,6 @@ func NewJob(partition int, offsets Offsets) *Job {
}

// GenerateJobID creates a deterministic job ID from partition and offsets
func GenerateJobID(partition int, offsets Offsets) string {
func GenerateJobID(partition int32, offsets Offsets) string {
return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
}
11 changes: 10 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder"
blockscheduler "github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
blockprotos "github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand Down Expand Up @@ -1861,8 +1862,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}

offsetReader := blockscheduler.NewOffsetReader(t.Cfg.KafkaConfig.Topic, t.Cfg.BlockScheduler.ConsumerGroup, t.Cfg.BlockScheduler.LookbackPeriod, c)
return blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
s, err := blockscheduler.NewScheduler(t.Cfg.BlockScheduler, blockscheduler.NewJobQueue(), offsetReader, logger, prometheus.DefaultRegisterer), nil
if err != nil {
return s, err
}

blockprotos.RegisterBlockBuilderServiceServer(t.Server.GRPC, s)

return s, err
}

func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
Expand Down
Loading