Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
key lookups for priority queue
  • Loading branch information
owen-d committed Dec 10, 2024
commit fe412860cdac0992b6388a8bd0ccb3f3670b1e36
81 changes: 70 additions & 11 deletions pkg/blockbuilder/scheduler/prioritiy_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestPriorityQueue(t *testing.T) {
t.Run("operations", func(t *testing.T) {
t.Run("basic operations", func(t *testing.T) {
tests := []struct {
name string
input []int
Expand All @@ -33,16 +33,14 @@ func TestPriorityQueue(t *testing.T) {
input: []int{3, 1, 2},
wantPops: []int{1, 2, 3},
},
{
name: "duplicate elements",
input: []int{2, 1, 2, 1},
wantPops: []int{1, 1, 2, 2},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(a int) int { return a },
)
require.Equal(t, 0, pq.Len())

// Push all elements
Expand All @@ -69,15 +67,73 @@ func TestPriorityQueue(t *testing.T) {
}
})

t.Run("key operations", func(t *testing.T) {
type Job struct {
ID string
Priority int
}

pq := NewPriorityQueue[Job, string](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)

// Test Push with duplicate key
job1 := Job{ID: "job1", Priority: 1}
job1Updated := Job{ID: "job1", Priority: 3}
job2 := Job{ID: "job2", Priority: 2}

pq.Push(job1)
require.Equal(t, 1, pq.Len())

// Push with same key should update
pq.Push(job1Updated)
require.Equal(t, 1, pq.Len())

// Verify updated priority
v, ok := pq.Lookup("job1")
require.True(t, ok)
require.Equal(t, job1Updated, v)

// Test Remove
pq.Push(job2)
v, ok = pq.Remove("job1")
require.True(t, ok)
require.Equal(t, job1Updated, v)
require.Equal(t, 1, pq.Len())

// Test UpdatePriority
newJob2 := Job{ID: "job2", Priority: 4}
ok = pq.UpdatePriority("job2", newJob2)
require.True(t, ok)

v, ok = pq.Lookup("job2")
require.True(t, ok)
require.Equal(t, newJob2, v)

// Test non-existent key operations
v, ok = pq.Lookup("nonexistent")
require.False(t, ok)
require.Zero(t, v)

v, ok = pq.Remove("nonexistent")
require.False(t, ok)
require.Zero(t, v)

ok = pq.UpdatePriority("nonexistent", Job{})
require.False(t, ok)
})

t.Run("custom type", func(t *testing.T) {
type Job struct {
ID string
Priority int
}

pq := NewPriorityQueue[Job](func(a, b Job) bool {
return a.Priority < b.Priority
})
pq := NewPriorityQueue[Job, string](
func(a, b Job) bool { return a.Priority < b.Priority },
func(j Job) string { return j.ID },
)

jobs := []Job{
{ID: "high", Priority: 3},
Expand All @@ -102,7 +158,10 @@ func TestPriorityQueue(t *testing.T) {
})

t.Run("mixed operations", func(t *testing.T) {
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
pq := NewPriorityQueue[int, int](
func(a, b int) bool { return a < b },
func(a int) int { return a },
)

// Push some elements
pq.Push(3)
Expand Down
99 changes: 83 additions & 16 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,68 +4,135 @@ import (
"container/heap"
)

// PriorityQueue is a generic priority queue.
type PriorityQueue[T any] struct {
h *priorityHeap[T]
// PriorityQueue is a generic priority queue with constant time lookups.
type PriorityQueue[T any, K comparable] struct {
h *priorityHeap[T]
m map[K]*item[T] // Map for constant time lookups
key func(T) K // Function to extract key from item
}

// item represents an item in the priority queue with its index
type item[T any] struct {
value T
index int
}

// NewPriorityQueue creates a new priority queue.
func NewPriorityQueue[T any](less func(T, T) bool) *PriorityQueue[T] {
func NewPriorityQueue[T any, K comparable](less func(T, T) bool, key func(T) K) *PriorityQueue[T, K] {
h := &priorityHeap[T]{
less: less,
heap: make([]T, 0),
heap: make([]*item[T], 0),
idx: make(map[int]*item[T]),
}
heap.Init(h)
return &PriorityQueue[T]{h: h}
return &PriorityQueue[T, K]{
h: h,
m: make(map[K]*item[T]),
key: key,
}
}

// Push adds an element to the queue.
func (pq *PriorityQueue[T]) Push(v T) {
heap.Push(pq.h, v)
func (pq *PriorityQueue[T, K]) Push(v T) {
k := pq.key(v)
if existing, ok := pq.m[k]; ok {
// Update existing item's value and fix heap
existing.value = v
heap.Fix(pq.h, existing.index)
return
}

// Add new item
idx := pq.h.Len()
it := &item[T]{value: v, index: idx}
pq.m[k] = it
heap.Push(pq.h, it)
}

// Pop removes and returns the element with the highest priority from the queue.
func (pq *PriorityQueue[T]) Pop() (T, bool) {
func (pq *PriorityQueue[T, K]) Pop() (T, bool) {
if pq.Len() == 0 {
var zero T
return zero, false
}
return heap.Pop(pq.h).(T), true
it := heap.Pop(pq.h).(*item[T])
delete(pq.m, pq.key(it.value))
return it.value, true
}

// Lookup returns the item with the given key if it exists.
func (pq *PriorityQueue[T, K]) Lookup(k K) (T, bool) {
if it, ok := pq.m[k]; ok {
return it.value, true
}
var zero T
return zero, false
}

// Remove removes and returns the item with the given key if it exists.
func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) {
it, ok := pq.m[k]
if !ok {
var zero T
return zero, false
}
heap.Remove(pq.h, it.index)
delete(pq.m, k)
return it.value, true
}

// UpdatePriority updates the priority of an item and reorders the queue.
func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool {
if it, ok := pq.m[k]; ok {
it.value = v
heap.Fix(pq.h, it.index)
return true
}
return false
}

// Len returns the number of elements in the queue.
func (pq *PriorityQueue[T]) Len() int {
func (pq *PriorityQueue[T, K]) Len() int {
return pq.h.Len()
}

// priorityHeap is the internal heap implementation that satisfies heap.Interface.
type priorityHeap[T any] struct {
less func(T, T) bool
heap []T
heap []*item[T]
idx map[int]*item[T] // Maps index to item for efficient updates
}

func (h *priorityHeap[T]) Len() int {
return len(h.heap)
}

func (h *priorityHeap[T]) Less(i, j int) bool {
return h.less(h.heap[i], h.heap[j])
return h.less(h.heap[i].value, h.heap[j].value)
}

func (h *priorityHeap[T]) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
h.heap[i].index = i
h.heap[j].index = j
h.idx[i] = h.heap[i]
h.idx[j] = h.heap[j]
}

func (h *priorityHeap[T]) Push(x any) {
h.heap = append(h.heap, x.(T))
it := x.(*item[T])
it.index = len(h.heap)
h.heap = append(h.heap, it)
h.idx[it.index] = it
}

func (h *priorityHeap[T]) Pop() any {
old := h.heap
n := len(old)
x := old[n-1]
it := old[n-1]
h.heap = old[0 : n-1]
return x
delete(h.idx, it.index)
return it
}

// CircularBuffer is a generic circular buffer.
Expand Down
19 changes: 12 additions & 7 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,24 @@ func (j *inProgressJob) Duration() time.Duration {

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
pending *PriorityQueue[*JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*types.Job] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
pending *PriorityQueue[*JobWithPriority[int], string] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID
completed *CircularBuffer[*types.Job] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
mu sync.RWMutex
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return &JobQueue{
pending: NewPriorityQueue[*JobWithPriority[int]](func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
}),
pending: NewPriorityQueue[*JobWithPriority[int]](
func(a, b *JobWithPriority[int]) bool {
return a.Priority > b.Priority // Higher priority first
},
func(a *JobWithPriority[int]) string {
return a.Job.ID
},
),
inProgress: make(map[string]*inProgressJob),
completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
Expand Down
10 changes: 8 additions & 2 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
if status, ok := s.queue.Exists(job.Job); ok {
level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status)
// TODO: update priority
continue
}

Expand Down Expand Up @@ -174,8 +175,13 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t
}
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error {
// TODO: handle commits
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error {
logger := log.With(s.logger, "job", job.ID)

if !success {
level.Error(logger).Log("msg", "job failed")
return nil
}
s.queue.MarkComplete(job.ID)
return nil
}
Expand Down