Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fleshing out support for non successful statuses
  • Loading branch information
owen-d committed Dec 10, 2024
commit fddd01ae903a30c315b12c1799aa32f06809690a
138 changes: 85 additions & 53 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,37 @@ const (
defaultCompletedJobsCapacity = 100
)

// JobWithPriority wraps a job with a priority value
type JobWithPriority[T comparable] struct {
Job *types.Job
Priority T
// JobWithPriority wraps a job with its priority
type JobWithPriority[P any] struct {
*types.Job
Priority P
}

// NewJobWithPriority creates a new JobWithPriority instance
func NewJobWithPriority[T comparable](job *types.Job, priority T) *JobWithPriority[T] {
return &JobWithPriority[T]{
func NewJobWithPriority[P any](job *types.Job, priority P) *JobWithPriority[P] {
return &JobWithPriority[P]{
Job: job,
Priority: priority,
}
}

// inProgressJob contains a job and its start time
type inProgressJob struct {
job *types.Job
startTime time.Time
// JobWithStatus wraps a job with its completion status and time
type JobWithStatus struct {
*types.Job
Status types.JobStatus
CompletedAt time.Time
}

// Duration returns how long the job has been running
func (j *inProgressJob) Duration() time.Duration {
return time.Since(j.startTime)
// inProgressJob tracks a job that is currently being processed
type inProgressJob struct {
*types.Job
StartTime time.Time
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*types.Job] // Last N completed jobs
completed *CircularBuffer[*JobWithStatus] // Last N completed jobs with their status
statusMap map[string]types.JobStatus // Maps job ID to its current status
mu sync.RWMutex
}
Expand All @@ -53,25 +54,24 @@ func NewJobQueue() *JobQueue {
func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
},
func(a *JobWithPriority[int]) string {
return a.Job.ID
},
func(j *JobWithPriority[int]) string { return j.ID },
),
inProgress: make(map[string]*inProgressJob),
completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity),
completed: NewCircularBuffer[*JobWithStatus](defaultCompletedJobsCapacity), // Keep last 100 completed jobs
statusMap: make(map[string]types.JobStatus),
}
}

// Exists checks if a job exists in any state and returns its status
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

status, exists := q.statusMap[job.ID]
return status, exists
status, ok := q.statusMap[job.ID]
return status, ok
}

// Enqueue adds a new job to the pending queue with a priority
// Enqueue adds a job to the pending queue with the given priority
func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
q.mu.Lock()
defer q.mu.Unlock()
Expand All @@ -81,56 +81,88 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
return fmt.Errorf("job %s already exists with status %v", job.ID, status)
}

jobWithPriority := NewJobWithPriority(job, priority)
q.pending.Push(jobWithPriority)
q.pending.Push(NewJobWithPriority(job, priority))
q.statusMap[job.ID] = types.JobStatusPending
return nil
}

// Dequeue gets the next available job and assigns it to a builder
func (q *JobQueue) Dequeue(_ string) (*types.Job, bool, error) {
// Dequeue removes and returns the highest priority job from the pending queue
func (q *JobQueue) Dequeue() (*types.Job, bool) {
q.mu.Lock()
defer q.mu.Unlock()

if q.pending.Len() == 0 {
return nil, false, nil
job, ok := q.pending.Pop()
if !ok {
return nil, false
}

jobWithPriority, ok := q.pending.Pop()
if !ok {
return nil, false, nil
q.inProgress[job.ID] = &inProgressJob{
Job: job.Job,
StartTime: time.Now(),
}
q.statusMap[job.ID] = types.JobStatusInProgress

// Add to in-progress with current time
q.inProgress[jobWithPriority.Job.ID] = &inProgressJob{
job: jobWithPriority.Job,
startTime: time.Now(),
return job.Job, true
}

// GetInProgressJob retrieves a job that is currently being processed
func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

if job, ok := q.inProgress[id]; ok {
return job.Job, job.StartTime, true
}
q.statusMap[jobWithPriority.Job.ID] = types.JobStatusInProgress
return nil, time.Time{}, false
}

return jobWithPriority.Job, true, nil
// RemoveInProgress removes a job from the in-progress map
func (q *JobQueue) RemoveInProgress(id string) {
q.mu.Lock()
defer q.mu.Unlock()

delete(q.inProgress, id)
}

// MarkComplete moves a job from in-progress to completed
func (q *JobQueue) MarkComplete(jobID string) {
// MarkComplete moves a job from in-progress to completed with the given status
func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
q.mu.Lock()
defer q.mu.Unlock()

// Find job in in-progress map
inProgressJob, exists := q.inProgress[jobID]
// if it doesn't exist, it could be previously removed (duplicate job execution)
// or the scheduler may have restarted and not have the job state anymore.
if exists {
// Remove from in-progress
delete(q.inProgress, jobID)
job, ok := q.inProgress[id]
if !ok {
return
}

// Add to completed buffer and handle evicted job
if evictedJob, hasEvicted := q.completed.Push(inProgressJob.job); hasEvicted {
// Remove evicted job from status map
delete(q.statusMap, evictedJob.ID)
// Add to completed buffer with status
completedJob := &JobWithStatus{
Job: job.Job,
Status: status,
CompletedAt: time.Now(),
}
_, _ = q.completed.Push(completedJob)

// Update status map and clean up
q.statusMap[id] = status
delete(q.inProgress, id)

// If the job failed, re-enqueue it with its original priority
if status == types.JobStatusFailed {
// Look up the original priority from the pending queue
if origJob, ok := q.pending.Lookup(id); ok {
q.pending.Push(origJob) // Re-add with original priority
q.statusMap[id] = types.JobStatusPending
}
}
q.statusMap[jobID] = types.JobStatusComplete
}

// GetStatus returns the current status of a job
func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

status, ok := q.statusMap[id]
return status, ok
}

// SyncJob registers a job as in-progress, used for restoring state after scheduler restarts
Expand All @@ -140,8 +172,8 @@ func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) {

// Add directly to in-progress
q.inProgress[jobID] = &inProgressJob{
job: job,
startTime: time.Now(),
Job: job,
StartTime: time.Now(),
}
q.statusMap[jobID] = types.JobStatusInProgress
}
18 changes: 8 additions & 10 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,29 +204,27 @@ func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) {
}
}

func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) {
func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
default:
return s.queue.Dequeue(builderID)
job, ok := s.queue.Dequeue()
return job, ok, nil
}
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error {
logger := log.With(s.logger, "job", job.ID)

status, exists := s.queue.Exists(job)
if !exists {
level.Error(logger).Log("msg", "cannot complete job, job does not exist")
if success {
level.Info(logger).Log("msg", "job completed successfully")
s.queue.MarkComplete(job.ID, types.JobStatusComplete)
return nil
}

if !success {
level.Error(logger).Log("msg", "job failed, re-enqueuing")
return nil
}
s.queue.MarkComplete(job.ID)
level.Error(logger).Log("msg", "job failed, re-enqueuing")
s.queue.MarkComplete(job.ID, types.JobStatusFailed)
return nil
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,27 @@ const (
JobStatusPending JobStatus = iota
JobStatusInProgress
JobStatusComplete
JobStatusFailed // Job failed and may be retried
JobStatusExpired // Job failed too many times or is too old
)

func (s JobStatus) String() string {
switch s {
case JobStatusPending:
return "pending"
case JobStatusInProgress:
return "in_progress"
case JobStatusComplete:
return "complete"
case JobStatusFailed:
return "failed"
case JobStatusExpired:
return "expired"
default:
return "unknown"
}
}

// Offsets represents the range of offsets to process
type Offsets struct {
Min int64
Expand Down