Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
removes unused builderID from scheduler interfaces
  • Loading branch information
owen-d committed Dec 10, 2024
commit 5576b0c9f33c7d5e5f7a636f31e3057e12982297
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) {
}

// SyncJob registers a job as in-progress, used for restoring state after scheduler restarts
func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) {
func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
q.mu.Lock()
defer q.mu.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) {
}
}

func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job, bool, error) {
func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, error) {
select {
case <-ctx.Done():
return nil, false, ctx.Err()
Expand All @@ -214,7 +214,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job
}
}

func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error {
func (s *BlockScheduler) HandleCompleteJob(_ context.Context, job *types.Job, success bool) error {
logger := log.With(s.logger, "job", job.ID())

if success {
Expand All @@ -228,7 +228,7 @@ func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *typ
return nil
}

func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
s.queue.SyncJob(job.ID(), builderID, job)
func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error {
s.queue.SyncJob(job.ID(), job)
return nil
}
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ type BuilderTransport interface {
// SchedulerHandler defines the business logic for handling builder requests
type SchedulerHandler interface {
// HandleGetJob processes a request for a new job
HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
HandleGetJob(ctx context.Context) (*Job, bool, error)
// HandleCompleteJob processes a job completion notification
HandleCompleteJob(ctx context.Context, builderID string, job *Job, success bool) error
HandleCompleteJob(ctx context.Context, job *Job, success bool) error
// HandleSyncJob processes a job sync request
HandleSyncJob(ctx context.Context, builderID string, job *Job) error
HandleSyncJob(ctx context.Context, job *Job) error
}

// Request/Response message types
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/scheduler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer {

// GetJob implements proto.SchedulerServiceServer
func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
job, ok, err := s.handler.HandleGetJob(ctx, req.BuilderId)
job, ok, err := s.handler.HandleGetJob(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand All @@ -39,15 +39,15 @@ func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest)

// CompleteJob implements proto.SchedulerServiceServer
func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
if err := s.handler.HandleCompleteJob(ctx, req.BuilderId, protoToJob(req.Job), req.Success); err != nil {
if err := s.handler.HandleCompleteJob(ctx, protoToJob(req.Job), req.Success); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &proto.CompleteJobResponse{}, nil
}

// SyncJob implements proto.SchedulerServiceServer
func (s *schedulerServer) SyncJob(ctx context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
if err := s.handler.HandleSyncJob(ctx, req.BuilderId, protoToJob(req.Job)); err != nil {
if err := s.handler.HandleSyncJob(ctx, protoToJob(req.Job)); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &proto.SyncJobResponse{}, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport {
}

func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID)
job, ok, err := t.scheduler.HandleGetJob(ctx)
if err != nil {
return nil, err
}
Expand All @@ -48,9 +48,9 @@ func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequ
}

func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job, req.Success)
return t.scheduler.HandleCompleteJob(ctx, req.Job, req.Success)
}

func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {
return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job)
return t.scheduler.HandleSyncJob(ctx, req.Job)
}