66 "flag"
77 "fmt"
88 "net/http"
9+ "sort"
910 "strconv"
1011 "strings"
1112 "time"
@@ -25,13 +26,12 @@ var (
2526)
2627
2728type Config struct {
28- ConsumerGroup string `yaml:"consumer_group"`
29- Interval time.Duration `yaml:"interval"`
30- LookbackPeriod time.Duration `yaml:"lookback_period"`
31- Strategy string `yaml:"strategy"`
32- TargetRecordCount int64 `yaml:"target_record_count"`
33- MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
34- JobQueueConfig JobQueueConfig `yaml:"job_queue"`
29+ ConsumerGroup string `yaml:"consumer_group"`
30+ Interval time.Duration `yaml:"interval"`
31+ LookbackPeriod time.Duration `yaml:"lookback_period"`
32+ Strategy string `yaml:"strategy"`
33+ TargetRecordCount int64 `yaml:"target_record_count"`
34+ JobQueueConfig JobQueueConfig `yaml:"job_queue"`
3535}
3636
3737func (cfg * Config ) RegisterFlagsWithPrefix (prefix string , f * flag.FlagSet ) {
@@ -56,12 +56,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
5656 RecordCountStrategy ,
5757 ),
5858 )
59- f .IntVar (
60- & cfg .MaxJobsPlannedPerInterval ,
61- prefix + "max-jobs-planned-per-interval" ,
62- 100 ,
63- "Maximum number of jobs that the planner can return." ,
64- )
6559 cfg .JobQueueConfig .RegisterFlags (f )
6660}
6761
@@ -155,33 +149,18 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
155149
156150 s .publishLagMetrics (lag )
157151
158- jobs , err := s .planner .Plan (ctx , s . cfg . MaxJobsPlannedPerInterval )
152+ jobs , err := s .planner .Plan (ctx , 1 ) // TODO(owen-d): parallelize work within a partition
159153 if err != nil {
160154 level .Error (s .logger ).Log ("msg" , "failed to plan jobs" , "err" , err )
161155 }
162156
163157 for _ , job := range jobs {
164158 // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
165159
166- logger := log .With (
167- s .logger ,
168- "job" , job .Job .ID (),
169- "priority" , job .Priority ,
170- )
171-
172- status , ok := s .queue .Exists (job .Job )
173-
174- // scheduler is unaware of incoming job; enqueue
175- if ! ok {
176- level .Debug (logger ).Log (
177- "msg" , "job does not exist, enqueueing" ,
178- )
179-
180- // enqueue
181- if err := s .queue .Enqueue (job .Job , job .Priority ); err != nil {
182- level .Error (logger ).Log ("msg" , "failed to enqueue job" , "err" , err )
183- }
160+ added , status , err := s .idempotentEnqueue (job )
184161
162+ // if we've either added or encountered an error, move on; we're done this cycle
163+ if added || err != nil {
185164 continue
186165 }
187166
@@ -232,6 +211,34 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, er
232211 }
233212}
234213
214+ // if added is true, the job was added to the queue, otherwise status is the current status of the job
215+ func (s * BlockScheduler ) idempotentEnqueue (job * JobWithMetadata ) (added bool , status types.JobStatus , err error ) {
216+ logger := log .With (
217+ s .logger ,
218+ "job" , job .Job .ID (),
219+ "priority" , job .Priority ,
220+ )
221+
222+ status , ok := s .queue .Exists (job .Job )
223+
224+ // scheduler is unaware of incoming job; enqueue
225+ if ! ok {
226+ level .Debug (logger ).Log (
227+ "msg" , "job does not exist, enqueueing" ,
228+ )
229+
230+ // enqueue
231+ if err := s .queue .Enqueue (job .Job , job .Priority ); err != nil {
232+ level .Error (logger ).Log ("msg" , "failed to enqueue job" , "err" , err )
233+ return false , types .JobStatusUnknown , err
234+ }
235+
236+ return true , types .JobStatusPending , nil
237+ }
238+
239+ return false , status , nil
240+ }
241+
235242func (s * BlockScheduler ) HandleCompleteJob (ctx context.Context , job * types.Job , success bool ) (err error ) {
236243 logger := log .With (s .logger , "job" , job .ID ())
237244
@@ -243,6 +250,23 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,
243250 ); err == nil {
244251 s .queue .MarkComplete (job .ID (), types .JobStatusComplete )
245252 level .Info (logger ).Log ("msg" , "job completed successfully" )
253+
254+ // TODO(owen-d): cleaner way to enqueue next job for this partition,
255+ // don't make it part of the response cycle to job completion, etc.
256+ jobs , err := s .planner .Plan (ctx , 1 )
257+ if err != nil {
258+ level .Error (logger ).Log ("msg" , "failed to plan subsequent jobs" , "err" , err )
259+ }
260+
261+ // find first job for this partition
262+ nextJob := sort .Search (len (jobs ), func (i int ) bool {
263+ return jobs [i ].Job .Partition () >= job .Partition ()
264+ })
265+
266+ if nextJob < len (jobs ) && jobs [nextJob ].Job .Partition () == job .Partition () {
267+ _ , _ , _ = s .idempotentEnqueue (jobs [nextJob ])
268+ }
269+
246270 return nil
247271 }
248272
0 commit comments