Commit 127b609

feat: add limits package (#16489)
Parent: f1ac3c4

File tree

7 files changed, +3495 -433 lines


pkg/kafka/encoding.go

Lines changed: 53 additions & 5 deletions
@@ -16,11 +16,17 @@ import (
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 )
 
-var encoderPool = sync.Pool{
-	New: func() any {
-		return &logproto.Stream{}
-	},
-}
+const (
+	metadataTopicSuffix = ".metadata"
+)
+
+var (
+	encoderPool = sync.Pool{
+		New: func() any {
+			return &logproto.Stream{}
+		},
+	}
+)
 
 // Encode converts a logproto.Stream into one or more Kafka records.
 // It handles splitting large streams into multiple records if necessary.
@@ -188,3 +194,45 @@ func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
 func sovPush(x uint64) (n int) {
 	return (math_bits.Len64(x|1) + 6) / 7
 }
+
+// EncodeStreamMetadata encodes the stream metadata into a Kafka record
+// using the tenantID as the key and partition as the target partition
+func EncodeStreamMetadata(partition int32, topic string, tenantID string, streamHash uint64) (*kgo.Record, error) {
+	// Validate stream hash
+	if streamHash == 0 {
+		return nil, fmt.Errorf("invalid stream hash '%d'", streamHash)
+	}
+
+	metadata := logproto.StreamMetadata{
+		StreamHash: streamHash,
+	}
+
+	// Encode the metadata into a byte slice
+	value, err := metadata.Marshal()
+	if err != nil {
+		return nil, err
+	}
+
+	return &kgo.Record{
+		Key:       []byte(tenantID),
+		Value:     value,
+		Partition: partition,
+		Topic:     MetadataTopicFor(topic),
+	}, nil
+}
+
+// DecodeStreamMetadata decodes a Kafka record into a StreamMetadata.
+// It returns the decoded metadata and any error encountered.
+func DecodeStreamMetadata(record *kgo.Record) (*logproto.StreamMetadata, error) {
+	var metadata logproto.StreamMetadata
+	if err := metadata.Unmarshal(record.Value); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal stream metadata: %w", err)
+	}
+
+	return &metadata, nil
+}
+
+// MetadataTopicFor returns the metadata topic name for the given topic.
+func MetadataTopicFor(topic string) string {
+	return topic + metadataTopicSuffix
+}
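
For orientation, here is a minimal round-trip sketch of the helpers added above. It is illustrative only, not part of the commit: it assumes the package is imported as kafka, and the partition, topic, and tenant values are made up.

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/kafka"
)

func main() {
	// Encode metadata for stream hash 12345, keyed by the tenant ID and
	// routed to partition 1. The record targets the derived metadata topic.
	record, err := kafka.EncodeStreamMetadata(1, "logs", "tenant-1", 12345)
	if err != nil {
		panic(err) // a zero stream hash would be rejected here
	}
	fmt.Println(record.Topic) // "logs.metadata", i.e. kafka.MetadataTopicFor("logs")

	// Decode the record back; only the stream hash travels in the value.
	metadata, err := kafka.DecodeStreamMetadata(record)
	if err != nil {
		panic(err)
	}
	fmt.Println(metadata.StreamHash) // 12345
}

Note the design: the tenant ID rides in the record key (so records for one tenant hash to consistent partitions by default), while the payload carries only the stream hash.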

pkg/kafka/encoding_test.go

Lines changed: 64 additions & 0 deletions
@@ -7,6 +7,7 @@ import (
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kgo"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
@@ -149,3 +150,66 @@ func generateRandomString(length int) string {
 	}
 	return string(b)
 }
+
+func TestEncodeDecodeStreamMetadata(t *testing.T) {
+	tests := []struct {
+		name      string
+		hash      uint64
+		partition int32
+		topic     string
+		tenantID  string
+		expectErr bool
+	}{
+		{
+			name:      "Valid metadata",
+			hash:      12345,
+			partition: 1,
+			topic:     "logs",
+			tenantID:  "tenant-1",
+			expectErr: false,
+		},
+		{
+			name:      "Zero hash - should error",
+			hash:      0,
+			partition: 3,
+			topic:     "traces",
+			tenantID:  "tenant-3",
+			expectErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Encode metadata
+			record, err := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.hash)
+			if tt.expectErr {
+				require.Error(t, err)
+				require.Nil(t, record)
+				return
+			}
+
+			require.NotNil(t, record)
+			require.NotNil(t, record.Value)
+			require.Equal(t, tt.topic+metadataTopicSuffix, record.Topic)
+			require.Equal(t, tt.partition, record.Partition)
+			require.Equal(t, []byte(tt.tenantID), record.Key)
+
+			// Decode metadata
+			metadata, err := DecodeStreamMetadata(record)
+			require.NoError(t, err)
+			require.NotNil(t, metadata)
+
+			// Verify decoded values
+			require.Equal(t, tt.hash, metadata.StreamHash)
+		})
+	}
+
+	t.Run("Decode invalid value", func(t *testing.T) {
+		record := &kgo.Record{
+			Value: []byte("invalid data"),
+		}
+		metadata, err := DecodeStreamMetadata(record)
+		require.Error(t, err)
+		require.Nil(t, metadata)
+	})
+}
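
To exercise just this round trip from a Loki checkout, something like "go test ./pkg/kafka -run TestEncodeDecodeStreamMetadata" should work (the command is illustrative, not part of the commit).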

pkg/limits/client/client.go

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+// Package client provides a gRPC client implementation for the limits service.
+package client
+
+import (
+	"flag"
+	"io"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/grpcclient"
+	"github.com/grafana/dskit/middleware"
+	"github.com/grafana/dskit/ring"
+	ring_client "github.com/grafana/dskit/ring/client"
+	"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
+	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
+
+	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/util/server"
+)
+
+var (
+	backendClients = prometheus.NewGauge(prometheus.GaugeOpts{
+		Name: "loki_ingest_limits_backend_clients",
+		Help: "The current number of ingest limits backend clients.",
+	})
+	backendRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "loki_ingest_limits_backend_client_request_duration_seconds",
+		Help:    "Time spent doing ingest limits backend requests.",
+		Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
+	}, []string{"operation", "status_code"})
+)
+
+// Config contains the config for an ingest-limits client.
+type Config struct {
+	GRPCClientConfig             grpcclient.Config              `yaml:"grpc_client_config" doc:"description=Configures client gRPC connections to limits service."`
+	PoolConfig                   PoolConfig                     `yaml:"pool_config,omitempty" doc:"description=Configures client gRPC connections pool to limits service."`
+	GRPCUnaryClientInterceptors  []grpc.UnaryClientInterceptor  `yaml:"-"`
+	GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
+
+	// Internal is used to indicate that this client communicates on behalf of
+	// a machine and not a user. When Internal = true, the client won't attempt
+	// to inject a userid into the context.
+	Internal bool `yaml:"-"`
+}
+
+func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".limits-client", f)
+	cfg.PoolConfig.RegisterFlagsWithPrefix(prefix, f)
+}
+
+// PoolConfig contains the config for a pool of ingest-limits clients.
+type PoolConfig struct {
+	ClientCleanupPeriod     time.Duration `yaml:"client_cleanup_period"`
+	HealthCheckIngestLimits bool          `yaml:"health_check_ingest_limits"`
+	RemoteTimeout           time.Duration `yaml:"remote_timeout"`
+}
+
+func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
+	f.DurationVar(&cfg.ClientCleanupPeriod, prefix+".client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingest-limits that have gone away.")
+	f.BoolVar(&cfg.HealthCheckIngestLimits, prefix+".health-check-ingest-limits", true, "Run a health check on each ingest-limits client during periodic cleanup.")
+	f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 1*time.Second, "Timeout for the health check.")
+}
+
+// Client is a gRPC client for the ingest-limits.
+type Client struct {
+	logproto.IngestLimitsClient
+	grpc_health_v1.HealthClient
+	io.Closer
+}
+
+// NewClient returns a new Client for the specified ingest-limits.
+func NewClient(cfg Config, addr string) (*Client, error) {
+	opts := []grpc.DialOption{
+		grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
+	}
+	dialOpts, err := cfg.GRPCClientConfig.DialOption(getGRPCInterceptors(&cfg))
+	if err != nil {
+		return nil, err
+	}
+	opts = append(opts, dialOpts...)
+	// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
+	conn, err := grpc.Dial(addr, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return &Client{
+		IngestLimitsClient: logproto.NewIngestLimitsClient(conn),
+		HealthClient:       grpc_health_v1.NewHealthClient(conn),
+		Closer:             conn,
+	}, nil
+}
+
+// getGRPCInterceptors returns the gRPC interceptors for the given Config.
+func getGRPCInterceptors(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
+	var (
+		unaryInterceptors  []grpc.UnaryClientInterceptor
+		streamInterceptors []grpc.StreamClientInterceptor
+	)
+
+	unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
+	unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
+	unaryInterceptors = append(unaryInterceptors, server.UnaryClientHTTPHeadersInterceptor)
+	unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
+	if !cfg.Internal {
+		unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
+	}
+	unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(backendRequestDuration))
+
+	streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
+	streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
+	streamInterceptors = append(streamInterceptors, server.StreamClientHTTPHeadersInterceptor)
+	streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
+	if !cfg.Internal {
+		streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
+	}
+	streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(backendRequestDuration))
+
+	return unaryInterceptors, streamInterceptors
+}
+
+// NewPool returns a new pool of clients for the ingest-limits.
+func NewPool(
+	name string,
+	cfg PoolConfig,
+	ring ring.ReadRing,
+	factory ring_client.PoolFactory,
+	logger log.Logger,
+) *ring_client.Pool {
+	poolCfg := ring_client.PoolConfig{
+		CheckInterval:      cfg.ClientCleanupPeriod,
+		HealthCheckEnabled: cfg.HealthCheckIngestLimits,
+		HealthCheckTimeout: cfg.RemoteTimeout,
+	}
+	return ring_client.NewPool(
+		name,
+		poolCfg,
+		ring_client.NewRingServiceDiscovery(ring),
+		factory,
+		backendClients,
+		logger,
+	)
+}
+
+// NewPoolFactory returns a new factory for ingest-limits clients.
+func NewPoolFactory(cfg Config) ring_client.PoolFactory {
+	return ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
+		return NewClient(cfg, addr)
+	})
+}
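
As a wiring sketch of how these pieces compose (illustrative, not part of the commit: newLimitsClientFor is a hypothetical helper, and the limits ring and instance address are assumed to exist), the flow is register flag defaults, build a pool over ring-based service discovery, then fetch a per-address client:

package main

import (
	"flag"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/ring"

	"github.com/grafana/loki/v3/pkg/limits/client"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// newLimitsClientFor is a hypothetical helper: given a limits ring and an
// instance address taken from that ring, it returns an IngestLimitsClient.
func newLimitsClientFor(limitsRing ring.ReadRing, addr string) (logproto.IngestLimitsClient, error) {
	var cfg client.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("ingest-limits", fs) // registering flags applies the defaults

	pool := client.NewPool("ingest-limits", cfg.PoolConfig, limitsRing, client.NewPoolFactory(cfg), log.NewNopLogger())

	// GetClientFor dials (or reuses) the gRPC client for this address; the
	// pool's periodic cleanup health-checks clients and evicts stale ones.
	poolClient, err := pool.GetClientFor(addr)
	if err != nil {
		return nil, err
	}
	return poolClient.(*client.Client), nil
}

Design note: Client embeds the generated IngestLimitsClient, the gRPC health client, and the connection's io.Closer, so a single pooled object satisfies both the RPC interface and the pool's health-check and cleanup requirements.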
