Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
job fields are private to force constructor & accessor usage
  • Loading branch information
owen-d committed Dec 10, 2024
commit 02155592b4263b196e52ea40aa6e3fdd67906cc2
16 changes: 8 additions & 8 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
logger := log.With(
i.logger,
"worker_id", workerID,
"partition", job.Partition,
"job_min_offset", job.Offsets.Min,
"job_max_offset", job.Offsets.Max,
"partition", job.Partition(),
"job_min_offset", job.Offsets().Min,
"job_max_offset", job.Offsets().Max,
)

i.jobsMtx.Lock()
i.inflightJobs[job.ID] = job
i.inflightJobs[job.ID()] = job
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

Expand Down Expand Up @@ -284,7 +284,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID)
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

Expand Down Expand Up @@ -315,15 +315,15 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
return err
},
func(ctx context.Context) error {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
"last_offset", lastOffset,
"total_records", lastOffset-job.Offsets.Min,
"total_records", lastOffset-job.Offsets().Min,
)
close(inputCh)
return nil
Expand Down Expand Up @@ -488,7 +488,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
}
}

if lastOffset <= job.Offsets.Min {
if lastOffset <= job.Offsets().Min {
return lastOffset, nil
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewJobQueue() *JobQueue {
func(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority // Higher priority first
},
func(j *JobWithMetadata) string { return j.ID },
func(j *JobWithMetadata) string { return j.ID() },
),
inProgress: make(map[string]*JobWithMetadata),
completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity),
Expand All @@ -60,7 +60,7 @@ func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

status, ok := q.statusMap[job.ID]
status, ok := q.statusMap[job.ID()]
return status, ok
}

Expand All @@ -70,13 +70,13 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
defer q.mu.Unlock()

// Check if job already exists
if status, exists := q.statusMap[job.ID]; exists {
return fmt.Errorf("job %s already exists with status %v", job.ID, status)
if status, exists := q.statusMap[job.ID()]; exists {
return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
}

jobMeta := NewJobWithMetadata(job, priority)
q.pending.Push(jobMeta)
q.statusMap[job.ID] = types.JobStatusPending
q.statusMap[job.ID()] = types.JobStatusPending
return nil
}

Expand All @@ -95,8 +95,8 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) {
jobMeta.StartTime = time.Now()
jobMeta.UpdateTime = jobMeta.StartTime

q.inProgress[jobMeta.ID] = jobMeta
q.statusMap[jobMeta.ID] = types.JobStatusInProgress
q.inProgress[jobMeta.ID()] = jobMeta
q.statusMap[jobMeta.ID()] = types.JobStatusInProgress

return jobMeta.Job, true
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
// Add to completed buffer
if old, evicted := q.completed.Push(jobMeta); evicted {
// If the buffer is full, evict the oldest job and remove it from the status map to avoid leaks
delete(q.statusMap, old.ID)
delete(q.statusMap, old.ID())
}

// Update status map and clean up
Expand Down
12 changes: 6 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

logger := log.With(
s.logger,
"job", job.Job.ID,
"job", job.Job.ID(),
"priority", job.Priority,
)

Expand All @@ -174,7 +174,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
"msg", "job is pending, updating priority",
"old_priority", job.Priority,
)
s.queue.pending.UpdatePriority(job.Job.ID, job)
s.queue.pending.UpdatePriority(job.Job.ID(), job)
case types.JobStatusInProgress:
level.Debug(s.logger).Log(
"msg", "job is in progress, ignoring",
Expand Down Expand Up @@ -215,20 +215,20 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error {
logger := log.With(s.logger, "job", job.ID)
logger := log.With(s.logger, "job", job.ID())

if success {
level.Info(logger).Log("msg", "job completed successfully")
s.queue.MarkComplete(job.ID, types.JobStatusComplete)
s.queue.MarkComplete(job.ID(), types.JobStatusComplete)
return nil
}

level.Error(logger).Log("msg", "job failed, re-enqueuing")
s.queue.MarkComplete(job.ID, types.JobStatusFailed)
s.queue.MarkComplete(job.ID(), types.JobStatusFailed)
return nil
}

func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
s.queue.SyncJob(job.ID, builderID, job)
s.queue.SyncJob(job.ID(), builderID, job)
return nil
}
6 changes: 3 additions & 3 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func TestScheduleAndProcessJob(t *testing.T) {
if !ok {
t.Fatal("expected to receive job")
}
if receivedJob.ID != job.ID {
t.Errorf("got job ID %s, want %s", receivedJob.ID, job.ID)
if receivedJob.ID() != job.ID() {
t.Errorf("got job ID %s, want %s", receivedJob.ID(), job.ID())
}

// Builder completes job
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestMultipleBuilders(t *testing.T) {
}

// Verify different jobs were assigned
if receivedJob1.ID == receivedJob2.ID {
if receivedJob1.ID() == receivedJob2.ID() {
t.Error("builders received same job")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro

// Sort jobs by partition then priority
sort.Slice(jobs, func(i, j int) bool {
if jobs[i].Job.Partition != jobs[j].Job.Partition {
return jobs[i].Job.Partition < jobs[j].Job.Partition
if jobs[i].Job.Partition() != jobs[j].Job.Partition() {
return jobs[i].Job.Partition() < jobs[j].Job.Partition()
}
return jobs[i].Priority > jobs[j].Priority
})
Expand Down
14 changes: 7 additions & 7 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ func protoToJob(p *proto.Job) *Job {
return nil
}
return &Job{
ID: p.GetId(),
Partition: p.GetPartition(),
Offsets: Offsets{
id: p.GetId(),
partition: p.GetPartition(),
offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
},
Expand All @@ -125,11 +125,11 @@ func jobToProto(j *Job) *proto.Job {
return nil
}
return &proto.Job{
Id: j.ID,
Partition: j.Partition,
Id: j.ID(),
Partition: j.Partition(),
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
Min: j.offsets.Min,
Max: j.offsets.Max,
},
}
}
28 changes: 18 additions & 10 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@ import "fmt"

// Job represents a block building task.
type Job struct {
ID string
id string
// Partition and offset information
Partition int32
Offsets Offsets
partition int32
offsets Offsets
}

func (j *Job) GetID() string {
return j.ID
func (j *Job) ID() string {
return j.id
}

func (j *Job) Partition() int32 {
return j.partition
}

func (j *Job) Offsets() Offsets {
return j.offsets
}

// JobStatus represents the current state of a job
Expand All @@ -21,8 +29,8 @@ const (
JobStatusPending JobStatus = iota
JobStatusInProgress
JobStatusComplete
JobStatusFailed // Job failed and may be retried
JobStatusExpired // Job failed too many times or is too old
JobStatusFailed // Job failed and may be retried
JobStatusExpired // Job failed too many times or is too old
)

func (s JobStatus) String() string {
Expand Down Expand Up @@ -51,9 +59,9 @@ type Offsets struct {
// NewJob creates a new job with the given partition and offsets
func NewJob(partition int32, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Partition: partition,
Offsets: offsets,
id: GenerateJobID(partition, offsets),
partition: partition,
offsets: offsets,
}
}

Expand Down