Skip to content

Commit 339ba1a

Browse files
authored
feat(blockbuilder): grpc transport (#15218)
1 parent ad322c0 commit 339ba1a

File tree

7 files changed

+2547
-21
lines changed

7 files changed

+2547
-21
lines changed

‎pkg/blockbuilder/scheduler/scheduler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ import (
1515
type testEnv struct {
1616
queue *JobQueue
1717
scheduler *BlockScheduler
18-
transport *builder.MemoryTransport
18+
transport *types.MemoryTransport
1919
builder *builder.Worker
2020
}
2121

2222
func newTestEnv(builderID string) *testEnv {
2323
queue := NewJobQueue()
2424
scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry())
25-
transport := builder.NewMemoryTransport(scheduler)
26-
builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler))
25+
transport := types.NewMemoryTransport(scheduler)
26+
builder := builder.NewWorker(builderID, transport)
2727

2828
return &testEnv{
2929
queue: queue,
@@ -89,7 +89,7 @@ func TestMultipleBuilders(t *testing.T) {
8989
// Create first environment
9090
env1 := newTestEnv("test-builder-1")
9191
// Create second builder using same scheduler
92-
builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler))
92+
builder2 := builder.NewWorker("test-builder-2", env1.transport)
9393

9494
ctx := context.Background()
9595

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package types
2+
3+
import (
4+
"context"
5+
"flag"
6+
"io"
7+
8+
"github.com/grafana/dskit/grpcclient"
9+
"github.com/grafana/dskit/instrument"
10+
"github.com/pkg/errors"
11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/promauto"
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/health/grpc_health_v1"
15+
16+
"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
17+
"github.com/grafana/loki/v3/pkg/util/constants"
18+
)
19+
20+
var _ Transport = &GRPCTransport{}
21+
22+
type GRPCTransportConfig struct {
23+
Address string `yaml:"address,omitempty"`
24+
25+
// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
26+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
27+
}
28+
29+
func (cfg *GRPCTransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
30+
f.StringVar(&cfg.Address, prefix+"address", "", "address in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes")
31+
}
32+
33+
type grpcTransportMetrics struct {
34+
requestLatency *prometheus.HistogramVec
35+
}
36+
37+
func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMetrics {
38+
return &grpcTransportMetrics{
39+
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
40+
Namespace: constants.Loki,
41+
Subsystem: "block_builder_grpc",
42+
Name: "request_duration_seconds",
43+
Help: "Time (in seconds) spent serving requests when using the block builder grpc transport",
44+
Buckets: instrument.DefBuckets,
45+
}, []string{"operation", "status_code"}),
46+
}
47+
}
48+
49+
// GRPCTransport implements the Transport interface using gRPC
50+
type GRPCTransport struct {
51+
grpc_health_v1.HealthClient
52+
io.Closer
53+
proto.BlockBuilderServiceClient
54+
}
55+
56+
// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options
57+
func NewGRPCTransportFromAddress(
58+
metrics *grpcTransportMetrics,
59+
cfg GRPCTransportConfig,
60+
) (*GRPCTransport, error) {
61+
62+
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency))
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
68+
conn, err := grpc.Dial(cfg.Address, dialOpts...)
69+
if err != nil {
70+
return nil, errors.Wrap(err, "new grpc pool dial")
71+
}
72+
73+
return &GRPCTransport{
74+
Closer: conn,
75+
HealthClient: grpc_health_v1.NewHealthClient(conn),
76+
BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn),
77+
}, nil
78+
}
79+
80+
// SendGetJobRequest implements Transport
81+
func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
82+
protoReq := &proto.GetJobRequest{
83+
BuilderId: req.BuilderID,
84+
}
85+
86+
resp, err := t.GetJob(ctx, protoReq)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
return &GetJobResponse{
92+
Job: protoToJob(resp.GetJob()),
93+
OK: resp.GetOk(),
94+
}, nil
95+
}
96+
97+
// SendCompleteJob implements Transport
98+
func (t *GRPCTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
99+
protoReq := &proto.CompleteJobRequest{
100+
BuilderId: req.BuilderID,
101+
Job: jobToProto(req.Job),
102+
}
103+
104+
_, err := t.CompleteJob(ctx, protoReq)
105+
return err
106+
}
107+
108+
// SendSyncJob implements Transport
109+
func (t *GRPCTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {
110+
protoReq := &proto.SyncJobRequest{
111+
BuilderId: req.BuilderID,
112+
Job: jobToProto(req.Job),
113+
}
114+
115+
_, err := t.SyncJob(ctx, protoReq)
116+
return err
117+
}
118+
119+
// protoToJob converts a proto Job to a types.Job
120+
func protoToJob(p *proto.Job) *Job {
121+
if p == nil {
122+
return nil
123+
}
124+
return &Job{
125+
ID: p.GetId(),
126+
Partition: int(p.GetPartition()),
127+
Offsets: Offsets{
128+
Min: p.GetOffsets().GetMin(),
129+
Max: p.GetOffsets().GetMax(),
130+
},
131+
}
132+
}
133+
134+
// jobToProto converts a types.Job to a proto Job
135+
func jobToProto(j *Job) *proto.Job {
136+
if j == nil {
137+
return nil
138+
}
139+
return &proto.Job{
140+
Id: j.ID,
141+
Partition: int32(j.Partition),
142+
Offsets: &proto.Offsets{
143+
Min: j.Offsets.Min,
144+
Max: j.Offsets.Max,
145+
},
146+
}
147+
}

‎pkg/blockbuilder/types/interfaces.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ type Scheduler interface {
2424

2525
// Transport defines the interface for communication between block builders and scheduler
2626
type Transport interface {
27+
BuilderTransport
28+
SchedulerTransport
29+
}
30+
31+
// SchedulerTransport is for calls originating from the scheduler
32+
type SchedulerTransport interface{}
33+
34+
// BuilderTransport is for calls originating from the builder
35+
type BuilderTransport interface {
2736
// SendGetJobRequest sends a request to get a new job
2837
SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error)
2938
// SendCompleteJob sends a job completion notification

‎pkg/blockbuilder/types/job.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import "fmt"
44

55
// Job represents a block building task.
66
type Job struct {
7-
ID string
8-
Status JobStatus
7+
ID string
98
// Partition and offset information
109
Partition int
1110
Offsets Offsets
@@ -30,7 +29,6 @@ type Offsets struct {
3029
func NewJob(partition int, offsets Offsets) *Job {
3130
return &Job{
3231
ID: GenerateJobID(partition, offsets),
33-
Status: JobStatusPending,
3432
Partition: partition,
3533
Offsets: offsets,
3634
}

0 commit comments

Comments
 (0)