Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, int32(job.Partition), job.Offsets, inputCh)
lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
return err
},
func(ctx context.Context) error {
Expand Down Expand Up @@ -545,7 +545,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
}
}

return lastOffset, err
return lastOffset, boff.Err()
}

func withBackoff[T any](
Expand Down
16 changes: 0 additions & 16 deletions pkg/blockbuilder/builder/builder_test.go

This file was deleted.

23 changes: 20 additions & 3 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package scheduler

import (
"context"
"fmt"
"sync"
"time"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/queue"
)

const (
Expand Down Expand Up @@ -44,18 +46,24 @@ type JobQueue struct {
completed *CircularBuffer[*types.Job] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
mu sync.RWMutex
cond queue.ContextCond // to signal when a job is available
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
q := &JobQueue{
pending: NewPriorityQueue[*JobWithPriority[int]](func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
}),
inProgress: make(map[string]*inProgressJob),
completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
}
q.cond = queue.ContextCond{
Cond: sync.NewCond(&q.mu),
}

return q
}

func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
Expand All @@ -79,15 +87,24 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
jobWithPriority := NewJobWithPriority(job, priority)
q.pending.Push(jobWithPriority)
q.statusMap[job.ID] = types.JobStatusPending
q.cond.Broadcast()
return nil
}

// Dequeue gets the next available job and assigns it to a builder
func (q *JobQueue) Dequeue(_ string) (*types.Job, bool, error) {
func (q *JobQueue) Dequeue(ctx context.Context, _ string, wait bool) (*types.Job, bool, error) {
q.mu.Lock()
defer q.mu.Unlock()

if q.pending.Len() == 0 {
if wait {
for q.pending.Len() == 0 && ctx.Err() == nil {
q.cond.Wait(ctx)
}

if ctx.Err() != nil {
return nil, false, ctx.Err()
}
} else if q.pending.Len() == 0 {
return nil, false, nil
}

Expand Down
49 changes: 38 additions & 11 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
)

var (
_ types.Scheduler = unimplementedScheduler{}
_ types.Scheduler = &BlockScheduler{}
)

Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
case <-ctx.Done():
return nil, false, ctx.Err()
default:
return s.queue.Dequeue(builderID)
return s.queue.Dequeue(ctx, builderID, false)
}
}

Expand All @@ -155,17 +155,44 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job
return nil
}

// unimplementedScheduler provides default implementations that panic.
type unimplementedScheduler struct{}

func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) {
panic("unimplemented")
func (s *BlockScheduler) CompleteJob(_ context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
s.queue.MarkComplete(req.Job.Id)
return nil, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return non-nil response types to satisfy caller expectations even though they're basically empty structs.

}

func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
func (s *BlockScheduler) SyncJob(_ context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
s.queue.SyncJob(req.Job.Id, req.BuilderId, &types.Job{
ID: req.Job.Id,
Partition: req.Job.Partition,
Offsets: types.Offsets{
Min: req.Job.Offsets.Min,
Max: req.Job.Offsets.Max,
},
})

return nil, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return non-nil response types to satisfy caller expectations even though they're basically empty structs.

}

func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error {
panic("unimplemented")
func (s *BlockScheduler) GetJob(req *proto.GetJobRequest, stream proto.BlockBuilderService_GetJobServer) error {
job, ok, err := s.queue.Dequeue(stream.Context(), req.BuilderId, true)
if err != nil {
return err
}

var jobpb *proto.Job
if ok {
jobpb = &proto.Job{
Id: job.ID,
Partition: job.Partition,
Offsets: &proto.Offsets{
Min: job.Offsets.Min,
Max: job.Offsets.Max,
},
}
}

return stream.Send(&proto.GetJobResponse{
Job: jobpb,
Ok: ok,
})
}
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
endOffset := min(startOffset+p.targetRecordCount, partitionOffset.End.Offset)

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(partitionOffset.End.Offset-startOffset),
Expand Down Expand Up @@ -135,7 +135,7 @@ func (p *TimeRangePlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], e
}

job := NewJobWithPriority(
types.NewJob(int(partitionOffset.Partition), types.Offsets{
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: startOffset,
Max: endOffset,
}), int(endOffset-startOffset),
Expand Down
11 changes: 8 additions & 3 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobReques
BuilderId: req.BuilderID,
}

resp, err := t.GetJob(ctx, protoReq)
client, err := t.GetJob(ctx, protoReq)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think creating streaming clients for every RPC is any better than using the non-streaming RPCs we had prior. These are generally run in a loop

if err != nil {
return nil, err
}

resp, err := client.Recv()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,7 +116,7 @@ func protoToJob(p *proto.Job) *Job {
}
return &Job{
ID: p.GetId(),
Partition: int(p.GetPartition()),
Partition: p.GetPartition(),
Offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
Expand All @@ -126,7 +131,7 @@ func jobToProto(j *Job) *proto.Job {
}
return &proto.Job{
Id: j.ID,
Partition: int32(j.Partition),
Partition: j.Partition,
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "fmt"
type Job struct {
ID string
// Partition and offset information
Partition int
Partition int32
Offsets Offsets
}

Expand All @@ -26,7 +26,7 @@ type Offsets struct {
}

// NewJob creates a new job with the given partition and offsets
func NewJob(partition int, offsets Offsets) *Job {
func NewJob(partition int32, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Partition: partition,
Expand All @@ -35,6 +35,6 @@ func NewJob(partition int, offsets Offsets) *Job {
}

// GenerateJobID creates a deterministic job ID from partition and offsets
func GenerateJobID(partition int, offsets Offsets) string {
func GenerateJobID(partition int32, offsets Offsets) string {
return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
}
Loading
Loading