@@ -80,7 +80,7 @@ func New(
8080 // Queue to manage tasks
8181 queueMetrics := queue .NewMetrics (r , metricsNamespace , metricsSubsystem )
8282 queueLimits := NewQueueLimits (limits )
83- tasksQueue , err := queue .NewQueue (logger , cfg .Queue , queueLimits , queueMetrics )
83+ tasksQueue , err := queue .NewQueue (logger , cfg .Queue , queueLimits , queueMetrics , storageMetrics )
8484 if err != nil {
8585 return nil , fmt .Errorf ("error creating tasks queue: %w" , err )
8686 }
@@ -280,7 +280,8 @@ func (p *Planner) runOne(ctx context.Context) error {
280280
281281 now := time .Now ()
282282 for _ , task := range tasks {
283- queueTask := NewQueueTask (ctx , now , task , resultsCh )
283+ protoTask := task .ToProtoTask ()
284+ queueTask := NewQueueTask (ctx , now , protoTask , resultsCh )
284285 if err := p .enqueueTask (queueTask ); err != nil {
285286 level .Error (logger ).Log ("msg" , "error enqueuing task" , "err" , err )
286287 continue
@@ -703,7 +704,7 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
703704}
704705
705706func (p * Planner ) enqueueTask (task * QueueTask ) error {
706- return p .tasksQueue .Enqueue (task .Tenant () , task , func () {
707+ return p .tasksQueue .Enqueue (task .ProtoTask , task . TaskMeta , func () {
707708 task .timesEnqueued .Add (1 )
708709 })
709710}
@@ -738,7 +739,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
738739
739740 lastIndex := queue .StartIndex
740741 for p .isRunningOrStopping () {
741- item , idx , err := p .tasksQueue .Dequeue (builder .Context (), lastIndex , builderID )
742+ protoTask , meta , idx , err := p .tasksQueue .Dequeue (builder .Context (), lastIndex , builderID )
742743 if err != nil {
743744 if errors .Is (err , queue .ErrStopped ) {
744745 // Planner is stopping, break the loop and return
@@ -748,36 +749,40 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
748749 }
749750 lastIndex = idx
750751
751- if item == nil {
752+ if protoTask == nil {
752753 return fmt .Errorf ("dequeue() call resulted in nil response. builder: %s" , builderID )
753754 }
754755
755- task := item .(* QueueTask )
756- logger := log .With (logger , "task" , task .ID ())
756+ task := & QueueTask {
757+ ProtoTask : protoTask ,
758+ TaskMeta : meta .(* TaskMeta ),
759+ }
760+
761+ logger := log .With (logger , "task" , task .Id )
757762
758763 queueTime := time .Since (task .queueTime )
759764 p .metrics .queueDuration .Observe (queueTime .Seconds ())
760765
761766 if task .ctx .Err () != nil {
762767 level .Warn (logger ).Log ("msg" , "task context done after dequeue" , "err" , task .ctx .Err ())
763768 lastIndex = lastIndex .ReuseLastIndex ()
764- p .tasksQueue .Release (task )
769+ p .tasksQueue .Release (task . ProtoTask )
765770 continue
766771 }
767772
768773 result , err := p .forwardTaskToBuilder (builder , builderID , task )
769774 if err != nil {
770- maxRetries := p .limits .BloomTaskMaxRetries (task .Tenant () )
775+ maxRetries := p .limits .BloomTaskMaxRetries (task .Tenant )
771776 if maxRetries > 0 && int (task .timesEnqueued .Load ()) >= maxRetries {
772- p .tasksQueue .Release (task )
777+ p .tasksQueue .Release (task . ProtoTask )
773778 level .Error (logger ).Log (
774779 "msg" , "task failed after max retries" ,
775780 "retries" , task .timesEnqueued .Load (),
776781 "maxRetries" , maxRetries ,
777782 "err" , err ,
778783 )
779784 task .resultsChannel <- & protos.TaskResult {
780- TaskID : task .ID () ,
785+ TaskID : task .Id ,
781786 Error : fmt .Errorf ("task failed after max retries (%d): %w" , maxRetries , err ),
782787 }
783788 continue
@@ -786,10 +791,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
786791 // Re-queue the task if the builder is failing to process the tasks
787792 if err := p .enqueueTask (task ); err != nil {
788793 p .metrics .taskLost .Inc ()
789- p .tasksQueue .Release (task )
794+ p .tasksQueue .Release (task . ProtoTask )
790795 level .Error (logger ).Log ("msg" , "error re-enqueuing task. this task will be lost" , "err" , err )
791796 task .resultsChannel <- & protos.TaskResult {
792- TaskID : task .ID () ,
797+ TaskID : task .Id ,
793798 Error : fmt .Errorf ("error re-enqueuing task: %w" , err ),
794799 }
795800 continue
@@ -809,7 +814,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
809814 "duration" , time .Since (task .queueTime ).Seconds (),
810815 "retries" , task .timesEnqueued .Load ()- 1 , // -1 because the first enqueue is not a retry
811816 )
812- p .tasksQueue .Release (task )
817+ p .tasksQueue .Release (task . ProtoTask )
813818
814819 // Send the result back to the task. The channel is buffered, so this should not block.
815820 task .resultsChannel <- result
@@ -824,7 +829,7 @@ func (p *Planner) forwardTaskToBuilder(
824829 task * QueueTask ,
825830) (* protos.TaskResult , error ) {
826831 msg := & protos.PlannerToBuilder {
827- Task : task .ToProtoTask () ,
832+ Task : task .ProtoTask ,
828833 }
829834
830835 if err := builder .Send (msg ); err != nil {
@@ -846,7 +851,7 @@ func (p *Planner) forwardTaskToBuilder(
846851 }()
847852
848853 timeout := make (<- chan time.Time )
849- taskTimeout := p .limits .BuilderResponseTimeout (task .Tenant () )
854+ taskTimeout := p .limits .BuilderResponseTimeout (task .Tenant )
850855 if taskTimeout != 0 {
851856 // If the timeout is not 0 (disabled), configure it
852857 timeout = time .After (taskTimeout )
@@ -886,8 +891,8 @@ func (p *Planner) receiveResultFromBuilder(
886891 if err != nil {
887892 return nil , fmt .Errorf ("error processing task result in builder (%s): %w" , builderID , err )
888893 }
889- if result .TaskID != task .ID () {
890- return nil , fmt .Errorf ("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s" , result .TaskID , builderID , task .ID () )
894+ if result .TaskID != task .Id {
895+ return nil , fmt .Errorf ("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s" , result .TaskID , builderID , task .Id )
891896 }
892897
893898 return result , nil
0 commit comments