Skip to content
Prev Previous commit
Next Next commit
begins splitting blockbuilder pkgs
  • Loading branch information
owen-d committed Dec 2, 2024
commit d731ccefdc24475d38325c3ae9236cc2efbaa05e
54 changes: 54 additions & 0 deletions pkg/blockbuilder/builder/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package builder

import (
"context"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// unimplementedTransport provides default implementations that panic
type unimplementedTransport struct{}

func (t *unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) {
panic("unimplemented")
}

func (t *unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error {
panic("unimplemented")
}

func (t *unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error {
panic("unimplemented")
}

// MemoryTransport implements Transport interface for in-memory communication
type MemoryTransport struct {
unimplementedTransport
scheduler types.Scheduler
}

// NewMemoryTransport creates a new in-memory transport instance
func NewMemoryTransport(scheduler types.Scheduler) *MemoryTransport {
return &MemoryTransport{
scheduler: scheduler,
}
}

func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *types.GetJobRequest) (*types.GetJobResponse, error) {
job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID)
if err != nil {
return nil, err
}
return &types.GetJobResponse{
Job: job,
OK: ok,
}, nil
}

func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *types.CompleteJobRequest) error {
return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job)
}

func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *types.SyncJobRequest) error {
return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job)
}
61 changes: 61 additions & 0 deletions pkg/blockbuilder/builder/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package builder

import (
"context"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// unimplementedWorker provides default implementations for the Worker interface.
type unimplementedWorker struct{}

func (u *unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) {
panic("unimplemented")
}

func (u *unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error {
panic("unimplemented")
}

func (u *unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error {
panic("unimplemented")
}

// WorkerImpl is the implementation of the Worker interface.
type WorkerImpl struct {
unimplementedWorker
transport types.Transport
builderID string
}

// NewWorker creates a new Worker instance.
func NewWorker(builderID string, transport types.Transport) *WorkerImpl {
return &WorkerImpl{
transport: transport,
builderID: builderID,
}
}

func (w *WorkerImpl) GetJob(ctx context.Context) (*types.Job, bool, error) {
resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: w.builderID,
})
if err != nil {
return nil, false, err
}
return resp.Job, resp.OK, nil
}

func (w *WorkerImpl) CompleteJob(ctx context.Context, job *types.Job) error {
return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: w.builderID,
Job: job,
})
}

func (w *WorkerImpl) SyncJob(ctx context.Context, job *types.Job) error {
return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{
BuilderID: w.builderID,
Job: job,
})
}
76 changes: 0 additions & 76 deletions pkg/blockbuilder/builder_test.go

This file was deleted.

42 changes: 0 additions & 42 deletions pkg/blockbuilder/job.go

This file was deleted.

77 changes: 31 additions & 46 deletions pkg/blockbuilder/queue.go → pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
package blockbuilder
package scheduler

import (
"fmt"
"sync"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// jobAssignment tracks a job and its assigned builder
type jobAssignment struct {
job *types.Job
builderID string
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending map[string]*Job // Jobs waiting to be processed, key is job ID
pending map[string]*types.Job // Jobs waiting to be processed, key is job ID
inProgress map[string]*jobAssignment // job ID -> assignment info
completed map[string]*Job // Completed jobs, key is job ID
completed map[string]*types.Job // Completed jobs, key is job ID
mu sync.RWMutex
}

type jobAssignment struct {
builderID string
job *Job
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
pending: make(map[string]*Job),
pending: make(map[string]*types.Job),
inProgress: make(map[string]*jobAssignment),
completed: make(map[string]*Job),
completed: make(map[string]*types.Job),
}
}

// Enqueue adds a new job to the pending queue
// This is a naive implementation, intended to be refactored
func (q *JobQueue) Enqueue(job *Job) error {
func (q *JobQueue) Enqueue(job *types.Job) error {
q.mu.Lock()
defer q.mu.Unlock()

Expand All @@ -43,45 +46,29 @@ func (q *JobQueue) Enqueue(job *Job) error {
return fmt.Errorf("job %s already completed", job.ID)
}

job.Status = JobStatusPending
q.pending[job.ID] = job
return nil
}

// Dequeue gets the next available job and assigns it to a builder
func (q *JobQueue) Dequeue(builderID string) (*Job, bool, error) {
if builderID == "" {
return nil, false, fmt.Errorf("builder ID cannot be empty")
}

func (q *JobQueue) Dequeue(builderID string) (*types.Job, bool, error) {
q.mu.Lock()
defer q.mu.Unlock()

// Get first available pending job
var jobID string
var job *Job
for id, j := range q.pending {
jobID = id
job = j
break
// Simple FIFO for now
for id, job := range q.pending {
delete(q.pending, id)
q.inProgress[id] = &jobAssignment{
job: job,
builderID: builderID,
}
return job, true, nil
}

if job == nil {
return nil, false, nil
}

// Move job to in progress
delete(q.pending, jobID)
job.Status = JobStatusInProgress
q.inProgress[jobID] = &jobAssignment{
builderID: builderID,
job: job,
}

return job, true, nil
return nil, false, nil
}

// MarkComplete moves a job from in progress to completed
// MarkComplete moves a job from in-progress to completed
func (q *JobQueue) MarkComplete(jobID string, builderID string) error {
q.mu.Lock()
defer q.mu.Unlock()
Expand All @@ -90,32 +77,30 @@ func (q *JobQueue) MarkComplete(jobID string, builderID string) error {
if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
}

if assignment.builderID != builderID {
return fmt.Errorf("job %s is assigned to builder %s, not %s", jobID, assignment.builderID, builderID)
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID)
}

job := assignment.job
delete(q.inProgress, jobID)
job.Status = JobStatusComplete
q.completed[jobID] = job

q.completed[jobID] = assignment.job
return nil
}

// SyncJob updates the state of an in-progress job
func (q *JobQueue) SyncJob(jobID string, builderID string, job *Job) error {
func (q *JobQueue) SyncJob(jobID string, builderID string, job *types.Job) error {
q.mu.Lock()
defer q.mu.Unlock()

assignment, exists := q.inProgress[jobID]
if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
}

if assignment.builderID != builderID {
return fmt.Errorf("job %s is assigned to builder %s, not %s", jobID, assignment.builderID, builderID)
return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID)
}

// Update job state
assignment.job = job
return nil
}
Loading