Skip to content
Prev Previous commit
Next Next commit
[wip] priority for planning jobs
  • Loading branch information
owen-d committed Dec 3, 2024
commit 2d4ab8bf8ca4aaa2fc3d806109e9f083b4df3c24
43 changes: 27 additions & 16 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,41 @@ func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
}
}

// Push adds an element to the circular buffer.
func (cb *CircularBuffer[T]) Push(v T) {
if cb.size == len(cb.buffer) {
cb.head = (cb.head + 1) % len(cb.buffer)
// Push adds an element to the circular buffer and returns the evicted element if any
func (b *CircularBuffer[T]) Push(v T) (T, bool) {
var evicted T
hasEvicted := false

if b.size == len(b.buffer) {
// If buffer is full, evict the oldest element (at head)
evicted = b.buffer[b.head]
hasEvicted = true
b.head = (b.head + 1) % len(b.buffer)
} else {
cb.size++
b.size++
}
cb.buffer[cb.tail] = v
cb.tail = (cb.tail + 1) % len(cb.buffer)

b.buffer[b.tail] = v
b.tail = (b.tail + 1) % len(b.buffer)

return evicted, hasEvicted
}

// Pop removes and returns the oldest element from the circular buffer.
func (cb *CircularBuffer[T]) Pop() (T, bool) {
if cb.size == 0 {
// Pop removes and returns the oldest element from the buffer
func (b *CircularBuffer[T]) Pop() (T, bool) {
if b.size == 0 {
var zero T
return zero, false
}
v := cb.buffer[cb.head]
cb.head = (cb.head + 1) % len(cb.buffer)
cb.size--

v := b.buffer[b.head]
b.head = (b.head + 1) % len(b.buffer)
b.size--

return v, true
}

// Len returns the number of elements in the circular buffer.
func (cb *CircularBuffer[T]) Len() int {
return cb.size
// Len returns the number of elements in the buffer
func (b *CircularBuffer[T]) Len() int {
return b.size
}
140 changes: 83 additions & 57 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,82 @@ package scheduler
import (
"fmt"
"sync"
"time"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// jobAssignment tracks a job and its assigned builder
type jobAssignment struct {
const (
defaultCompletedJobsCapacity = 100
)

// JobWithPriority wraps a job with a priority value
type JobWithPriority[T comparable] struct {
Job *types.Job
Priority T
}

// NewJobWithPriority creates a new JobWithPriority instance
func NewJobWithPriority[T comparable](job *types.Job, priority T) *JobWithPriority[T] {
return &JobWithPriority[T]{
Job: job,
Priority: priority,
}
}

// inProgressJob contains a job and its start time
type inProgressJob struct {
job *types.Job
builderID string
startTime time.Time
}

// Duration returns how long the job has been running
func (j *inProgressJob) Duration() time.Duration {
return time.Since(j.startTime)
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending map[string]*types.Job // Jobs waiting to be processed, key is job ID
inProgress map[string]*jobAssignment // job ID -> assignment info
completed map[string]*types.Job // Completed jobs, key is job ID
pending *PriorityQueue[*JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*types.Job] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
mu sync.RWMutex
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
pending: make(map[string]*types.Job),
inProgress: make(map[string]*jobAssignment),
completed: make(map[string]*types.Job),
pending: NewPriorityQueue[*JobWithPriority[int]](func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
}),
inProgress: make(map[string]*inProgressJob),
completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
}
}

func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

if _, ok := q.inProgress[job.ID]; ok {
return types.JobStatusInProgress, true
}

if _, ok := q.pending[job.ID]; ok {
return types.JobStatusPending, true
}

if _, ok := q.completed[job.ID]; ok {
return types.JobStatusComplete, true
}

return -1, false
status, exists := q.statusMap[job.ID]
return status, exists
}

// Enqueue adds a new job to the pending queue
// This is a naive implementation, intended to be refactored
func (q *JobQueue) Enqueue(job *types.Job) error {
// Enqueue adds a new job to the pending queue with a priority
func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
q.mu.Lock()
defer q.mu.Unlock()

if _, exists := q.pending[job.ID]; exists {
return fmt.Errorf("job %s already exists in pending queue", job.ID)
}
if _, exists := q.inProgress[job.ID]; exists {
return fmt.Errorf("job %s already exists in progress", job.ID)
}
if _, exists := q.completed[job.ID]; exists {
return fmt.Errorf("job %s already completed", job.ID)
// Check if job already exists
if status, exists := q.statusMap[job.ID]; exists {
return fmt.Errorf("job %s already exists with status %v", job.ID, status)
}

q.pending[job.ID] = job
jobWithPriority := NewJobWithPriority(job, priority)
q.pending.Push(jobWithPriority)
q.statusMap[job.ID] = types.JobStatusPending
return nil
}

Expand All @@ -74,52 +87,65 @@ func (q *JobQueue) Dequeue(builderID string) (*types.Job, bool, error) {
q.mu.Lock()
defer q.mu.Unlock()

// Simple FIFO for now
for id, job := range q.pending {
delete(q.pending, id)
q.inProgress[id] = &jobAssignment{
job: job,
builderID: builderID,
}
return job, true, nil
if q.pending.Len() == 0 {
return nil, false, nil
}

return nil, false, nil
jobWithPriority, ok := q.pending.Pop()
if !ok {
return nil, false, nil
}

// Add to in-progress with current time
q.inProgress[jobWithPriority.Job.ID] = &inProgressJob{
job: jobWithPriority.Job,
startTime: time.Now(),
}
q.statusMap[jobWithPriority.Job.ID] = types.JobStatusInProgress

return jobWithPriority.Job, true, nil
}

// MarkComplete moves a job from in-progress to completed
func (q *JobQueue) MarkComplete(jobID string, builderID string) error {
func (q *JobQueue) MarkComplete(jobID string) error {
q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
// Find job in in-progress map
inProgressJob, exists := q.inProgress[jobID]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe this should not error? if a scheduler restarted and lost it's state we'd miss committing already consumed jobs.

but this can be worried about later when we add the committer logic

if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
}

if assignment.builderID != builderID {
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID)
// Remove from in-progress
delete(q.inProgress, jobID)

// Add to completed buffer and handle evicted job
if evictedJob, hasEvicted := q.completed.Push(inProgressJob.job); hasEvicted {
// Remove evicted job from status map
delete(q.statusMap, evictedJob.ID)
}
q.statusMap[jobID] = types.JobStatusComplete

delete(q.inProgress, jobID)
q.completed[jobID] = assignment.job
return nil
}

// SyncJob updates the state of an in-progress job
func (q *JobQueue) SyncJob(jobID string, builderID string, job *types.Job) error {
// SyncJob registers a job as in-progress, used for restoring state after scheduler restarts
func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) error {
q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
// Check if job already exists
if status, exists := q.statusMap[jobID]; exists {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should not error?

return fmt.Errorf("job %s already exists with status %v", jobID, status)
}

if assignment.builderID != builderID {
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID)
// Add directly to in-progress
q.inProgress[jobID] = &inProgressJob{
job: job,
startTime: time.Now(),
}
q.statusMap[jobID] = types.JobStatusInProgress

assignment.job = job
return nil
}
8 changes: 4 additions & 4 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

for _, job := range jobs {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
if status, ok := s.queue.Exists(&job); ok {
if status, ok := s.queue.Exists(job.Job); ok {
level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status)
continue
}

if err := s.queue.Enqueue(&job); err != nil {
if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err)
}
}
Expand All @@ -144,9 +144,9 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
}
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error {
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job) error {
// TODO: handle commits
return s.queue.MarkComplete(job.ID, builderID)
return s.queue.MarkComplete(job.ID)
}

func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestScheduleAndProcessJob(t *testing.T) {

// Create and enqueue a test job
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
err := env.queue.Enqueue(job)
err := env.queue.Enqueue(job, 100)
if err != nil {
t.Fatalf("failed to enqueue job: %v", err)
}
Expand Down Expand Up @@ -98,11 +98,11 @@ func TestMultipleBuilders(t *testing.T) {
job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400})

// Enqueue jobs
err := env1.queue.Enqueue(job1)
err := env1.queue.Enqueue(job1, 100)
if err != nil {
t.Fatalf("failed to enqueue job1: %v", err)
}
err = env1.queue.Enqueue(job2)
err = env1.queue.Enqueue(job2, 100)
if err != nil {
t.Fatalf("failed to enqueue job2: %v", err)
}
Expand Down
Loading
Loading