Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
circular buffer lookup & metadata exists supporting fns
  • Loading branch information
owen-d committed Dec 10, 2024
commit 67fc8ac9b4b7c6bd27f55276958cc28642d326db
51 changes: 51 additions & 0 deletions pkg/blockbuilder/scheduler/prioritiy_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,54 @@ func TestCircularBuffer(t *testing.T) {
})
}
}

func TestCircularBufferLookup(t *testing.T) {
t.Run("empty buffer", func(t *testing.T) {
cb := NewCircularBuffer[int](5)
_, ok := cb.Lookup(func(i int) bool { return i == 1 })
require.False(t, ok)
})

t.Run("single element", func(t *testing.T) {
cb := NewCircularBuffer[int](5)
cb.Push(1)
v, ok := cb.Lookup(func(i int) bool { return i == 1 })
require.True(t, ok)
require.Equal(t, 1, v)
})

t.Run("multiple elements", func(t *testing.T) {
cb := NewCircularBuffer[int](5)
for i := 1; i <= 3; i++ {
cb.Push(i)
}
v, ok := cb.Lookup(func(i int) bool { return i == 2 })
require.True(t, ok)
require.Equal(t, 2, v)
})

t.Run("wrapped buffer", func(t *testing.T) {
cb := NewCircularBuffer[int](3)
// Push 5 elements into a buffer of size 3, causing wrap-around
for i := 1; i <= 5; i++ {
cb.Push(i)
}
// Buffer should now contain [4,5,3] with head at index 2
v, ok := cb.Lookup(func(i int) bool { return i == 4 })
require.True(t, ok)
require.Equal(t, 4, v)

// Element that was evicted should not be found
_, ok = cb.Lookup(func(i int) bool { return i == 1 })
require.False(t, ok)
})

t.Run("no match", func(t *testing.T) {
cb := NewCircularBuffer[int](5)
for i := 1; i <= 3; i++ {
cb.Push(i)
}
_, ok := cb.Lookup(func(i int) bool { return i == 99 })
require.False(t, ok)
})
}
13 changes: 13 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,16 @@ func (b *CircularBuffer[V]) Pop() (V, bool) {
func (b *CircularBuffer[V]) Len() int {
return b.size
}

// returns the first element in the buffer that satisfies the given predicate
func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) {
for i := 0; i < b.size; i++ {
idx := (b.head + i) % len(b.buffer)
if f(b.buffer[idx]) {
return b.buffer[idx], true
}

}
var zero V
return zero, false
}
28 changes: 26 additions & 2 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,32 @@ func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

status, ok := q.statusMap[job.ID()]
return status, ok
x, ok := q.existsLockLess(job.ID())
if !ok {
return types.JobStatusUnknown, false
}
return x.Status, ok
}

func (q *JobQueue) existsLockLess(id string) (*JobWithMetadata, bool) {
status, ok := q.statusMap[id]
if !ok {
return nil, false
}

switch status {
case types.JobStatusPending:
return q.pending.Lookup(id)
case types.JobStatusInProgress:
res, ok := q.inProgress[id]
return res, ok
case types.JobStatusComplete:
return q.completed.Lookup(func(jwm *JobWithMetadata) bool {
return jwm.ID() == id
})
default:
return nil, false
}
}

// Enqueue adds a job to the pending queue with the given priority
Expand Down
3 changes: 2 additions & 1 deletion pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func (j *Job) Offsets() Offsets {
type JobStatus int

const (
JobStatusPending JobStatus = iota
JobStatusUnknown JobStatus = iota // zero value, largely unused
JobStatusPending
JobStatusInProgress
JobStatusComplete
JobStatusFailed // Job failed and may be retried
Expand Down