Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
priorityqueue uses expected k/v annotations, works on scheduler handler
  • Loading branch information
owen-d committed Dec 10, 2024
commit f493c535518152d00dda67bfbf95e9eeb5ff45bf
20 changes: 10 additions & 10 deletions pkg/blockbuilder/scheduler/prioritiy_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPriorityQueue(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(a int) int { return a },
func(v int) int { return v },
)
require.Equal(t, 0, pq.Len())

Expand Down Expand Up @@ -73,7 +73,7 @@ func TestPriorityQueue(t *testing.T) {
Priority int
}

pq := NewPriorityQueue[Job, string](
pq := NewPriorityQueue[string, Job](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestPriorityQueue(t *testing.T) {
Priority int
}

pq := NewPriorityQueue[Job, string](
pq := NewPriorityQueue[string, Job](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)
Expand Down Expand Up @@ -160,26 +160,26 @@ func TestPriorityQueue(t *testing.T) {
t.Run("mixed operations", func(t *testing.T) {
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(a int) int { return a },
func(v int) int { return v },
)

// Push some elements
pq.Push(3)
pq.Push(1)
require.Equal(t, 2, pq.Len())
pq.Push(4)

// Pop lowest
// Pop an element
v, ok := pq.Pop()
require.True(t, ok)
require.Equal(t, 1, v)

// Push more elements
pq.Push(2)
pq.Push(4)
pq.Push(5)

// Verify remaining elements come out in order
want := []int{2, 3, 4}
got := make([]int, 0, 3)
// Pop remaining elements and verify order
want := []int{2, 3, 4, 5}
got := make([]int, 0, len(want))
for range want {
v, ok := pq.Pop()
require.True(t, ok)
Expand Down
86 changes: 43 additions & 43 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,35 @@ import (
)

// PriorityQueue is a generic priority queue with constant time lookups.
type PriorityQueue[T any, K comparable] struct {
h *priorityHeap[T]
m map[K]*item[T] // Map for constant time lookups
key func(T) K // Function to extract key from item
type PriorityQueue[K comparable, V any] struct {
h *priorityHeap[V]
m map[K]*item[V] // Map for constant time lookups
key func(V) K // Function to extract key from value
}

// item represents an item in the priority queue with its index
type item[T any] struct {
value T
type item[V any] struct {
value V
index int
}

// NewPriorityQueue creates a new priority queue.
func NewPriorityQueue[T any, K comparable](less func(T, T) bool, key func(T) K) *PriorityQueue[T, K] {
h := &priorityHeap[T]{
func NewPriorityQueue[K comparable, V any](less func(V, V) bool, key func(V) K) *PriorityQueue[K, V] {
h := &priorityHeap[V]{
less: less,
heap: make([]*item[T], 0),
idx: make(map[int]*item[T]),
heap: make([]*item[V], 0),
idx: make(map[int]*item[V]),
}
heap.Init(h)
return &PriorityQueue[T, K]{
return &PriorityQueue[K, V]{
h: h,
m: make(map[K]*item[T]),
m: make(map[K]*item[V]),
key: key,
}
}

// Push adds an element to the queue.
func (pq *PriorityQueue[T, K]) Push(v T) {
func (pq *PriorityQueue[K, V]) Push(v V) {
k := pq.key(v)
if existing, ok := pq.m[k]; ok {
// Update existing item's value and fix heap
Expand All @@ -44,36 +44,36 @@ func (pq *PriorityQueue[T, K]) Push(v T) {

// Add new item
idx := pq.h.Len()
it := &item[T]{value: v, index: idx}
it := &item[V]{value: v, index: idx}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a bit of duplication: we set the item's index here and again inside the heap's Push() method.

pq.m[k] = it
heap.Push(pq.h, it)
}

// Pop removes and returns the element with the highest priority from the queue.
func (pq *PriorityQueue[T, K]) Pop() (T, bool) {
func (pq *PriorityQueue[K, V]) Pop() (V, bool) {
if pq.Len() == 0 {
var zero T
var zero V
return zero, false
}
it := heap.Pop(pq.h).(*item[T])
it := heap.Pop(pq.h).(*item[V])
delete(pq.m, pq.key(it.value))
return it.value, true
}

// Lookup returns the item with the given key if it exists.
func (pq *PriorityQueue[T, K]) Lookup(k K) (T, bool) {
func (pq *PriorityQueue[K, V]) Lookup(k K) (V, bool) {
if it, ok := pq.m[k]; ok {
return it.value, true
}
var zero T
var zero V
return zero, false
}

// Remove removes and returns the item with the given key if it exists.
func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) {
func (pq *PriorityQueue[K, V]) Remove(k K) (V, bool) {
it, ok := pq.m[k]
if !ok {
var zero T
var zero V
return zero, false
}
heap.Remove(pq.h, it.index)
Expand All @@ -82,7 +82,7 @@ func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) {
}

// UpdatePriority updates the priority of an item and reorders the queue.
func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool {
func (pq *PriorityQueue[K, V]) UpdatePriority(k K, v V) bool {
if it, ok := pq.m[k]; ok {
it.value = v
heap.Fix(pq.h, it.index)
Expand All @@ -92,41 +92,41 @@ func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool {
}

// Len returns the number of elements in the queue.
func (pq *PriorityQueue[T, K]) Len() int {
func (pq *PriorityQueue[K, V]) Len() int {
return pq.h.Len()
}

// priorityHeap is the internal heap implementation that satisfies heap.Interface.
type priorityHeap[T any] struct {
less func(T, T) bool
heap []*item[T]
idx map[int]*item[T] // Maps index to item for efficient updates
type priorityHeap[V any] struct {
less func(V, V) bool
heap []*item[V]
idx map[int]*item[V] // Maps index to item for efficient updates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use this for lookups? I don't see any references.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call 👍

}

func (h *priorityHeap[T]) Len() int {
func (h *priorityHeap[V]) Len() int {
return len(h.heap)
}

func (h *priorityHeap[T]) Less(i, j int) bool {
func (h *priorityHeap[V]) Less(i, j int) bool {
return h.less(h.heap[i].value, h.heap[j].value)
}

func (h *priorityHeap[T]) Swap(i, j int) {
func (h *priorityHeap[V]) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
h.heap[i].index = i
h.heap[j].index = j
h.idx[i] = h.heap[i]
h.idx[j] = h.heap[j]
}

func (h *priorityHeap[T]) Push(x any) {
it := x.(*item[T])
func (h *priorityHeap[V]) Push(x any) {
it := x.(*item[V])
it.index = len(h.heap)
h.heap = append(h.heap, it)
h.idx[it.index] = it
}

func (h *priorityHeap[T]) Pop() any {
func (h *priorityHeap[V]) Pop() any {
old := h.heap
n := len(old)
it := old[n-1]
Expand All @@ -136,26 +136,26 @@ func (h *priorityHeap[T]) Pop() any {
}

// CircularBuffer is a generic circular buffer.
type CircularBuffer[T any] struct {
buffer []T
type CircularBuffer[V any] struct {
buffer []V
size int
head int
tail int
}

// NewCircularBuffer creates a new circular buffer with the given capacity.
func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
return &CircularBuffer[T]{
buffer: make([]T, capacity),
func NewCircularBuffer[V any](capacity int) *CircularBuffer[V] {
return &CircularBuffer[V]{
buffer: make([]V, capacity),
size: 0,
head: 0,
tail: 0,
}
}

// Push adds an element to the circular buffer and returns the evicted element if any
func (b *CircularBuffer[T]) Push(v T) (T, bool) {
var evicted T
func (b *CircularBuffer[V]) Push(v V) (V, bool) {
var evicted V
hasEvicted := false

if b.size == len(b.buffer) {
Expand All @@ -174,9 +174,9 @@ func (b *CircularBuffer[T]) Push(v T) (T, bool) {
}

// Pop removes and returns the oldest element from the buffer
func (b *CircularBuffer[T]) Pop() (T, bool) {
func (b *CircularBuffer[V]) Pop() (V, bool) {
if b.size == 0 {
var zero T
var zero V
return zero, false
}

Expand All @@ -188,6 +188,6 @@ func (b *CircularBuffer[T]) Pop() (T, bool) {
}

// Len returns the number of elements in the buffer
func (b *CircularBuffer[T]) Len() int {
func (b *CircularBuffer[V]) Len() int {
return b.size
}
4 changes: 2 additions & 2 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (j *inProgressJob) Duration() time.Duration {

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending *PriorityQueue[*JobWithPriority[int], string] // Jobs waiting to be processed, ordered by priority
pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*types.Job] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
Expand All @@ -49,7 +49,7 @@ type JobQueue struct {
// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
pending: NewPriorityQueue[*JobWithPriority[int]](
pending: NewPriorityQueue(
func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
},
Expand Down
56 changes: 50 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,53 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

for _, job := range jobs {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
if status, ok := s.queue.Exists(job.Job); ok {
level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status)
// TODO: update priority

logger := log.With(
s.logger,
"job", job.Job.ID,
"priority", job.Priority,
)

status, ok := s.queue.Exists(job.Job)

// scheduler is unaware of incoming job; enqueue
if !ok {
level.Debug(logger).Log(
"msg", "job does not exist, enqueueing",
)

// enqueue
if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
level.Error(logger).Log("msg", "failed to enqueue job", "err", err)
}

continue
}

if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err)
// scheduler is aware of incoming job; handling depends on status
switch status {
case types.JobStatusPending:
level.Debug(s.logger).Log(
"msg", "job is pending, updating priority",
"old_priority", job.Priority,
)
s.queue.pending.UpdatePriority(job.Job.ID, job)
case types.JobStatusInProgress:
level.Debug(s.logger).Log(
"msg", "job is in progress, ignoring",
)
case types.JobStatusComplete:
// shouldn't happen
level.Debug(s.logger).Log(
"msg", "job is complete, ignoring",
)
default:
level.Error(s.logger).Log(
"msg", "job has unknown status, ignoring",
"status", status,
)
}

}

return nil
Expand All @@ -178,8 +216,14 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error {
logger := log.With(s.logger, "job", job.ID)

status, exists := s.queue.Exists(job)
if !exists {
level.Error(logger).Log("msg", "cannot complete job, job does not exist")
return nil
}

if !success {
level.Error(logger).Log("msg", "job failed")
level.Error(logger).Log("msg", "job failed, re-enqueuing")
return nil
}
s.queue.MarkComplete(job.ID)
Expand Down
4 changes: 4 additions & 0 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type Job struct {
Offsets Offsets
}

// GetID reports the identifier stored in the job's ID field.
func (j *Job) GetID() string {
	id := j.ID
	return id
}

// JobStatus represents the current state of a job
type JobStatus int

Expand Down