


@owen-d commented Dec 4, 2024

Block Builder Scheduler Refactoring: Priority-Based Job Queue

Overview

This PR refactors the block builder scheduler to introduce a priority-based job queue. The changes aim to improve job scheduling efficiency and give planners explicit control over the order in which jobs execute.

Key Changes

1. Generic Priority Queue Implementation

  • Added JobWithPriority[T comparable] type to support flexible priority mechanisms
  • Implemented a generic priority queue using Go's container/heap
  • Added comprehensive test coverage for priority queue operations
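A minimal sketch of a generic priority queue built on container/heap, for illustration only. The names PriorityQueue, NewPriorityQueue and the JobID field are hypothetical. Go's comparable constraint allows only == and !=, not ordering, so this sketch assumes the ordering is supplied as a caller-provided less function. The PR description does not say how its queue orders items.

```go
package main

import (
	"container/heap"
	"fmt"
)

// JobWithPriority pairs a job ID with a priority value (field names hypothetical).
type JobWithPriority[T comparable] struct {
	JobID    string
	Priority T
}

// innerHeap adapts a slice to heap.Interface using a caller-supplied less func.
type innerHeap[T any] struct {
	items []T
	less  func(a, b T) bool
}

func (h *innerHeap[T]) Len() int           { return len(h.items) }
func (h *innerHeap[T]) Less(i, j int) bool { return h.less(h.items[i], h.items[j]) }
func (h *innerHeap[T]) Swap(i, j int)      { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *innerHeap[T]) Push(x any)         { h.items = append(h.items, x.(T)) }
func (h *innerHeap[T]) Pop() any {
	n := len(h.items)
	x := h.items[n-1]
	var zero T
	h.items[n-1] = zero // clear the slot so the element can be garbage collected
	h.items = h.items[:n-1]
	return x
}

// PriorityQueue is a hypothetical generic wrapper; Push and Pop are O(log n).
type PriorityQueue[T any] struct{ h *innerHeap[T] }

func NewPriorityQueue[T any](less func(a, b T) bool) *PriorityQueue[T] {
	return &PriorityQueue[T]{h: &innerHeap[T]{less: less}}
}

func (pq *PriorityQueue[T]) Push(x T) { heap.Push(pq.h, x) }

// Pop removes the item that sorts first under less; ok is false when empty.
func (pq *PriorityQueue[T]) Pop() (item T, ok bool) {
	if pq.h.Len() == 0 {
		return item, false
	}
	return heap.Pop(pq.h).(T), true
}

func (pq *PriorityQueue[T]) Len() int { return pq.h.Len() }

func main() {
	// Max-heap on Priority: the highest-priority job is popped first.
	pq := NewPriorityQueue(func(a, b *JobWithPriority[int]) bool {
		return a.Priority > b.Priority
	})
	pq.Push(&JobWithPriority[int]{JobID: "a", Priority: 10})
	pq.Push(&JobWithPriority[int]{JobID: "b", Priority: 50})
	pq.Push(&JobWithPriority[int]{JobID: "c", Priority: 30})
	for pq.Len() > 0 {
		j, _ := pq.Pop()
		fmt.Println(j.JobID, j.Priority)
	}
}
```

Passing the ordering as a function keeps the queue reusable for any priority type. A planner could use record counts, timestamps, or a composite key without changing the queue.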

2. Circular Buffer for Completed Jobs

  • Implemented a fixed-size circular buffer to track the last N completed jobs
  • Maintains FIFO order with efficient O(1) operations
  • Automatically evicts oldest jobs when capacity is reached
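The circular buffer described above can be sketched as follows. CircularBuffer and its methods are hypothetical names, and this is not the PR's code. The buffer stores at most capacity items. Push is O(1), and once the buffer is full each Push overwrites the oldest slot.

```go
package main

import "fmt"

// CircularBuffer keeps the most recent len(items) entries (hypothetical type).
type CircularBuffer[T any] struct {
	items []T
	head  int // index of the oldest item
	size  int // number of valid items
}

func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
	return &CircularBuffer[T]{items: make([]T, capacity)}
}

// Push appends x in O(1). If the buffer was full, it returns the evicted
// oldest item and true.
func (b *CircularBuffer[T]) Push(x T) (evicted T, didEvict bool) {
	if len(b.items) == 0 {
		return x, true // zero capacity: x itself is dropped
	}
	if b.size < len(b.items) {
		b.items[(b.head+b.size)%len(b.items)] = x
		b.size++
		return evicted, false
	}
	// Full: overwrite the oldest slot and advance head to the next-oldest.
	evicted = b.items[b.head]
	b.items[b.head] = x
	b.head = (b.head + 1) % len(b.items)
	return evicted, true
}

// Items returns the contents oldest-first (FIFO order).
func (b *CircularBuffer[T]) Items() []T {
	out := make([]T, 0, b.size)
	for i := 0; i < b.size; i++ {
		out = append(out, b.items[(b.head+i)%len(b.items)])
	}
	return out
}

func (b *CircularBuffer[T]) Len() int { return b.size }

func main() {
	buf := NewCircularBuffer[string](3)
	for _, id := range []string{"job-1", "job-2", "job-3", "job-4"} {
		if old, ok := buf.Push(id); ok {
			fmt.Println("evicted", old)
		}
	}
	fmt.Println(buf.Items())
}
```

Preallocating the slice once and tracking a head index avoids reslicing or shifting elements. Memory stays fixed at N entries no matter how many jobs complete.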

3. Job Queue Enhancements

  • Converted from map-based storage to priority queue for pending jobs
  • Added O(1) job status tracking via statusMap
  • Improved job duration tracking with timestamps
  • Added support for job synchronization after scheduler restarts

4. Planner Interface Updates

  • Modified Planner interface to return []*JobWithPriority[int]
  • Updated RecordCountPlanner to prioritize based on record count
  • Updated TimeRangePlanner to use the same priority mechanism
  • Added deterministic job ordering by partition
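A record-count planner returning []*JobWithPriority[int] might look roughly like the sketch below. The job's priority is the number of records it covers, and output is sorted by partition so that repeated runs yield the same order. PartitionLag, planByRecordCount, the offset fields, and the one-job-per-partition cap at targetRecords are all hypothetical simplifications, not the PR's RecordCountPlanner.

```go
package main

import (
	"fmt"
	"sort"
)

// Job covers the half-open offset range [StartOffset, EndOffset) of one partition.
type Job struct {
	Partition   int32
	StartOffset int64
	EndOffset   int64
}

type JobWithPriority[T comparable] struct {
	Job      *Job
	Priority T
}

// PartitionLag is a hypothetical planner input.
type PartitionLag struct {
	Partition  int32
	NextOffset int64 // first offset not yet processed
	EndOffset  int64 // one past the latest available offset
}

// planByRecordCount emits at most one job per partition, capped at
// targetRecords, with priority equal to the job's record count.
func planByRecordCount(lags []PartitionLag, targetRecords int64) []*JobWithPriority[int] {
	// Sort a copy by partition so output order never depends on input order.
	sorted := append([]PartitionLag(nil), lags...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Partition < sorted[j].Partition })

	var jobs []*JobWithPriority[int]
	for _, l := range sorted {
		start, end := l.NextOffset, l.EndOffset
		if start >= end {
			continue // nothing new to process
		}
		if end-start > targetRecords {
			end = start + targetRecords
		}
		jobs = append(jobs, &JobWithPriority[int]{
			Job:      &Job{Partition: l.Partition, StartOffset: start, EndOffset: end},
			Priority: int(end - start),
		})
	}
	return jobs
}

func main() {
	jobs := planByRecordCount([]PartitionLag{
		{Partition: 1, NextOffset: 0, EndOffset: 40},
		{Partition: 0, NextOffset: 0, EndOffset: 10},
	}, 100)
	for _, j := range jobs {
		fmt.Printf("partition=%d offsets=[%d,%d) priority=%d\n",
			j.Job.Partition, j.Job.StartOffset, j.Job.EndOffset, j.Priority)
	}
}
```

Sorting by partition only fixes the order in which jobs are emitted. The priority queue still dispatches them by priority.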
@owen-d requested a review from a team as a code owner December 4, 2024 05:50

@ashwanthgoli left a comment


LGTM


assignment, exists := q.inProgress[jobID]
// Find job in in-progress map
inProgressJob, exists := q.inProgress[jobID]

nit: maybe this should not error? If a scheduler restarted and lost its state, we'd miss committing already-consumed jobs.

But this can be worried about later, when we add the committer logic.

if !exists {
return fmt.Errorf("job %s not found in progress", jobID)
// Check if job already exists
if status, exists := q.statusMap[jobID]; exists {

this should not error?
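Both review questions above ask whether these paths should return errors. Here is one possible lenient alternative, sketched with hypothetical names; it is not what was merged. Enqueue treats a job ID it already knows as a no-op. MarkComplete accepts completions for jobs the scheduler has no record of, so a restarted scheduler can still commit already-consumed work.

```go
package main

import "fmt"

type JobStatus int

const (
	StatusPending JobStatus = iota
	StatusInProgress
	StatusComplete
)

// lenientQueue is a hypothetical queue that tolerates duplicates and unknown completions.
type lenientQueue struct {
	statusMap map[string]JobStatus
	pending   []string
}

func newLenientQueue() *lenientQueue {
	return &lenientQueue{statusMap: map[string]JobStatus{}}
}

// Enqueue is idempotent: it returns false instead of an error for a known job.
func (q *lenientQueue) Enqueue(id string) bool {
	if _, exists := q.statusMap[id]; exists {
		return false
	}
	q.statusMap[id] = StatusPending
	q.pending = append(q.pending, id)
	return true
}

// MarkComplete records the job as complete even if the scheduler never saw it
// (e.g. after losing in-memory state) and reports whether it was unknown.
func (q *lenientQueue) MarkComplete(id string) (wasUnknown bool) {
	st, known := q.statusMap[id]
	if known && st == StatusPending {
		// Drop it from pending so it is not dispatched again.
		for i, p := range q.pending {
			if p == id {
				q.pending = append(q.pending[:i], q.pending[i+1:]...)
				break
			}
		}
	}
	q.statusMap[id] = StatusComplete
	return !known
}

func main() {
	q := newLenientQueue()
	fmt.Println(q.Enqueue("job-1"), q.Enqueue("job-1")) // true false
	fmt.Println(q.MarkComplete("job-7"))                // true: unknown job accepted
}
```

The trade-off is visibility: returning a boolean instead of an error lets callers log or count these cases without failing the request.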

@owen-d merged commit 0981273 into grafana:main Dec 4, 2024
58 checks passed