Skip to content
Prev Previous commit
Next Next commit
naive memory impls
  • Loading branch information
owen-d committed Dec 2, 2024
commit 0efd9ca65863db5bce44d2863a61e1ba08b0651f
76 changes: 76 additions & 0 deletions pkg/blockbuilder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package blockbuilder

import (
"context"
"fmt"
"sync"
)

// TestBuilder implements Worker for testing
type TestBuilder struct {
ID string
transport Transport
mu sync.Mutex
jobs map[string]*Job // Track jobs this builder has processed
}

// NewTestBuilder creates a new test builder instance
func NewTestBuilder(id string, transport Transport) *TestBuilder {
return &TestBuilder{
ID: id,
transport: transport,
jobs: make(map[string]*Job),
}
}

func (b *TestBuilder) GetJob(ctx context.Context) (*Job, bool, error) {
resp, err := b.transport.SendGetJobRequest(ctx, &GetJobRequest{
BuilderID: b.ID,
})
if err != nil {
return nil, false, err
}
if !resp.OK {
return nil, false, nil
}

b.mu.Lock()
b.jobs[resp.Job.ID] = resp.Job
b.mu.Unlock()

return resp.Job, true, nil
}

func (b *TestBuilder) CompleteJob(ctx context.Context, job *Job) error {
b.mu.Lock()
defer b.mu.Unlock()

if _, exists := b.jobs[job.ID]; !exists {
return fmt.Errorf("job %s not found in builder %s", job.ID, b.ID)
}

err := b.transport.SendCompleteJob(ctx, &CompleteJobRequest{
BuilderID: b.ID,
Job: job,
})
if err != nil {
return err
}

delete(b.jobs, job.ID)
return nil
}

func (b *TestBuilder) SyncJob(ctx context.Context, job *Job) error {
b.mu.Lock()
defer b.mu.Unlock()

if _, exists := b.jobs[job.ID]; !exists {
return fmt.Errorf("job %s not found in builder %s", job.ID, b.ID)
}

return b.transport.SendSyncJob(ctx, &SyncJobRequest{
BuilderID: b.ID,
Job: job,
})
}
20 changes: 9 additions & 11 deletions pkg/blockbuilder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,16 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c

// LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition
// Returns whether an applicable job exists, the job, and an error
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) {
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *Job, error) {
// Read the most recent committed offset
committedOffset, err := l.HighestCommittedOffset(ctx)
if err != nil {
return false, Job{}, err
return false, nil, err
}

earliestOffset, err := l.EarliestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
return false, nil, err
}

startOffset := committedOffset + 1
Expand All @@ -186,21 +186,19 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)

highestOffset, err := l.HighestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
return false, nil, err
}
if highestOffset == committedOffset {
return false, Job{}, nil
return false, nil, nil
}

// Create the job with the calculated offsets
job := Job{
Partition: l.part.Partition(),
Offsets: Offsets{
Min: startOffset,
Max: min(startOffset+l.stepLen, highestOffset),
},
offsets := Offsets{
Min: startOffset,
Max: min(startOffset+l.stepLen, highestOffset),
}

job := NewJob(l.part.Partition(), offsets)
return true, job, nil
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/blockbuilder/job.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package blockbuilder

import "fmt"

// Job represents a unit of work with a specific Kafka partition and offset range.
type Job struct {
ID string
Expand All @@ -8,6 +10,21 @@ type Job struct {
Status JobStatus
}

// NewJob creates a new Job with a deterministic ID based on partition and offsets
func NewJob(partition int32, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Partition: partition,
Offsets: offsets,
Status: JobStatusPending,
}
}

// GenerateJobID creates a deterministic ID from partition and offsets
func GenerateJobID(partition int32, offsets Offsets) string {
return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max)
}

// JobStatus represents the current state of a job
type JobStatus int

Expand Down
119 changes: 108 additions & 11 deletions pkg/blockbuilder/queue.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,120 @@
package blockbuilder

// JobQueue manages the queue of pending jobs.
import (
"fmt"
"sync"
)

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
// Add necessary fields here
pending map[string]*Job // Jobs waiting to be processed, key is job ID
inProgress map[string]*jobAssignment // job ID -> assignment info
completed map[string]*Job // Completed jobs, key is job ID
mu sync.RWMutex
}

type jobAssignment struct {
builderID string
job *Job
}

// NewJobQueue creates a new JobQueue instance.
// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{}
return &JobQueue{
pending: make(map[string]*Job),
inProgress: make(map[string]*jobAssignment),
completed: make(map[string]*Job),
}
}

// Enqueue adds a job to the queue.
func (q *JobQueue) Enqueue(job Job) error {
// Implementation goes here
// Enqueue adds a new job to the pending queue
func (q *JobQueue) Enqueue(job *Job) error {
q.mu.Lock()
defer q.mu.Unlock()

if _, exists := q.pending[job.ID]; exists {
return fmt.Errorf("job %s already exists in pending queue", job.ID)
}
if _, exists := q.inProgress[job.ID]; exists {
return fmt.Errorf("job %s already exists in progress", job.ID)
}
if _, exists := q.completed[job.ID]; exists {
return fmt.Errorf("job %s already completed", job.ID)
}

job.Status = JobStatusPending
q.pending[job.ID] = job
return nil
}

// Dequeue removes a job from the queue.
func (q *JobQueue) Dequeue() (Job, error) {
// Implementation goes here
return Job{}, nil
// Dequeue gets the next available job and assigns it to a builder
func (q *JobQueue) Dequeue(builderID string) (*Job, bool, error) {
if builderID == "" {
return nil, false, fmt.Errorf("builder ID cannot be empty")
}

q.mu.Lock()
defer q.mu.Unlock()

// Get first available pending job
var jobID string
var job *Job
for id, j := range q.pending {
jobID = id
job = j
break
}

if job == nil {
return nil, false, nil
}

// Move job to in progress
delete(q.pending, jobID)
job.Status = JobStatusInProgress
q.inProgress[jobID] = &jobAssignment{
builderID: builderID,
job: job,
}

return job, true, nil
}

// MarkComplete moves a job from in progress to completed
func (q *JobQueue) MarkComplete(jobID string, builderID string) error {
q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
}
if assignment.builderID != builderID {
return fmt.Errorf("job %s is assigned to builder %s, not %s", jobID, assignment.builderID, builderID)
}

job := assignment.job
delete(q.inProgress, jobID)
job.Status = JobStatusComplete
q.completed[jobID] = job

return nil
}

// SyncJob updates the state of an in-progress job
func (q *JobQueue) SyncJob(jobID string, builderID string, job *Job) error {
q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
}
if assignment.builderID != builderID {
return fmt.Errorf("job %s is assigned to builder %s, not %s", jobID, assignment.builderID, builderID)
}

// Update job state
assignment.job = job
return nil
}
55 changes: 38 additions & 17 deletions pkg/blockbuilder/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,60 @@
package blockbuilder

import (
"context"
)

// Scheduler interface defines the methods for scheduling jobs and managing worker pools.
type Scheduler interface {
ScheduleJob(job Job) error
DispatchJobs() error
AddWorker(worker Worker) error
RemoveWorker(worker Worker) error
// HandleGetJob processes a job request from a block builder
HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
// HandleCompleteJob processes a job completion notification
HandleCompleteJob(ctx context.Context, builderID string, job *Job) error
// HandleSyncJob processes a job sync request
HandleSyncJob(ctx context.Context, builderID string, job *Job) error
}

// unimplementedScheduler provides default implementations for the Scheduler interface.
// unimplementedScheduler provides default implementations that panic.
type unimplementedScheduler struct{}

func (u *unimplementedScheduler) ScheduleJob(job Job) error {
func (s *unimplementedScheduler) HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) {
panic("unimplemented")
}

func (u *unimplementedScheduler) DispatchJobs() error {
func (s *unimplementedScheduler) HandleCompleteJob(ctx context.Context, builderID string, job *Job) error {
panic("unimplemented")
}

func (u *unimplementedScheduler) AddWorker(worker Worker) error {
func (s *unimplementedScheduler) HandleSyncJob(ctx context.Context, builderID string, job *Job) error {
panic("unimplemented")
}

func (u *unimplementedScheduler) RemoveWorker(worker Worker) error {
panic("unimplemented")
}

// SchedulerImpl is the implementation of the Scheduler interface.
// SchedulerImpl implements the Scheduler interface
type SchedulerImpl struct {
unimplementedScheduler
// Add necessary fields here
queue *JobQueue
}

// NewScheduler creates a new scheduler instance
func NewScheduler(queue *JobQueue) *SchedulerImpl {
return &SchedulerImpl{
queue: queue,
}
}

func (s *SchedulerImpl) HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
default:
return s.queue.Dequeue(builderID)
}
}

func (s *SchedulerImpl) HandleCompleteJob(ctx context.Context, builderID string, job *Job) error {
return s.queue.MarkComplete(job.ID, builderID)
}

// NewScheduler creates a new Scheduler instance.
func NewScheduler() Scheduler {
return &SchedulerImpl{}
func (s *SchedulerImpl) HandleSyncJob(ctx context.Context, builderID string, job *Job) error {
return s.queue.SyncJob(job.ID, builderID, job)
}
Loading