Member

@owen-d owen-d commented Dec 10, 2024

Block Builder Scheduler Updates

Key improvements to the scheduler component focused on jobs and state management:

Job Handling

  • Better state transitions with some test coverage
  • Improved job metadata organization

Queue Improvements

  • Added key lookups to priority queue
  • Circular buffer for completed jobs
  • Better timestamp and status handling

Other Changes

  • Cleaned up scheduler interfaces
  • Better transport layer separation

No migration needed - internal changes only. Next up: metrics, docs, and retry policies.
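
As an illustration of the "circular buffer for completed jobs" item above, here is a minimal sketch of a fixed-capacity ring that overwrites its oldest entry once full. The `ring` type, its methods, and the job IDs are hypothetical names chosen for this example; the PR's actual implementation may differ.

```go
package main

import "fmt"

// ring is a hypothetical fixed-capacity circular buffer. Once it is full,
// each Push overwrites the oldest entry.
type ring[T any] struct {
	buf   []T
	start int // index of the oldest element
	size  int // number of stored elements
}

func newRing[T any](capacity int) *ring[T] {
	return &ring[T]{buf: make([]T, capacity)}
}

// Push appends v, evicting the oldest element when the buffer is full.
func (r *ring[T]) Push(v T) {
	if len(r.buf) == 0 {
		return // zero capacity: nothing can be stored
	}
	if r.size < len(r.buf) {
		r.buf[(r.start+r.size)%len(r.buf)] = v
		r.size++
		return
	}
	r.buf[r.start] = v
	r.start = (r.start + 1) % len(r.buf)
}

// Items returns the stored elements from oldest to newest.
func (r *ring[T]) Items() []T {
	out := make([]T, 0, r.size)
	for i := 0; i < r.size; i++ {
		out = append(out, r.buf[(r.start+i)%len(r.buf)])
	}
	return out
}

func main() {
	completed := newRing[string](3)
	for _, id := range []string{"job-1", "job-2", "job-3", "job-4"} {
		completed.Push(id)
	}
	fmt.Println(completed.Items()) // [job-2 job-3 job-4]
}
```

A bounded buffer like this keeps memory use constant while still exposing the most recently completed jobs.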

@owen-d owen-d requested a review from a team as a code owner December 10, 2024 09:15
logger := log.With(s.logger, "job", job.ID())

if success {
	// TODO(owen-d): do i need to increment offset here?
Contributor

I think we need to commit job.Offsets().Max-1, given that builders only consume up to but not including Max.

Member Author

sounds good
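
To make the off-by-one in this exchange concrete, here is a small sketch that treats a job's offsets as a half-open range `[Min, Max)`, as the comment above describes. The `offsets` type and its helper methods are hypothetical stand-ins, not the scheduler's real types. Whether a committed offset means "last consumed" or "next to read" depends on the offset manager's convention, which this thread does not spell out.

```go
package main

import "fmt"

// offsets is a hypothetical half-open range [Min, Max): a builder consumes
// Min, Min+1, ..., Max-1 and never reads Max itself.
type offsets struct {
	Min, Max int64
}

// lastConsumed returns the final offset a builder actually reads.
func (o offsets) lastConsumed() int64 { return o.Max - 1 }

// nextStart returns the offset at which the following job would begin.
func (o offsets) nextStart() int64 { return o.Max }

func main() {
	job := offsets{Min: 100, Max: 200}
	fmt.Println(job.lastConsumed()) // 199
	fmt.Println(job.nextStart())    // 200
}
```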

if err = s.offsetManager.Commit(
	ctx,
	job.Partition(),
	job.Offsets().Max,
Contributor

next jobs would start with job.Offsets().Max

Suggested change
- job.Offsets().Max,
+ job.Offsets().Max-1,
s.queue.pending.Push(
	NewJobWithMetadata(
		job,
		DefaultPriority,
Contributor

shouldn't failed jobs instead get a higher priority?

Member Author

Yes, but I didn't want to complicate the PR further by adding logic for priority discovery on retries. I think this is best left for a followup PR.

type priorityHeap[V any] struct {
	less func(V, V) bool
	heap []*item[V]
	idx  map[int]*item[V] // Maps index to item for efficient updates
Contributor

do we use this for lookups? i don't see any references

Member Author

good call 👍


// Add new item
idx := pq.h.Len()
it := &item[V]{value: v, index: idx}
Contributor

there is a bit of duplication, we update the index of the item here and within the heap's Push() method.

Contributor

@ashwanthgoli ashwanthgoli left a comment

lgtm

@ashwanthgoli ashwanthgoli merged commit f2bff77 into grafana:main Dec 11, 2024
58 checks passed