consolidate to JobWithMetadata
owen-d committed Dec 10, 2024
commit 118a9b96765e0f89ba38810d62dc0e18f0c587ab
106 changes: 52 additions & 54 deletions pkg/blockbuilder/scheduler/queue.go
@@ -12,52 +12,45 @@ const (
defaultCompletedJobsCapacity = 100
)

// JobWithPriority wraps a job with its priority
type JobWithPriority[P any] struct {
*types.Job
Priority P
}

func NewJobWithPriority[P any](job *types.Job, priority P) *JobWithPriority[P] {
return &JobWithPriority[P]{
Job: job,
Priority: priority,
}
}

// JobWithStatus wraps a job with its completion status and time
type JobWithStatus struct {
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
type JobWithMetadata struct {
*types.Job
Priority int
Status types.JobStatus
CompletedAt time.Time
StartTime time.Time
UpdateTime time.Time
}

// inProgressJob tracks a job that is currently being processed
type inProgressJob struct {
*types.Job
StartTime time.Time
// NewJobWithMetadata creates a new JobWithMetadata instance
func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata {
return &JobWithMetadata{
Job: job,
Priority: priority,
Status: types.JobStatusPending,
UpdateTime: time.Now(),
}
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*JobWithStatus] // Last N completed jobs with their status
statusMap map[string]types.JobStatus // Maps job ID to its current status
pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*JobWithMetadata // Jobs currently being processed
completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
mu sync.RWMutex
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
pending: NewPriorityQueue(
func(a, b *JobWithPriority[int]) bool {
func(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority // Higher priority first
},
func(j *JobWithPriority[int]) string { return j.ID },
func(j *JobWithMetadata) string { return j.ID },
),
inProgress: make(map[string]*inProgressJob),
completed: NewCircularBuffer[*JobWithStatus](defaultCompletedJobsCapacity), // Keep last 100 completed jobs
inProgress: make(map[string]*JobWithMetadata),
completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
}
}
@@ -81,7 +74,8 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
return fmt.Errorf("job %s already exists with status %v", job.ID, status)
}

q.pending.Push(NewJobWithPriority(job, priority))
jobMeta := NewJobWithMetadata(job, priority)
q.pending.Push(jobMeta)
q.statusMap[job.ID] = types.JobStatusPending
return nil
}
@@ -91,27 +85,29 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) {
q.mu.Lock()
defer q.mu.Unlock()

job, ok := q.pending.Pop()
jobMeta, ok := q.pending.Pop()
if !ok {
return nil, false
}

q.inProgress[job.ID] = &inProgressJob{
Job: job.Job,
StartTime: time.Now(),
}
q.statusMap[job.ID] = types.JobStatusInProgress
// Update metadata for in-progress state
jobMeta.Status = types.JobStatusInProgress
jobMeta.StartTime = time.Now()
jobMeta.UpdateTime = jobMeta.StartTime

return job.Job, true
q.inProgress[jobMeta.ID] = jobMeta
q.statusMap[jobMeta.ID] = types.JobStatusInProgress

return jobMeta.Job, true
}

// GetInProgressJob retrieves a job that is currently being processed
func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

if job, ok := q.inProgress[id]; ok {
return job.Job, job.StartTime, true
if jobMeta, ok := q.inProgress[id]; ok {
return jobMeta.Job, jobMeta.StartTime, true
}
return nil, time.Time{}, false
}
@@ -129,30 +125,28 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
q.mu.Lock()
defer q.mu.Unlock()

job, ok := q.inProgress[id]
jobMeta, ok := q.inProgress[id]
if !ok {
return
}

// Add to completed buffer with status
completedJob := &JobWithStatus{
Job: job.Job,
Status: status,
CompletedAt: time.Now(),
}
_, _ = q.completed.Push(completedJob)
// Update metadata for completion
jobMeta.Status = status
jobMeta.UpdateTime = time.Now()

// Add to completed buffer
_, _ = q.completed.Push(jobMeta)

// Update status map and clean up
q.statusMap[id] = status
delete(q.inProgress, id)

// If the job failed, re-enqueue it with its original priority
if status == types.JobStatusFailed {
// Look up the original priority from the pending queue
if origJob, ok := q.pending.Lookup(id); ok {
q.pending.Push(origJob) // Re-add with original priority
q.statusMap[id] = types.JobStatusPending
}
// Create new metadata for the re-enqueued job
newJobMeta := NewJobWithMetadata(jobMeta.Job, jobMeta.Priority)
q.pending.Push(newJobMeta)
q.statusMap[id] = types.JobStatusPending
}
}

@@ -171,9 +165,13 @@ func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) {
defer q.mu.Unlock()

// Add directly to in-progress
q.inProgress[jobID] = &inProgressJob{
Job: job,
StartTime: time.Now(),
jobMeta := &JobWithMetadata{
Job: job,
Priority: 0, // Priority is not known in this case
Status: types.JobStatusInProgress,
StartTime: time.Now(),
UpdateTime: time.Now(),
}
q.inProgress[jobID] = jobMeta
q.statusMap[jobID] = types.JobStatusInProgress
}
11 changes: 6 additions & 5 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -18,7 +18,7 @@ type OffsetReader interface {

type Planner interface {
Name() string
Plan(ctx context.Context) ([]*JobWithPriority[int], error)
Plan(ctx context.Context) ([]*JobWithMetadata, error)
}

const (
@@ -46,14 +46,14 @@ func (p *RecordCountPlanner) Name() string {
return RecordCountStrategy
}

func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
offsets, err := p.offsetReader.GroupLag(ctx)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
return nil, err
}

jobs := make([]*JobWithPriority[int], 0, len(offsets))
jobs := make([]*JobWithMetadata, 0, len(offsets))
for _, partitionOffset := range offsets {
// kadm.GroupMemberLag contains a valid Commit.At even when the consumer group has never committed any offset,
// so no additional validation is needed here
@@ -69,11 +69,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
for currentStart := startOffset; currentStart < endOffset; {
currentEnd := min(currentStart+p.targetRecordCount, endOffset)

job := NewJobWithPriority(
job := NewJobWithMetadata(
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
Max: currentEnd,
}), int(endOffset-currentStart), // priority is remaining records to process
}),
int(endOffset-currentStart), // priority is remaining records to process
)
jobs = append(jobs, job)

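To make the planner's splitting arithmetic concrete, here is a standalone sketch (plain Go, not the planner itself) of how a partition's remaining offsets are chunked into jobs with priority equal to the records still left at each chunk's start; it assumes the collapsed tail of the loop above advances currentStart to currentEnd, and mirrors the 101–200 case from the tests below:

```go
package main

import "fmt"

func main() {
	const (
		startOffset       = int64(101)
		endOffset         = int64(200)
		targetRecordCount = int64(50)
	)

	for currentStart := startOffset; currentStart < endOffset; {
		// builtin min (Go 1.21+), as used in the planner above
		currentEnd := min(currentStart+targetRecordCount, endOffset)
		priority := endOffset - currentStart // remaining records to process
		fmt.Printf("job offsets [%d, %d) priority=%d\n", currentStart, currentEnd, priority)
		currentStart = currentEnd
	}
	// Output:
	// job offsets [101, 151) priority=99
	// job offsets [151, 200) priority=49
}
```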
38 changes: 24 additions & 14 deletions pkg/blockbuilder/scheduler/strategy_test.go
@@ -19,11 +19,19 @@ func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemb
return m.groupLag, nil
}

// compareJobs compares two JobWithMetadata instances ignoring UpdateTime
func compareJobs(t *testing.T, expected, actual *JobWithMetadata) {
require.Equal(t, expected.Job, actual.Job)
require.Equal(t, expected.Priority, actual.Priority)
require.Equal(t, expected.Status, actual.Status)
require.Equal(t, expected.StartTime, actual.StartTime)
}

func TestRecordCountPlanner_Plan(t *testing.T) {
for _, tc := range []struct {
name string
recordCount int64
expectedJobs []*JobWithPriority[int]
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
}{
{
@@ -40,8 +48,8 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
Partition: 0,
},
},
expectedJobs: []*JobWithPriority[int]{
NewJobWithPriority(
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
49, // 150-101
),
@@ -61,12 +69,12 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
Partition: 0,
},
},
expectedJobs: []*JobWithPriority[int]{
NewJobWithPriority(
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
types.NewJob(0, types.Offsets{Min: 101, Max: 151}),
99, // priority is total remaining: 200-101
),
NewJobWithPriority(
NewJobWithMetadata(
types.NewJob(0, types.Offsets{Min: 151, Max: 200}),
49, // priority is total remaining: 200-151
),
@@ -95,19 +103,19 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
Partition: 1,
},
},
expectedJobs: []*JobWithPriority[int]{
NewJobWithPriority(
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
49, // priority is total remaining: 150-101
),
NewJobWithMetadata(
types.NewJob(1, types.Offsets{Min: 201, Max: 301}),
199, // priority is total remaining: 400-201
),
NewJobWithPriority(
NewJobWithMetadata(
types.NewJob(1, types.Offsets{Min: 301, Max: 400}),
99, // priority is total remaining: 400-301
),
NewJobWithPriority(
types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
49, // priority is total remaining: 150-101
),
},
},
{
@@ -145,7 +153,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
require.NoError(t, err)

require.Equal(t, len(tc.expectedJobs), len(jobs))
require.ElementsMatch(t, tc.expectedJobs, jobs)
for i := range tc.expectedJobs {
compareJobs(t, tc.expectedJobs[i], jobs[i])
}
})
}
}