Skip to content

feat(blockbuilder): grpc transport #15218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
type testEnv struct {
queue *JobQueue
scheduler *BlockScheduler
transport *builder.MemoryTransport
transport *types.MemoryTransport
builder *builder.Worker
}

func newTestEnv(builderID string) *testEnv {
queue := NewJobQueue()
scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry())
transport := builder.NewMemoryTransport(scheduler)
builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler))
transport := types.NewMemoryTransport(scheduler)
builder := builder.NewWorker(builderID, transport)

return &testEnv{
queue: queue,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestMultipleBuilders(t *testing.T) {
// Create first environment
env1 := newTestEnv("test-builder-1")
// Create second builder using same scheduler
builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler))
builder2 := builder.NewWorker("test-builder-2", env1.transport)

ctx := context.Background()

Expand Down
147 changes: 147 additions & 0 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package types

import (
"context"
"flag"
"io"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/util/constants"
)

var _ Transport = &GRPCTransport{}

type GRPCTransportConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we embed this in builder config? ignore me if it is planned for a follow-up

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I planned to leave this for when we're ready to wire it together.

Address string `yaml:"address,omitempty"`

// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
Copy link
Contributor

@ashwanthgoli ashwanthgoli Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
// GRPCClientConfig configures the gRPC connection between the block builder client and the scheduler.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

func (cfg *GRPCTransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Address, prefix+"address", "", "address in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes")
}

type grpcTransportMetrics struct {
requestLatency *prometheus.HistogramVec
}

func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMetrics {
return &grpcTransportMetrics{
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "block_builder_grpc",
Name: "request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using the block builder grpc transport",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"}),
}
}

// GRPCTransport implements the Transport interface using gRPC
type GRPCTransport struct {
grpc_health_v1.HealthClient
io.Closer
proto.BlockBuilderServiceClient
}

// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options
func NewGRPCTransportFromAddress(
metrics *grpcTransportMetrics,
cfg GRPCTransportConfig,
) (*GRPCTransport, error) {

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency))
if err != nil {
return nil, err
}

// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.Dial(cfg.Address, dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "new grpc pool dial")
}

return &GRPCTransport{
Closer: conn,
HealthClient: grpc_health_v1.NewHealthClient(conn),
BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn),
}, nil
}

// SendGetJobRequest implements Transport
func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
protoReq := &proto.GetJobRequest{
BuilderId: req.BuilderID,
}

resp, err := t.GetJob(ctx, protoReq)
if err != nil {
return nil, err
}

return &GetJobResponse{
Job: protoToJob(resp.GetJob()),
OK: resp.GetOk(),
}, nil
}

// SendCompleteJob implements Transport
func (t *GRPCTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
protoReq := &proto.CompleteJobRequest{
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
}

_, err := t.CompleteJob(ctx, protoReq)
return err
}

// SendSyncJob implements Transport
func (t *GRPCTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {
protoReq := &proto.SyncJobRequest{
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
}

_, err := t.SyncJob(ctx, protoReq)
return err
}

// protoToJob converts a proto Job to a types.Job
func protoToJob(p *proto.Job) *Job {
if p == nil {
return nil
}
return &Job{
ID: p.GetId(),
Partition: int(p.GetPartition()),
Offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
},
}
}

// jobToProto converts a types.Job to a proto Job
func jobToProto(j *Job) *proto.Job {
if j == nil {
return nil
}
return &proto.Job{
Id: j.ID,
Partition: int32(j.Partition),
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
},
}
}
9 changes: 9 additions & 0 deletions pkg/blockbuilder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ type Scheduler interface {

// Transport defines the interface for communication between block builders and scheduler
type Transport interface {
BuilderTransport
SchedulerTransport
}

// SchedulerTransport is for calls originating from the scheduler
type SchedulerTransport interface{}

// BuilderTransport is for calls originating from the builder
type BuilderTransport interface {
// SendGetJobRequest sends a request to get a new job
SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error)
// SendCompleteJob sends a job completion notification
Expand Down
4 changes: 1 addition & 3 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import "fmt"

// Job represents a block building task.
type Job struct {
ID string
Status JobStatus
ID string
// Partition and offset information
Partition int
Offsets Offsets
Expand All @@ -30,7 +29,6 @@ type Offsets struct {
func NewJob(partition int, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Status: JobStatusPending,
Partition: partition,
Offsets: offsets,
}
Expand Down
Loading
Loading