Skip to content
Prev Previous commit
Next Next commit
wire transport
  • Loading branch information
ashwanthgoli committed Dec 3, 2024
commit 995052460784e9d6668e040e4e0257d945be71e2
41 changes: 27 additions & 14 deletions pkg/blockbuilder/builder/slimgester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -55,6 +56,10 @@
Backoff backoff.Config `yaml:"backoff_config"`
WorkerParallelism int `yaml:"worker_parallelism"`
SyncInterval time.Duration `yaml:"sync_interval"`

SchedulerAddress string `yaml:"scheduler_address"`
// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
SchedulerGRPCClientConfig grpcclient.Config `yaml:"scheduler_grpc_client_config"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
Expand All @@ -68,6 +73,9 @@
f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
f.DurationVar(&cfg.SyncInterval, prefix+"sync-interval", 30*time.Second, "The interval at which to sync job status with the scheduler.")
f.IntVar(&cfg.WorkerParallelism, prefix+"worker-parallelism", 1, "The number of workers to run in parallel to process jobs.")
f.StringVar(&cfg.SchedulerAddress, prefix+"scheduler-address", "", "Address of the scheduler in the format described here: https://github.com/grpc/grpc/blob/master/doc/naming.md")

cfg.SchedulerGRPCClientConfig.RegisterFlagsWithPrefix(prefix+"scheduler-grpc-client.", f)
cfg.Backoff.RegisterFlagsWithPrefix(prefix+"backoff.", f)
}

Expand Down Expand Up @@ -107,6 +115,7 @@
// containing all chunk references. Finally, clears internal state.
type BlockBuilder struct {
services.Service
types.BuilderTransport

id string
cfg Config
Expand All @@ -122,7 +131,6 @@

jobsMtx sync.RWMutex
inflightJobs map[string]*types.Job
transport types.BuilderTransport
}

func NewBlockBuilder(
Expand All @@ -141,17 +149,22 @@
return nil, err
}

t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg)
if err != nil {
return nil, fmt.Errorf("create grpc transport: %w", err)
}

i := &BlockBuilder{
id: id,
cfg: cfg,
periodicConfigs: periodicConfigs,
metrics: NewSlimgesterMetrics(reg),
logger: logger,
decoder: decoder,
readerFactory: readerFactory,
store: store,
objStore: objStore,
// TODO: wire transport
id: id,
cfg: cfg,
periodicConfigs: periodicConfigs,
metrics: NewSlimgesterMetrics(reg),
logger: logger,
decoder: decoder,
readerFactory: readerFactory,
store: store,
objStore: objStore,
BuilderTransport: t,
}

i.Service = services.NewBasicService(nil, i.running, nil)
Expand Down Expand Up @@ -208,7 +221,7 @@
defer i.jobsMtx.RUnlock()

for _, job := range i.inflightJobs {
if err := i.transport.SendSyncJob(ctx, &types.SyncJobRequest{
if err := i.SendSyncJob(ctx, &types.SyncJobRequest{
BuilderID: i.id,
Job: job,
}); err != nil {
Expand All @@ -221,7 +234,7 @@

func (i *BlockBuilder) runOne(ctx context.Context, workerID string) error {
// assuming GetJob blocks/polls until a job is available
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we'll need to retry when there are no jobs here, but as you said it's also possible the transport handles this

resp, err := i.transport.SendGetJobRequest(ctx, &types.GetJobRequest{
resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: workerID,
})
if err != nil {
Expand Down Expand Up @@ -255,7 +268,7 @@
ctx,
i.cfg.Backoff,
func() (res struct{}, err error) {
if err = i.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
if err = i.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: workerID,
Job: job,
}); err != nil {
Expand Down Expand Up @@ -484,7 +497,7 @@
return lastOffset, nil
}

func (b *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {

Check warning on line 500 in pkg/blockbuilder/builder/slimgester.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

receiver-naming: receiver name b should be consistent with previous receiver name i for BlockBuilder (revive)
f, err := b.readerFactory(partitionID)
if err != nil {
return 0, err
Expand Down
23 changes: 5 additions & 18 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package types

import (
"context"
"flag"
"io"

"github.com/grafana/dskit/grpcclient"
Expand All @@ -19,17 +18,6 @@ import (

var _ Transport = &GRPCTransport{}

type GRPCTransportConfig struct {
Address string `yaml:"address,omitempty"`

// GRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

func (cfg *GRPCTransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Address, prefix+"address", "", "address in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes")
}

type grpcTransportMetrics struct {
requestLatency *prometheus.HistogramVec
}
Expand All @@ -55,17 +43,16 @@ type GRPCTransport struct {

// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options
func NewGRPCTransportFromAddress(
metrics *grpcTransportMetrics,
cfg GRPCTransportConfig,
address string,
cfg grpcclient.Config,
reg prometheus.Registerer,
) (*GRPCTransport, error) {

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency))
dialOpts, err := cfg.DialOption(grpcclient.Instrument(newGRPCTransportMetrics(reg).requestLatency))
if err != nil {
return nil, err
}

// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.Dial(cfg.Address, dialOpts...)
conn, err := grpc.NewClient(address, dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "new grpc pool dial")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "fmt"
type Job struct {
ID string
// Partition and offset information
Partition int32
Partition int
Offsets Offsets
}

Expand Down
Loading