initial block scheduler scaffolding
owen-d committed Dec 2, 2024
commit 66bcf801d4ef92d3e57c4c631a098977af5e244b
160 changes: 160 additions & 0 deletions pkg/blockbuilder/architecture.md
@@ -0,0 +1,160 @@
# Block Builder Architecture

## Overview

The Block Builder and Block Scheduler are separate components that build storage formats from data ingested via Kafka. The Block Scheduler coordinates job distribution to multiple Block Builder instances, implementing a pull-based architecture that decouples read and write operations and allows each side to scale independently with simpler operational management. This document describes the architecture and the interaction between the components.

## Component Diagram

```mermaid
graph TB
    subgraph Kafka
        KP[Kafka Partitions]
    end

    subgraph Block Scheduler
        S[Scheduler]
        Q[Job Queue]
        PC[Partition Controller]

        subgraph Transport Layer
            T[gRPC/Transport Interface]
        end
    end

    subgraph Block Builders
        BB1[Block Builder 1]
        BB2[Block Builder 2]
        BB3[Block Builder N]
    end

    subgraph Storage
        OS[Object Storage]
    end

    KP --> PC
    PC --> S
    S <--> Q
    S <--> T
    T <--> BB1
    T <--> BB2
    T <--> BB3
    BB1 --> OS
    BB2 --> OS
    BB3 --> OS
```

## Job Processing Sequence

```mermaid
sequenceDiagram
    participant PC as Partition Controller
    participant S as Block Scheduler
    participant Q as Queue
    participant T as Transport
    participant BB as Block Builder
    participant OS as Object Storage

    loop Monitor Partitions
        PC->>PC: Check for new offsets
        PC->>S: Create Job (partition, offset range)
        S->>Q: Enqueue Job
    end

    BB->>T: Request Job
    T->>S: Forward Request
    S->>Q: Dequeue Job
    Q-->>S: Return Job (or empty)
    alt Has Job
        S->>T: Send Job
        T->>BB: Forward Job
        BB->>OS: Process & Write Data
        BB->>T: Report Success
        T->>S: Forward Status
        S->>PC: Commit Offset
    else No Job
        S->>T: Send No Job Available
        T->>BB: Forward Response
    end
```

## Core Components

### Job and Offsets
- `Job`: Represents a unit of work for processing Kafka data
  - Contains a partition ID and an offset range
  - Immutable data structure that can be safely passed between components
- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process
  - Used to track progress and ensure exactly-once processing
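
The half-open convention means `Min` is included and `Max` is excluded, so adjacent ranges tile the partition without overlap. A small sketch of that semantics, mirroring the `Offsets` struct this commit adds in `job.go` (the `Contains` and `Count` helpers are hypothetical illustrations, not part of the commit):

```go
package main

import "fmt"

// Offsets defines a half-open range [Min, Max) of Kafka offsets,
// mirroring the struct added in job.go.
type Offsets struct {
	Min, Max int64
}

// Contains reports whether offset o falls inside the half-open range.
// Hypothetical helper for illustration only.
func (r Offsets) Contains(o int64) bool {
	return o >= r.Min && o < r.Max
}

// Count returns the number of offsets covered by the range.
func (r Offsets) Count() int64 {
	return r.Max - r.Min
}

func main() {
	r := Offsets{Min: 100, Max: 110}
	fmt.Println(r.Contains(100), r.Contains(110), r.Count()) // true false 10
}
```

Because `[100,110)` and `[110,120)` share no offsets, two jobs covering consecutive ranges never process the same record twice.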

### Block Scheduler
- Central component responsible for:
  - Managing the job queue
  - Coordinating Block Builder assignments
  - Tracking job progress
- Implements a pull-based model where Block Builders request jobs
- Decoupled from specific transport mechanisms through the Transport interface
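
From the scheduler's side, the pull model reduces to "hand out the next pending job, or tell the builder there is none." A minimal sketch, assuming a hypothetical `handleGetJob` method and a plain slice-backed queue (the real commit keeps the queue behind a `JobQueue` type):

```go
package main

import "fmt"

// Job carries the partition and an ID for this sketch; the committed
// struct in job.go has more fields.
type Job struct {
	ID        string
	Partition int32
}

// scheduler with a simple slice-backed queue, for illustration only.
type scheduler struct {
	queue []Job
}

// handleGetJob pops the next pending job, or reports that none is
// available so the builder can back off and retry later.
func (s *scheduler) handleGetJob() (Job, bool) {
	if len(s.queue) == 0 {
		return Job{}, false
	}
	job := s.queue[0]
	s.queue = s.queue[1:]
	return job, true
}

func main() {
	s := &scheduler{queue: []Job{{ID: "p0-100-200", Partition: 0}}}
	job, ok := s.handleGetJob()
	fmt.Println(job.ID, ok) // p0-100-200 true
	_, ok = s.handleGetJob()
	fmt.Println(ok) // false
}
```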

### Block Builder
- Processes jobs assigned by the Block Scheduler
- Responsible for:
  - Building storage formats from Kafka data
  - Reporting job status back to the Block Scheduler
- Stateless design allows for easy scaling
- Transport-agnostic through the Transport interface

### Transport Layer
- Abstract interface for communication between Block Scheduler and Block Builders
- Default gRPC implementation provided
- Can be replaced with different implementations for testing or different protocols
- Enables decoupling of business logic from I/O operations
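
Because the interface is small (see `transport.go` below), a test double is trivial. One hypothetical in-memory implementation backed by a channel, useful for exercising scheduler/builder logic without gRPC:

```go
package main

import "fmt"

// Transport mirrors the interface in transport.go.
type Transport interface {
	Send(message interface{}) error
	Receive() (interface{}, error)
}

// chanTransport is a hypothetical in-memory Transport for tests.
type chanTransport struct {
	ch chan interface{}
}

func newChanTransport(buffer int) *chanTransport {
	return &chanTransport{ch: make(chan interface{}, buffer)}
}

// Send places a message on the channel.
func (t *chanTransport) Send(message interface{}) error {
	t.ch <- message
	return nil
}

// Receive blocks until a message is available.
func (t *chanTransport) Receive() (interface{}, error) {
	return <-t.ch, nil
}

func main() {
	var tr Transport = newChanTransport(1)
	_ = tr.Send("job-request")
	msg, _ := tr.Receive()
	fmt.Println(msg) // job-request
}
```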

### Queue
- Manages pending jobs
- Provides operations for:
  - Enqueueing new jobs
  - Dequeuing jobs for processing
  - Tracking job status

### Partition Controller
- Interfaces with Kafka partitions
- Manages:
  - Offset tracking
  - Data processing
  - Commit operations
- Ensures exactly-once processing of data
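
The key invariant is that the committed offset only advances after a job succeeds, so a failed job's `[min,max)` range can be re-issued safely. A hypothetical sketch of that bookkeeping for one partition (`partitionState`, `nextJobRange`, and `commit` are illustrative names, not part of this commit):

```go
package main

import "fmt"

// partitionState tracks the committed offset for one partition.
// Everything below committed is durably written; the next job starts there.
type partitionState struct {
	committed int64
}

// nextJobRange carves the next half-open range of at most batch offsets,
// given the partition's current high watermark.
func (p *partitionState) nextJobRange(highWatermark, batch int64) (min, max int64, ok bool) {
	if p.committed >= highWatermark {
		return 0, 0, false // caught up; nothing to schedule
	}
	max = p.committed + batch
	if max > highWatermark {
		max = highWatermark
	}
	return p.committed, max, true
}

// commit advances the committed offset after a job reports success.
func (p *partitionState) commit(max int64) { p.committed = max }

func main() {
	p := &partitionState{committed: 100}
	min, max, ok := p.nextJobRange(250, 100)
	fmt.Println(min, max, ok) // 100 200 true
	p.commit(max)
	min, max, ok = p.nextJobRange(250, 100)
	fmt.Println(min, max, ok) // 200 250 true
}
```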

## Design Principles

### 1. Decoupled I/O
- Business logic is separated from I/O operations
- Transport interface allows for different communication mechanisms
- Enables easier testing through mock implementations

### 2. Pull-Based Architecture
- Block Builders pull jobs from the Block Scheduler
- Natural backpressure handling
- Improved resource utilization
- Similar to Loki's query-frontend design

### 3. Stateless Block Builders
- Block Builders are designed to be stateless
- Easier to scale and manage
- No disk requirements
- Safe to delete without orphaned data

### 4. Interface-Driven Design
- Core components defined by interfaces
- Default implementations provided
- Easy to extend or mock for testing
- Uses unimplemented variants for interface compliance

### 5. Idempotent Operations
- Jobs can be retried safely
- No data duplication on retries
- Robust error handling
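
One way to get idempotent retries (an illustrative pattern, not necessarily what the final implementation does) is to derive the output object's key deterministically from the job itself, so a re-run of the same `[min,max)` range overwrites the same object instead of duplicating data:

```go
package main

import "fmt"

// objectKey derives a deterministic storage key from a job's identity.
// Hypothetical helper: two attempts at the same job write the same object.
func objectKey(tenant string, partition int32, min, max int64) string {
	return fmt.Sprintf("%s/p%d/%d-%d", tenant, partition, min, max)
}

func main() {
	k1 := objectKey("tenant-a", 0, 100, 200)
	k2 := objectKey("tenant-a", 0, 100, 200)
	fmt.Println(k1, k1 == k2) // tenant-a/p0/100-200 true
}
```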

### 6. Independent Scaling
- Block Scheduler and Block Builders can be scaled independently
- Resource allocation based on actual demand
- Improved cost efficiency
10 changes: 0 additions & 10 deletions pkg/blockbuilder/controller.go
@@ -15,16 +15,6 @@ import (
	"github.com/grafana/loki/pkg/push"
)

// [min,max)
type Offsets struct {
	Min, Max int64
}

type Job struct {
	Partition int32
	Offsets   Offsets
}

// Interface required for interacting with queue partitions.
type PartitionController interface {
	Topic() string
25 changes: 25 additions & 0 deletions pkg/blockbuilder/job.go
@@ -0,0 +1,25 @@
package blockbuilder

// Job represents a unit of work with a specific Kafka partition and offset range.
type Job struct {
	ID        string
	Partition int32
	Offsets   Offsets
	Status    JobStatus
}

// JobStatus represents the current state of a job.
type JobStatus int

const (
	JobStatusPending JobStatus = iota
	JobStatusInProgress
	JobStatusComplete
	JobStatusFailed
)

// Offsets defines a half-open range [min,max) of Kafka offsets.
type Offsets struct {
	Min, Max int64
}
23 changes: 23 additions & 0 deletions pkg/blockbuilder/queue.go
@@ -0,0 +1,23 @@
package blockbuilder

// JobQueue manages the queue of pending jobs.
type JobQueue struct {
	// Add necessary fields here
}

// NewJobQueue creates a new JobQueue instance.
func NewJobQueue() *JobQueue {
	return &JobQueue{}
}

// Enqueue adds a job to the queue.
func (q *JobQueue) Enqueue(job Job) error {
	// Implementation goes here
	return nil
}

// Dequeue removes a job from the queue.
func (q *JobQueue) Dequeue() (Job, error) {
	// Implementation goes here
	return Job{}, nil
}
39 changes: 39 additions & 0 deletions pkg/blockbuilder/scheduler.go
@@ -0,0 +1,39 @@
package blockbuilder

// Scheduler interface defines the methods for scheduling jobs and managing worker pools.
type Scheduler interface {
	ScheduleJob(job Job) error
	DispatchJobs() error
	AddWorker(worker Worker) error
	RemoveWorker(worker Worker) error
}

// unimplementedScheduler provides default implementations for the Scheduler interface.
type unimplementedScheduler struct{}

func (u *unimplementedScheduler) ScheduleJob(job Job) error {
	panic("unimplemented")
}

func (u *unimplementedScheduler) DispatchJobs() error {
	panic("unimplemented")
}

func (u *unimplementedScheduler) AddWorker(worker Worker) error {
	panic("unimplemented")
}

func (u *unimplementedScheduler) RemoveWorker(worker Worker) error {
	panic("unimplemented")
}

// SchedulerImpl is the implementation of the Scheduler interface.
type SchedulerImpl struct {
	unimplementedScheduler
	// Add necessary fields here
}

// NewScheduler creates a new Scheduler instance.
func NewScheduler() Scheduler {
	return &SchedulerImpl{}
}
30 changes: 30 additions & 0 deletions pkg/blockbuilder/transport.go
@@ -0,0 +1,30 @@
package blockbuilder

// Transport interface defines the methods for communication between scheduler and workers.
type Transport interface {
	Send(message interface{}) error
	Receive() (interface{}, error)
}

type unimplementedTransport struct{}

func (t *unimplementedTransport) Send(message interface{}) error {
	panic("unimplemented")
}

func (t *unimplementedTransport) Receive() (interface{}, error) {
	panic("unimplemented")
}

var _ Transport = &GRPCTransport{}

// GRPCTransport is the default implementation of the Transport interface using gRPC.
type GRPCTransport struct {
	unimplementedTransport
	// Add necessary fields here
}

// NewGRPCTransport creates a new GRPCTransport instance.
func NewGRPCTransport() *GRPCTransport {
	return &GRPCTransport{}
}
39 changes: 39 additions & 0 deletions pkg/blockbuilder/worker.go
@@ -0,0 +1,39 @@
package blockbuilder

import "context"

// Worker interface defines the methods for processing jobs and reporting status.
type Worker interface {
// GetJob requests a new job from the scheduler
GetJob(ctx context.Context) (*Job, bool, error)
// CompleteJob marks a job as finished
CompleteJob(ctx context.Context, job *Job) error
// SyncJob informs the scheduler about an in-progress job
SyncJob(ctx context.Context, job *Job) error
}

// unimplementedWorker provides default implementations for the Worker interface.
type unimplementedWorker struct{}

func (u *unimplementedWorker) GetJob(ctx context.Context) (*Job, bool, error) {
panic("unimplemented")
}

func (u *unimplementedWorker) CompleteJob(ctx context.Context, job *Job) error {
panic("unimplemented")
}

func (u *unimplementedWorker) SyncJob(ctx context.Context, job *Job) error {
panic("unimplemented")
}

// WorkerImpl is the implementation of the Worker interface.
type WorkerImpl struct {
unimplementedWorker
// Add necessary fields here
}

// NewWorker creates a new Worker instance.
func NewWorker() Worker {
return &WorkerImpl{}
}