Skip to content

Commit 558080c

Browse files
authored
feat(block-scheduler): introduce job lease and requeue expired jobs (#15560)
1 parent 61df085 commit 558080c

File tree

6 files changed

+220
-23
lines changed

6 files changed

+220
-23
lines changed

‎docs/sources/shared/configuration.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,17 @@ block_scheduler:
236236
# CLI flag: -block-scheduler.max-jobs-planned-per-interval
237237
[max_jobs_planned_per_interval: <int> | default = 100]
238238

239+
job_queue:
240+
# Interval to check for expired job leases
241+
# CLI flag: -jobqueue.lease-expiry-check-interval
242+
[lease_expiry_check_interval: <duration> | default = 1m]
243+
244+
# Duration after which a job lease is considered expired if the scheduler
245+
# receives no updates from builders about the job. Expired jobs are
246+
# re-enqueued
247+
# CLI flag: -jobqueue.lease-duration
248+
[lease_duration: <duration> | default = 10m]
249+
239250
pattern_ingester:
240251
# Whether the pattern ingester is enabled.
241252
# CLI flag: -pattern-ingester.enabled

‎pkg/blockbuilder/scheduler/queue.go

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package scheduler
22

33
import (
4+
"context"
5+
"errors"
6+
"flag"
47
"fmt"
58
"sync"
69
"time"
@@ -21,6 +24,7 @@ const (
2124
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
2225
type JobWithMetadata struct {
2326
*types.Job
27+
2428
Priority int
2529
Status types.JobStatus
2630
StartTime time.Time
@@ -60,21 +64,36 @@ func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics {
6064
}
6165
}
6266

67+
// JobQueueConfig holds the lease settings a JobQueue uses to detect
// in-progress jobs that have stopped receiving updates and to requeue them.
type JobQueueConfig struct {
	// LeaseExpiryCheckInterval is how often in-progress jobs are scanned for
	// expired leases.
	LeaseExpiryCheckInterval time.Duration `yaml:"lease_expiry_check_interval"`
	// LeaseDuration is how long a job may go without an update before its
	// lease is considered expired and the job is re-enqueued.
	LeaseDuration time.Duration `yaml:"lease_duration"`
}

// RegisterFlags registers the job queue flags on f and installs their
// defaults (1m check interval, 10m lease duration) on cfg.
func (cfg *JobQueueConfig) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(
		&cfg.LeaseExpiryCheckInterval,
		"jobqueue.lease-expiry-check-interval",
		1*time.Minute,
		"Interval to check for expired job leases",
	)
	f.DurationVar(
		&cfg.LeaseDuration,
		"jobqueue.lease-duration",
		10*time.Minute,
		"Duration after which a job lease is considered expired if the scheduler receives no updates from builders about the job. Expired jobs are re-enqueued",
	)
}
76+
6377
// JobQueue manages the queue of pending jobs and tracks their state.
6478
type JobQueue struct {
65-
logger log.Logger
79+
cfg JobQueueConfig
80+
81+
mu sync.RWMutex
6682
pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
6783
inProgress map[string]*JobWithMetadata // Jobs currently being processed
6884
completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs
6985
statusMap map[string]types.JobStatus // Maps job ID to its current status
70-
metrics *jobQueueMetrics
71-
mu sync.RWMutex
86+
87+
logger log.Logger
88+
metrics *jobQueueMetrics
7289
}
7390

7491
// NewJobQueue creates a new job queue instance
75-
func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
92+
func NewJobQueue(cfg JobQueueConfig, logger log.Logger, reg prometheus.Registerer) *JobQueue {
7693
return &JobQueue{
94+
cfg: cfg,
7795
logger: logger,
96+
7897
pending: NewPriorityQueue(
7998
func(a, b *JobWithMetadata) bool {
8099
return a.Priority > b.Priority // Higher priority first
@@ -88,6 +107,49 @@ func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
88107
}
89108
}
90109

110+
func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) {
111+
ticker := time.NewTicker(q.cfg.LeaseExpiryCheckInterval)
112+
defer ticker.Stop()
113+
114+
for {
115+
select {
116+
case <-ticker.C:
117+
level.Debug(q.logger).Log("msg", "checking for expired job leases")
118+
if err := q.requeueExpiredJobs(); err != nil {
119+
level.Error(q.logger).Log("msg", "failed to requeue expired jobs", "err", err)
120+
}
121+
case <-ctx.Done():
122+
return
123+
}
124+
}
125+
}
126+
127+
func (q *JobQueue) requeueExpiredJobs() error {
128+
q.mu.Lock()
129+
defer q.mu.Unlock()
130+
131+
var multiErr error
132+
for id, job := range q.inProgress {
133+
if time.Since(job.UpdateTime) > q.cfg.LeaseDuration {
134+
level.Warn(q.logger).Log("msg", "job lease expired. requeuing", "job", id, "update_time", job.UpdateTime, "now", time.Now())
135+
136+
// complete the job with expired status and re-enqueue
137+
delete(q.inProgress, id)
138+
q.metrics.inProgress.Dec()
139+
140+
job.Status = types.JobStatusExpired
141+
q.addToCompletedBuffer(job)
142+
143+
if err := q.enqueueLockLess(job.Job, job.Priority); err != nil {
144+
level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", id, "err", err)
145+
multiErr = errors.Join(multiErr, err)
146+
}
147+
}
148+
}
149+
150+
return multiErr
151+
}
152+
91153
// Exists checks if a job exists in any state and returns its status
92154
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
93155
q.mu.RLock()
@@ -126,8 +188,12 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
126188
q.mu.Lock()
127189
defer q.mu.Unlock()
128190

191+
return q.enqueueLockLess(job, priority)
192+
}
193+
194+
func (q *JobQueue) enqueueLockLess(job *types.Job, priority int) error {
129195
// Check if job already exists
130-
if status, exists := q.statusMap[job.ID()]; exists {
196+
if status, exists := q.statusMap[job.ID()]; exists && status != types.JobStatusExpired {
131197
return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
132198
}
133199

@@ -203,20 +269,29 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
203269
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
204270
}
205271
q.metrics.pending.Dec()
272+
case types.JobStatusComplete:
273+
level.Info(q.logger).Log("msg", "job is already complete, ignoring", "job", id)
274+
return
206275
default:
207276
level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
277+
return
208278
}
209279

210280
jobMeta.Status = status
211281
jobMeta.UpdateTime = time.Now()
212282

213-
// add it to the completed buffer, removing any evicted job from the statusMap
283+
q.addToCompletedBuffer(jobMeta)
284+
}
285+
286+
// add it to the completed buffer, removing any evicted job from the statusMap
287+
func (q *JobQueue) addToCompletedBuffer(jobMeta *JobWithMetadata) {
214288
removal, evicted := q.completed.Push(jobMeta)
215289
if evicted {
216290
delete(q.statusMap, removal.ID())
217291
}
218-
q.statusMap[id] = status
219-
q.metrics.completed.WithLabelValues(status.String()).Inc()
292+
293+
q.statusMap[jobMeta.ID()] = jobMeta.Status
294+
q.metrics.completed.WithLabelValues(jobMeta.Status.String()).Inc()
220295
}
221296

222297
// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress

‎pkg/blockbuilder/scheduler/queue_test.go

Lines changed: 114 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
1111
)
1212

13+
var testQueueCfg = JobQueueConfig{}
14+
1315
func TestJobQueue_SyncJob(t *testing.T) {
1416
t.Run("non-existent to in-progress", func(t *testing.T) {
15-
q := NewJobQueue(log.NewNopLogger(), nil)
17+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
1618
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
1719
jobID := job.ID()
1820

@@ -29,7 +31,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
2931
})
3032

3133
t.Run("pending to in-progress", func(t *testing.T) {
32-
q := NewJobQueue(log.NewNopLogger(), nil)
34+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
3335
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
3436

3537
// Start with pending job
@@ -52,7 +54,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
5254
})
5355

5456
t.Run("already in-progress", func(t *testing.T) {
55-
q := NewJobQueue(log.NewNopLogger(), nil)
57+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
5658
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
5759

5860
// First sync to put in in-progress
@@ -73,7 +75,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
7375

7476
func TestJobQueue_MarkComplete(t *testing.T) {
7577
t.Run("in-progress to complete", func(t *testing.T) {
76-
q := NewJobQueue(log.NewNopLogger(), nil)
78+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
7779
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
7880

7981
// Start with in-progress job
@@ -103,7 +105,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
103105
})
104106

105107
t.Run("pending to complete", func(t *testing.T) {
106-
q := NewJobQueue(log.NewNopLogger(), nil)
108+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
107109
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
108110

109111
// Start with pending job
@@ -130,7 +132,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
130132
})
131133

132134
t.Run("non-existent job", func(t *testing.T) {
133-
q := NewJobQueue(log.NewNopLogger(), nil)
135+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
134136
logger := &testLogger{t: t}
135137
q.logger = logger
136138

@@ -139,7 +141,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
139141
})
140142

141143
t.Run("already completed job", func(t *testing.T) {
142-
q := NewJobQueue(log.NewNopLogger(), nil)
144+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
143145
logger := &testLogger{t: t}
144146
q.logger = logger
145147

@@ -153,6 +155,111 @@ func TestJobQueue_MarkComplete(t *testing.T) {
153155
})
154156
}
155157

158+
func TestJobQueue_Enqueue(t *testing.T) {
159+
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
160+
161+
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
162+
163+
beforeComplete := time.Now()
164+
err := q.Enqueue(job, 1)
165+
afterComplete := time.Now()
166+
require.NoError(t, err)
167+
168+
status, ok := q.Exists(job)
169+
require.True(t, ok, "job should exist")
170+
require.Equal(t, types.JobStatusPending, status)
171+
172+
// Verify job in pending queue
173+
foundJob, ok := q.pending.Lookup(job.ID())
174+
require.True(t, ok, "job should be in pending queue")
175+
require.Equal(t, job, foundJob.Job)
176+
require.Equal(t, 1, foundJob.Priority)
177+
require.True(t, foundJob.StartTime.IsZero())
178+
require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
179+
require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))
180+
181+
// allow enqueueing of job with same ID if expired
182+
job2 := types.NewJob(2, types.Offsets{Min: 100, Max: 200})
183+
q.statusMap[job2.ID()] = types.JobStatusExpired
184+
185+
err = q.Enqueue(job2, 2)
186+
require.NoError(t, err)
187+
188+
status, ok = q.Exists(job2)
189+
require.True(t, ok, "job should exist")
190+
require.Equal(t, types.JobStatusPending, status)
191+
192+
// Verify job2 in pending queue
193+
foundJob, ok = q.pending.Lookup(job2.ID())
194+
require.True(t, ok, "job2 should be in pending queue")
195+
require.Equal(t, job2, foundJob.Job)
196+
require.Equal(t, 2, foundJob.Priority)
197+
198+
// do not allow enqueueing of job with same ID if not expired
199+
job3 := types.NewJob(3, types.Offsets{Min: 120, Max: 230})
200+
q.statusMap[job3.ID()] = types.JobStatusInProgress
201+
202+
err = q.Enqueue(job3, DefaultPriority)
203+
require.Error(t, err)
204+
}
205+
206+
func TestJobQueue_RequeueExpiredJobs(t *testing.T) {
207+
q := NewJobQueue(JobQueueConfig{
208+
LeaseDuration: 5 * time.Minute,
209+
}, log.NewNopLogger(), nil)
210+
211+
job1 := &JobWithMetadata{
212+
Job: types.NewJob(1, types.Offsets{Min: 100, Max: 200}),
213+
Priority: 1,
214+
Status: types.JobStatusInProgress,
215+
StartTime: time.Now().Add(-time.Hour),
216+
UpdateTime: time.Now().Add(-time.Minute),
217+
}
218+
// expired job
219+
job2 := &JobWithMetadata{
220+
Job: types.NewJob(2, types.Offsets{Min: 300, Max: 400}),
221+
Priority: 2,
222+
Status: types.JobStatusInProgress,
223+
StartTime: time.Now().Add(-time.Hour),
224+
UpdateTime: time.Now().Add(-6 * time.Minute),
225+
}
226+
227+
q.inProgress[job1.ID()] = job1
228+
q.inProgress[job2.ID()] = job2
229+
q.statusMap[job1.ID()] = types.JobStatusInProgress
230+
q.statusMap[job2.ID()] = types.JobStatusInProgress
231+
232+
beforeRequeue := time.Now()
233+
err := q.requeueExpiredJobs()
234+
require.NoError(t, err)
235+
236+
status, ok := q.statusMap[job1.ID()]
237+
require.True(t, ok)
238+
require.Equal(t, types.JobStatusInProgress, status)
239+
240+
got, ok := q.inProgress[job1.ID()]
241+
require.True(t, ok)
242+
require.Equal(t, job1, got)
243+
244+
status, ok = q.statusMap[job2.ID()]
245+
require.True(t, ok)
246+
require.Equal(t, types.JobStatusPending, status)
247+
248+
got, ok = q.pending.Lookup(job2.ID())
249+
require.True(t, ok)
250+
require.Equal(t, job2.Job, got.Job)
251+
require.Equal(t, types.JobStatusPending, got.Status)
252+
require.Equal(t, job2.Priority, got.Priority)
253+
require.True(t, got.StartTime.IsZero())
254+
require.True(t, got.UpdateTime.After(beforeRequeue) || got.UpdateTime.Equal(beforeRequeue))
255+
256+
require.Equal(t, 1, q.completed.Len())
257+
got, ok = q.completed.Pop()
258+
require.True(t, ok)
259+
job2.Status = types.JobStatusExpired
260+
require.Equal(t, job2, got)
261+
}
262+
156263
// testLogger implements log.Logger for testing
157264
type testLogger struct {
158265
t *testing.T

‎pkg/blockbuilder/scheduler/scheduler.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ var (
2525
)
2626

2727
type Config struct {
28-
ConsumerGroup string `yaml:"consumer_group"`
29-
Interval time.Duration `yaml:"interval"`
30-
LookbackPeriod time.Duration `yaml:"lookback_period"`
31-
Strategy string `yaml:"strategy"`
32-
TargetRecordCount int64 `yaml:"target_record_count"`
33-
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
28+
ConsumerGroup string `yaml:"consumer_group"`
29+
Interval time.Duration `yaml:"interval"`
30+
LookbackPeriod time.Duration `yaml:"lookback_period"`
31+
Strategy string `yaml:"strategy"`
32+
TargetRecordCount int64 `yaml:"target_record_count"`
33+
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
34+
JobQueueConfig JobQueueConfig `yaml:"job_queue"`
3435
}
3536

3637
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@@ -61,6 +62,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
6162
100,
6263
"Maximum number of jobs that the planner can return.",
6364
)
65+
cfg.JobQueueConfig.RegisterFlags(f)
6466
}
6567

6668
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -128,6 +130,8 @@ func (s *BlockScheduler) running(ctx context.Context) error {
128130
level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err)
129131
}
130132

133+
go s.queue.RunLeaseExpiryChecker(ctx)
134+
131135
ticker := time.NewTicker(s.cfg.Interval)
132136
for {
133137
select {

‎pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
4141
}
4242

4343
func newTestEnv(builderID string) (*testEnv, error) {
44-
queue := NewJobQueue(log.NewNopLogger(), nil)
44+
queue := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
4545
mockOffsetMgr := &mockOffsetManager{
4646
topic: "test-topic",
4747
consumerGroup: "test-group",

0 commit comments

Comments
 (0)