Skip to content

Commit 0981273

Browse files
authored
feat(blockbuilder): priority queue for job dispatching (#15245)
1 parent d59a5e2 commit 0981273

File tree

7 files changed

+461
-128
lines changed

7 files changed

+461
-128
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package scheduler
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestPriorityQueue(t *testing.T) {
10+
t.Run("operations", func(t *testing.T) {
11+
tests := []struct {
12+
name string
13+
input []int
14+
wantPops []int
15+
}{
16+
{
17+
name: "empty queue",
18+
input: []int{},
19+
wantPops: []int{},
20+
},
21+
{
22+
name: "single element",
23+
input: []int{1},
24+
wantPops: []int{1},
25+
},
26+
{
27+
name: "multiple elements in order",
28+
input: []int{1, 2, 3},
29+
wantPops: []int{1, 2, 3},
30+
},
31+
{
32+
name: "multiple elements out of order",
33+
input: []int{3, 1, 2},
34+
wantPops: []int{1, 2, 3},
35+
},
36+
{
37+
name: "duplicate elements",
38+
input: []int{2, 1, 2, 1},
39+
wantPops: []int{1, 1, 2, 2},
40+
},
41+
}
42+
43+
for _, tt := range tests {
44+
t.Run(tt.name, func(t *testing.T) {
45+
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
46+
require.Equal(t, 0, pq.Len())
47+
48+
// Push all elements
49+
for _, v := range tt.input {
50+
pq.Push(v)
51+
}
52+
require.Equal(t, len(tt.input), pq.Len())
53+
54+
// Pop all elements and verify order
55+
got := make([]int, 0, len(tt.input))
56+
for range tt.input {
57+
v, ok := pq.Pop()
58+
require.True(t, ok)
59+
got = append(got, v)
60+
}
61+
require.Equal(t, tt.wantPops, got)
62+
63+
// Verify empty queue behavior
64+
v, ok := pq.Pop()
65+
require.False(t, ok)
66+
require.Zero(t, v)
67+
require.Equal(t, 0, pq.Len())
68+
})
69+
}
70+
})
71+
72+
t.Run("custom type", func(t *testing.T) {
73+
type Job struct {
74+
ID string
75+
Priority int
76+
}
77+
78+
pq := NewPriorityQueue[Job](func(a, b Job) bool {
79+
return a.Priority < b.Priority
80+
})
81+
82+
jobs := []Job{
83+
{ID: "high", Priority: 3},
84+
{ID: "low", Priority: 1},
85+
{ID: "medium", Priority: 2},
86+
}
87+
88+
// Push all jobs
89+
for _, j := range jobs {
90+
pq.Push(j)
91+
}
92+
93+
// Verify they come out in priority order
94+
want := []string{"low", "medium", "high"}
95+
got := make([]string, 0, len(jobs))
96+
for range jobs {
97+
j, ok := pq.Pop()
98+
require.True(t, ok)
99+
got = append(got, j.ID)
100+
}
101+
require.Equal(t, want, got)
102+
})
103+
104+
t.Run("mixed operations", func(t *testing.T) {
105+
pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
106+
107+
// Push some elements
108+
pq.Push(3)
109+
pq.Push(1)
110+
require.Equal(t, 2, pq.Len())
111+
112+
// Pop lowest
113+
v, ok := pq.Pop()
114+
require.True(t, ok)
115+
require.Equal(t, 1, v)
116+
117+
// Push more elements
118+
pq.Push(2)
119+
pq.Push(4)
120+
121+
// Verify remaining elements come out in order
122+
want := []int{2, 3, 4}
123+
got := make([]int, 0, 3)
124+
for range want {
125+
v, ok := pq.Pop()
126+
require.True(t, ok)
127+
got = append(got, v)
128+
}
129+
require.Equal(t, want, got)
130+
})
131+
}
132+
133+
func TestCircularBuffer(t *testing.T) {
134+
tests := []struct {
135+
name string
136+
capacity int
137+
input []int
138+
wantPops []int
139+
}{
140+
{
141+
name: "empty buffer",
142+
capacity: 5,
143+
input: []int{},
144+
wantPops: []int{},
145+
},
146+
{
147+
name: "partial fill",
148+
capacity: 5,
149+
input: []int{1, 2, 3},
150+
wantPops: []int{1, 2, 3},
151+
},
152+
{
153+
name: "full buffer",
154+
capacity: 3,
155+
input: []int{1, 2, 3},
156+
wantPops: []int{1, 2, 3},
157+
},
158+
{
159+
name: "overflow buffer",
160+
capacity: 3,
161+
input: []int{1, 2, 3, 4, 5},
162+
wantPops: []int{3, 4, 5},
163+
},
164+
}
165+
166+
for _, tt := range tests {
167+
t.Run(tt.name, func(t *testing.T) {
168+
cb := NewCircularBuffer[int](tt.capacity)
169+
require.Equal(t, 0, cb.Len())
170+
171+
// Push all elements
172+
for _, v := range tt.input {
173+
cb.Push(v)
174+
}
175+
require.Equal(t, min(tt.capacity, len(tt.input)), cb.Len())
176+
177+
// Pop all elements and verify order
178+
got := make([]int, 0, cb.Len())
179+
for cb.Len() > 0 {
180+
v, ok := cb.Pop()
181+
require.True(t, ok)
182+
got = append(got, v)
183+
}
184+
require.Equal(t, tt.wantPops, got)
185+
186+
// Verify empty buffer behavior
187+
v, ok := cb.Pop()
188+
require.False(t, ok)
189+
require.Zero(t, v)
190+
require.Equal(t, 0, cb.Len())
191+
})
192+
}
193+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package scheduler
2+
3+
import (
4+
"container/heap"
5+
)
6+
7+
// PriorityQueue is a generic priority queue.
8+
type PriorityQueue[T any] struct {
9+
h *priorityHeap[T]
10+
}
11+
12+
// NewPriorityQueue creates a new priority queue.
13+
func NewPriorityQueue[T any](less func(T, T) bool) *PriorityQueue[T] {
14+
h := &priorityHeap[T]{
15+
less: less,
16+
heap: make([]T, 0),
17+
}
18+
heap.Init(h)
19+
return &PriorityQueue[T]{h: h}
20+
}
21+
22+
// Push adds an element to the queue.
23+
func (pq *PriorityQueue[T]) Push(v T) {
24+
heap.Push(pq.h, v)
25+
}
26+
27+
// Pop removes and returns the element with the highest priority from the queue.
28+
func (pq *PriorityQueue[T]) Pop() (T, bool) {
29+
if pq.Len() == 0 {
30+
var zero T
31+
return zero, false
32+
}
33+
return heap.Pop(pq.h).(T), true
34+
}
35+
36+
// Len returns the number of elements in the queue.
37+
func (pq *PriorityQueue[T]) Len() int {
38+
return pq.h.Len()
39+
}
40+
41+
// priorityHeap is the internal heap implementation that satisfies heap.Interface.
42+
type priorityHeap[T any] struct {
43+
less func(T, T) bool
44+
heap []T
45+
}
46+
47+
func (h *priorityHeap[T]) Len() int {
48+
return len(h.heap)
49+
}
50+
51+
func (h *priorityHeap[T]) Less(i, j int) bool {
52+
return h.less(h.heap[i], h.heap[j])
53+
}
54+
55+
func (h *priorityHeap[T]) Swap(i, j int) {
56+
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
57+
}
58+
59+
func (h *priorityHeap[T]) Push(x any) {
60+
h.heap = append(h.heap, x.(T))
61+
}
62+
63+
func (h *priorityHeap[T]) Pop() any {
64+
old := h.heap
65+
n := len(old)
66+
x := old[n-1]
67+
h.heap = old[0 : n-1]
68+
return x
69+
}
70+
71+
// CircularBuffer is a generic circular buffer.
72+
type CircularBuffer[T any] struct {
73+
buffer []T
74+
size int
75+
head int
76+
tail int
77+
}
78+
79+
// NewCircularBuffer creates a new circular buffer with the given capacity.
80+
func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
81+
return &CircularBuffer[T]{
82+
buffer: make([]T, capacity),
83+
size: 0,
84+
head: 0,
85+
tail: 0,
86+
}
87+
}
88+
89+
// Push adds an element to the circular buffer and returns the evicted element if any
90+
func (b *CircularBuffer[T]) Push(v T) (T, bool) {
91+
var evicted T
92+
hasEvicted := false
93+
94+
if b.size == len(b.buffer) {
95+
// If buffer is full, evict the oldest element (at head)
96+
evicted = b.buffer[b.head]
97+
hasEvicted = true
98+
b.head = (b.head + 1) % len(b.buffer)
99+
} else {
100+
b.size++
101+
}
102+
103+
b.buffer[b.tail] = v
104+
b.tail = (b.tail + 1) % len(b.buffer)
105+
106+
return evicted, hasEvicted
107+
}
108+
109+
// Pop removes and returns the oldest element from the buffer
110+
func (b *CircularBuffer[T]) Pop() (T, bool) {
111+
if b.size == 0 {
112+
var zero T
113+
return zero, false
114+
}
115+
116+
v := b.buffer[b.head]
117+
b.head = (b.head + 1) % len(b.buffer)
118+
b.size--
119+
120+
return v, true
121+
}
122+
123+
// Len returns the number of elements in the buffer
124+
func (b *CircularBuffer[T]) Len() int {
125+
return b.size
126+
}

0 commit comments

Comments
 (0)