Skip to content

Commit 0cbaa46

Browse files
feat: add limits/frontend pkg (#16658)
1 parent bf99989 commit 0cbaa46

File tree

6 files changed

+2256
-330
lines changed

6 files changed

+2256
-330
lines changed

‎pkg/limits/frontend/client/client.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Package client provides gRPC client implementation for limits-frontend.
2+
// An example use case is the distributor, which needs to ask limits-frontend
3+
// if a push request has exceeded per-tenant limits.
4+
package client
5+
6+
import (
7+
"flag"
8+
"io"
9+
"time"
10+
11+
"github.com/go-kit/log"
12+
"github.com/grafana/dskit/grpcclient"
13+
"github.com/grafana/dskit/middleware"
14+
"github.com/grafana/dskit/ring"
15+
ring_client "github.com/grafana/dskit/ring/client"
16+
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
17+
"github.com/opentracing/opentracing-go"
18+
"github.com/prometheus/client_golang/prometheus"
19+
"github.com/prometheus/client_golang/prometheus/promauto"
20+
"google.golang.org/grpc"
21+
"google.golang.org/grpc/health/grpc_health_v1"
22+
23+
"github.com/grafana/loki/v3/pkg/logproto"
24+
"github.com/grafana/loki/v3/pkg/util/server"
25+
)
26+
27+
var (
28+
LimitsRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
29+
)
30+
31+
var (
32+
frontendClients = prometheus.NewGauge(prometheus.GaugeOpts{
33+
Name: "loki_ingest_limits_frontend_clients",
34+
Help: "The current number of ingest limits frontend clients.",
35+
})
36+
frontendRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
37+
Name: "loki_ingest_limits_frontend_client_request_duration_seconds",
38+
Help: "Time spent doing ingest limits frontend requests.",
39+
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
40+
}, []string{"operation", "status_code"})
41+
)
42+
43+
// Config contains the config for an ingest-limits-frontend client.
44+
type Config struct {
45+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures client gRPC connections to limits service."`
46+
PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures client gRPC connections pool to limits service."`
47+
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
48+
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
49+
50+
// Internal is used to indicate that this client communicates on behalf of
51+
// a machine and not a user. When Internal = true, the client won't attempt
52+
// to inject an userid into the context.
53+
Internal bool `yaml:"-"`
54+
}
55+
56+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
57+
cfg.RegisterFlagsWithPrefix("ingest-limits-frontend-client", f)
58+
}
59+
60+
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
61+
cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f)
62+
cfg.PoolConfig.RegisterFlagsWithPrefix(prefix, f)
63+
}
64+
65+
// PoolConfig contains the config for a pool of ingest-limits-frontend clients.
66+
type PoolConfig struct {
67+
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`
68+
HealthCheckIngestLimits bool `yaml:"health_check_ingest_limits"`
69+
RemoteTimeout time.Duration `yaml:"remote_timeout"`
70+
}
71+
72+
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
73+
f.DurationVar(&cfg.ClientCleanupPeriod, prefix+".client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingest-limits-frontend that have gone away.")
74+
f.BoolVar(&cfg.HealthCheckIngestLimits, prefix+".health-check-ingest-limits", true, "Run a health check on each ingest-limits-frontend client during periodic cleanup.")
75+
f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 1*time.Second, "Timeout for the health check.")
76+
}
77+
78+
// Client is a gRPC client for the ingest-limits-frontend.
79+
type Client struct {
80+
logproto.IngestLimitsFrontendClient
81+
grpc_health_v1.HealthClient
82+
io.Closer
83+
}
84+
85+
// NewClient returns a new Client for the specified ingest-limits-frontend.
86+
func NewClient(cfg Config, addr string) (*Client, error) {
87+
opts := []grpc.DialOption{
88+
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
89+
}
90+
dialOpts, err := cfg.GRPCClientConfig.DialOption(getGRPCInterceptors(&cfg))
91+
if err != nil {
92+
return nil, err
93+
}
94+
opts = append(opts, dialOpts...)
95+
// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
96+
conn, err := grpc.Dial(addr, opts...)
97+
if err != nil {
98+
return nil, err
99+
}
100+
return &Client{
101+
IngestLimitsFrontendClient: logproto.NewIngestLimitsFrontendClient(conn),
102+
HealthClient: grpc_health_v1.NewHealthClient(conn),
103+
Closer: conn,
104+
}, nil
105+
}
106+
107+
// getInterceptors returns the gRPC interceptors for the given Config.
108+
func getGRPCInterceptors(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
109+
var (
110+
unaryInterceptors []grpc.UnaryClientInterceptor
111+
streamInterceptors []grpc.StreamClientInterceptor
112+
)
113+
114+
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
115+
unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
116+
unaryInterceptors = append(unaryInterceptors, server.UnaryClientHTTPHeadersInterceptor)
117+
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
118+
if !cfg.Internal {
119+
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
120+
}
121+
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(frontendRequestDuration))
122+
123+
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
124+
streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
125+
streamInterceptors = append(streamInterceptors, server.StreamClientHTTPHeadersInterceptor)
126+
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
127+
if !cfg.Internal {
128+
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
129+
}
130+
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(frontendRequestDuration))
131+
132+
return unaryInterceptors, streamInterceptors
133+
}
134+
135+
// NewPool returns a new pool of clients for the ingest-limits-frontend.
136+
func NewPool(
137+
name string,
138+
cfg PoolConfig,
139+
ring ring.ReadRing,
140+
factory ring_client.PoolFactory,
141+
logger log.Logger,
142+
) *ring_client.Pool {
143+
poolCfg := ring_client.PoolConfig{
144+
CheckInterval: cfg.ClientCleanupPeriod,
145+
HealthCheckEnabled: cfg.HealthCheckIngestLimits,
146+
HealthCheckTimeout: cfg.RemoteTimeout,
147+
}
148+
return ring_client.NewPool(
149+
name,
150+
poolCfg,
151+
ring_client.NewRingServiceDiscovery(ring),
152+
factory,
153+
frontendClients,
154+
logger,
155+
)
156+
}
157+
158+
// NewPoolFactory returns a new factory for ingest-limits-frontend clients.
159+
func NewPoolFactory(cfg Config) ring_client.PoolFactory {
160+
return ring_client.PoolAddrFunc(func(addr string) (ring_client.PoolClient, error) {
161+
return NewClient(cfg, addr)
162+
})
163+
}

‎pkg/limits/frontend/frontend.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Package frontend contains provides a frontend service for ingest limits.
2+
// It is responsible for receiving and answering gRPC requests from distributors,
3+
// such as exceeds limits requests, forwarding them to individual limits backends,
4+
// gathering and aggregating their responses (where required), and returning
5+
// the final result.
6+
package frontend
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"flag"
12+
"fmt"
13+
"net/http"
14+
15+
"github.com/go-kit/log"
16+
"github.com/go-kit/log/level"
17+
"github.com/grafana/dskit/ring"
18+
"github.com/grafana/dskit/services"
19+
"github.com/grafana/dskit/user"
20+
"github.com/prometheus/client_golang/prometheus"
21+
22+
limits_client "github.com/grafana/loki/v3/pkg/limits/client"
23+
"github.com/grafana/loki/v3/pkg/logproto"
24+
"github.com/grafana/loki/v3/pkg/util"
25+
util_log "github.com/grafana/loki/v3/pkg/util/log"
26+
)
27+
28+
const (
29+
RingKey = "ingest-limits-frontend"
30+
RingName = "ingest-limits-frontend"
31+
)
32+
33+
// Config contains the config for an ingest-limits-frontend.
34+
type Config struct {
35+
ClientConfig limits_client.Config `yaml:"client_config"`
36+
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
37+
}
38+
39+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
40+
cfg.ClientConfig.RegisterFlagsWithPrefix("ingest-limits-frontend", f)
41+
cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingest-limits-frontend.", f, util_log.Logger)
42+
}
43+
44+
func (cfg *Config) Validate() error {
45+
if err := cfg.ClientConfig.GRPCClientConfig.Validate(); err != nil {
46+
return fmt.Errorf("invalid gRPC client config: %w", err)
47+
}
48+
return nil
49+
}
50+
51+
// Frontend is the limits-frontend service, and acts a service wrapper for
52+
// all components needed to run the limits-frontend.
53+
type Frontend struct {
54+
services.Service
55+
56+
cfg Config
57+
logger log.Logger
58+
59+
subservices *services.Manager
60+
subservicesWatcher *services.FailureWatcher
61+
62+
limits IngestLimitsService
63+
64+
lifecycler *ring.Lifecycler
65+
lifecyclerWatcher *services.FailureWatcher
66+
}
67+
68+
// New returns a new Frontend.
69+
func New(cfg Config, ringName string, limitsRing ring.ReadRing, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Frontend, error) {
70+
var servs []services.Service
71+
72+
factory := limits_client.NewPoolFactory(cfg.ClientConfig)
73+
pool := limits_client.NewPool(ringName, cfg.ClientConfig.PoolConfig, limitsRing, factory, logger)
74+
limitsSrv := NewRingIngestLimitsService(limitsRing, pool, limits, logger, reg)
75+
76+
f := &Frontend{
77+
cfg: cfg,
78+
logger: logger,
79+
limits: limitsSrv,
80+
}
81+
82+
var err error
83+
f.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, f, RingName, RingKey, true, logger, reg)
84+
if err != nil {
85+
return nil, fmt.Errorf("failed to create %s lifecycler: %w", RingName, err)
86+
}
87+
// Watch the lifecycler
88+
f.lifecyclerWatcher = services.NewFailureWatcher()
89+
f.lifecyclerWatcher.WatchService(f.lifecycler)
90+
91+
servs = append(servs, f.lifecycler)
92+
servs = append(servs, pool)
93+
mgr, err := services.NewManager(servs...)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
f.subservices = mgr
99+
f.subservicesWatcher = services.NewFailureWatcher()
100+
f.subservicesWatcher.WatchManager(f.subservices)
101+
f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
102+
103+
return f, nil
104+
}
105+
106+
// Flush implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance.
107+
func (f *Frontend) Flush() {}
108+
109+
// TransferOut implements ring.FlushTransferer. It transfers state to another ingest limits frontend instance.
110+
func (f *Frontend) TransferOut(_ context.Context) error {
111+
return nil
112+
}
113+
114+
// starting implements services.Service.
115+
func (f *Frontend) starting(ctx context.Context) (err error) {
116+
defer func() {
117+
if err == nil {
118+
return
119+
}
120+
stopErr := services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
121+
if stopErr != nil {
122+
level.Error(f.logger).Log("msg", "failed to stop subservices", "err", stopErr)
123+
}
124+
}()
125+
126+
level.Info(f.logger).Log("msg", "starting subservices")
127+
if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil {
128+
return fmt.Errorf("failed to start subservices: %w", err)
129+
}
130+
131+
return nil
132+
}
133+
134+
// running implements services.Service.
135+
func (f *Frontend) running(ctx context.Context) error {
136+
select {
137+
case <-ctx.Done():
138+
return nil
139+
case err := <-f.subservicesWatcher.Chan():
140+
return fmt.Errorf("ingest limits frontend subservice failed: %w", err)
141+
}
142+
}
143+
144+
// stopping implements services.Service.
145+
func (f *Frontend) stopping(_ error) error {
146+
return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
147+
}
148+
149+
// ExceedsLimits implements logproto.IngestLimitsFrontendClient.
150+
func (f *Frontend) ExceedsLimits(ctx context.Context, r *logproto.ExceedsLimitsRequest) (*logproto.ExceedsLimitsResponse, error) {
151+
return f.limits.ExceedsLimits(ctx, r)
152+
}
153+
154+
type exceedsLimitsRequest struct {
155+
TenantID string `json:"tenantID"`
156+
StreamHashes []uint64 `json:"streamHashes"`
157+
}
158+
159+
type exceedsLimitsResponse struct {
160+
RejectedStreams []*logproto.RejectedStream `json:"rejectedStreams,omitempty"`
161+
}
162+
163+
// ServeHTTP implements http.Handler.
164+
func (f *Frontend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
165+
var req exceedsLimitsRequest
166+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
167+
level.Error(f.logger).Log("msg", "error unmarshalling request body", "err", err)
168+
http.Error(w, "error unmarshalling request body", http.StatusBadRequest)
169+
return
170+
}
171+
172+
if req.TenantID == "" {
173+
http.Error(w, "tenantID is required", http.StatusBadRequest)
174+
return
175+
}
176+
177+
// Convert request to protobuf format
178+
protoReq := &logproto.ExceedsLimitsRequest{
179+
Tenant: req.TenantID,
180+
Streams: make([]*logproto.StreamMetadata, len(req.StreamHashes)),
181+
}
182+
for i, hash := range req.StreamHashes {
183+
protoReq.Streams[i] = &logproto.StreamMetadata{
184+
StreamHash: hash,
185+
}
186+
}
187+
188+
ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(r.Context(), req.TenantID))
189+
if err != nil {
190+
http.Error(w, "failed to inject org ID", http.StatusInternalServerError)
191+
return
192+
}
193+
194+
// Call the service
195+
resp, err := f.limits.ExceedsLimits(ctx, protoReq)
196+
if err != nil {
197+
level.Error(f.logger).Log("msg", "error checking limits", "err", err)
198+
http.Error(w, "error checking limits", http.StatusInternalServerError)
199+
return
200+
}
201+
202+
// Convert response to JSON format
203+
jsonResp := exceedsLimitsResponse{
204+
RejectedStreams: resp.RejectedStreams,
205+
}
206+
207+
util.WriteJSONResponse(w, jsonResp)
208+
}

0 commit comments

Comments
 (0)