Skip to content

Commit b406015

Browse files
salvacorts and chaudum authored
feat(blooms): Prefetch bloom blocks as soon as they are built (#15050)
Signed-off-by: Christian Haudum <christian.haudum@gmail.com> Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
1 parent 2ae1ead commit b406015

File tree

16 files changed

+694
-66
lines changed

16 files changed

+694
-66
lines changed

‎docs/sources/shared/configuration.md

+4
Original file line numberDiff line numberDiff line change
@@ -3816,6 +3816,10 @@ shard_streams:
38163816
# CLI flag: -bloom-build.block-encoding
38173817
[bloom_block_encoding: <string> | default = "none"]
38183818

3819+
# Experimental. Prefetch blocks on bloom gateways as soon as they are built.
3820+
# CLI flag: -bloom-build.prefetch-blocks
3821+
[bloom_prefetch_blocks: <boolean> | default = false]
3822+
38193823
# Experimental. The maximum bloom block size. A value of 0 sets an unlimited
38203824
# size. Default is 200MB. The actual block size might exceed this limit since
38213825
# blooms will be added to blocks until the block exceeds the maximum block size.

‎pkg/bloombuild/builder/builder.go

+20-9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/grafana/loki/v3/pkg/bloombuild/common"
2525
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
26+
"github.com/grafana/loki/v3/pkg/bloomgateway"
2627
"github.com/grafana/loki/v3/pkg/compression"
2728
iter "github.com/grafana/loki/v3/pkg/iter/v2"
2829
"github.com/grafana/loki/v3/pkg/storage"
@@ -48,8 +49,9 @@ type Builder struct {
4849
metrics *Metrics
4950
logger log.Logger
5051

51-
bloomStore bloomshipper.Store
52-
chunkLoader ChunkLoader
52+
bloomStore bloomshipper.Store
53+
chunkLoader ChunkLoader
54+
bloomGateway bloomgateway.Client
5355

5456
client protos.PlannerForBuilderClient
5557

@@ -66,6 +68,7 @@ func New(
6668
_ storage.ClientMetrics,
6769
fetcherProvider stores.ChunkFetcherProvider,
6870
bloomStore bloomshipper.Store,
71+
bloomGateway bloomgateway.Client,
6972
logger log.Logger,
7073
r prometheus.Registerer,
7174
rm *ring.RingManager,
@@ -77,13 +80,14 @@ func New(
7780

7881
metrics := NewMetrics(r)
7982
b := &Builder{
80-
ID: builderID,
81-
cfg: cfg,
82-
limits: limits,
83-
metrics: metrics,
84-
bloomStore: bloomStore,
85-
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
86-
logger: logger,
83+
ID: builderID,
84+
cfg: cfg,
85+
limits: limits,
86+
metrics: metrics,
87+
bloomStore: bloomStore,
88+
chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
89+
bloomGateway: bloomGateway,
90+
logger: logger,
8791
}
8892

8993
if rm != nil {
@@ -519,6 +523,13 @@ func (b *Builder) processTask(
519523
b.metrics.metasCreated.Inc()
520524
level.Debug(logger).Log("msg", "uploaded meta")
521525
created = append(created, meta)
526+
527+
// Now that the meta is written and its blocks can be queried, we prefetch them on the gateway
528+
if b.bloomGateway != nil && b.limits.PrefetchBloomBlocks(tenant) {
529+
if err := b.bloomGateway.PrefetchBloomBlocks(ctx, meta.Blocks); err != nil {
530+
level.Error(logger).Log("msg", "failed to prefetch block on gateway", "err", err)
531+
}
532+
}
522533
}
523534

524535
b.metrics.seriesPerTask.Observe(float64(totalSeries))

‎pkg/bloombuild/builder/builder_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Lo
7575
metrics := storage.NewClientMetrics()
7676
metrics.Unregister()
7777

78-
builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil)
78+
builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, nil, logger, prometheus.NewPedanticRegistry(), nil)
7979
require.NoError(t, err)
8080

8181
return builder

‎pkg/bloombuild/builder/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,5 @@ type Limits interface {
4242
BloomMaxBlockSize(tenantID string) int
4343
BloomMaxBloomSize(tenantID string) int
4444
BuilderResponseTimeout(tenantID string) time.Duration
45+
PrefetchBloomBlocks(tenantID string) bool
4546
}

‎pkg/bloomgateway/bloomgateway.go

+53-8
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,43 @@ func (g *Gateway) stopping(_ error) error {
161161
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
162162
}
163163

164+
func (g *Gateway) PrefetchBloomBlocks(_ context.Context, req *logproto.PrefetchBloomBlocksRequest) (*logproto.PrefetchBloomBlocksResponse, error) {
165+
refs, err := decodeBlockKeys(req.Blocks)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
bqs, err := g.bloomStore.FetchBlocks(
171+
// We don't use the ctx passed to the handler since it's canceled when the handler returns
172+
context.Background(),
173+
refs,
174+
bloomshipper.WithFetchAsync(true),
175+
bloomshipper.WithIgnoreNotFound(true),
176+
bloomshipper.WithCacheGetOptions(
177+
bloomshipper.WithSkipHitMissMetrics(true),
178+
),
179+
)
180+
if err != nil {
181+
g.metrics.prefetchedBlocks.WithLabelValues(typeError).Add(float64(len(refs)))
182+
return nil, err
183+
}
184+
185+
for _, bq := range bqs {
186+
if bq == nil {
187+
// This is the expected case: the block is not yet downloaded and the block querier is nil
188+
continue
189+
}
190+
191+
// Close any block queriers that were already downloaded
192+
if err := bq.Close(); err != nil {
193+
level.Warn(g.logger).Log("msg", "failed to close block querier", "err", err)
194+
}
195+
}
196+
197+
g.metrics.prefetchedBlocks.WithLabelValues(typeSuccess).Add(float64(len(refs)))
198+
return &logproto.PrefetchBloomBlocksResponse{}, err
199+
}
200+
164201
// FilterChunkRefs implements BloomGatewayServer
165202
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
166203
tenantID, err := tenant.TenantID(ctx)
@@ -204,14 +241,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
204241
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
205242
}
206243

207-
blocks := make([]bloomshipper.BlockRef, 0, len(req.Blocks))
208-
for _, key := range req.Blocks {
209-
block, err := bloomshipper.BlockRefFromKey(key)
210-
if err != nil {
211-
stats.Status = labelFailure
212-
return nil, errors.New("could not parse block key")
213-
}
214-
blocks = append(blocks, block)
244+
blocks, err := decodeBlockKeys(req.Blocks)
245+
if err != nil {
246+
stats.Status = labelFailure
247+
return nil, err
215248
}
216249

217250
// Shortcut if request does not contain blocks
@@ -470,3 +503,15 @@ func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkR
470503

471504
cur.Refs = cur.Refs[:len(res)]
472505
}
506+
507+
func decodeBlockKeys(keys []string) ([]bloomshipper.BlockRef, error) {
508+
blocks := make([]bloomshipper.BlockRef, 0, len(keys))
509+
for _, key := range keys {
510+
block, err := bloomshipper.BlockRefFromKey(key)
511+
if err != nil {
512+
return nil, errors.New("could not parse block key")
513+
}
514+
blocks = append(blocks, block)
515+
}
516+
return blocks, nil
517+
}

‎pkg/bloomgateway/cache.go

+8
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.
113113

114114
type ClientCache struct {
115115
cache *resultscache.ResultsCache
116+
next logproto.BloomGatewayClient
116117
limits CacheLimits
117118
logger log.Logger
118119
}
@@ -149,12 +150,19 @@ func NewBloomGatewayClientCacheMiddleware(
149150
)
150151

151152
return &ClientCache{
153+
next: next,
152154
cache: resultsCache,
153155
limits: limits,
154156
logger: logger,
155157
}
156158
}
157159

160+
// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
161+
func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
162+
return c.next.PrefetchBloomBlocks(ctx, in, opts...)
163+
}
164+
165+
// FilterChunkRefs implements logproto.BloomGatewayClient.
158166
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
159167
cacheReq := requestWithGrpcCallOptions{
160168
FilterChunkRefRequest: req,

‎pkg/bloomgateway/cache_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,8 @@ type mockServer struct {
468468
res *logproto.FilterChunkRefResponse
469469
}
470470

471+
var _ logproto.BloomGatewayClient = &mockServer{}
472+
471473
func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
472474
var calls int
473475
return &mockServer{
@@ -480,11 +482,17 @@ func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
480482
s.res = res
481483
}
482484

485+
// FilterChunkRefs implements logproto.BloomGatewayClient.
483486
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
484487
*s.calls++
485488
return s.res, nil
486489
}
487490

491+
// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
492+
func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
493+
panic("unimplemented")
494+
}
495+
488496
type mockLimits struct {
489497
cacheFreshness time.Duration
490498
cacheInterval time.Duration

‎pkg/bloomgateway/client.go

+49-2
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func (i *ClientConfig) Validate() error {
116116

117117
type Client interface {
118118
FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
119+
PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error
119120
}
120121

121122
// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
@@ -204,6 +205,47 @@ func (c *GatewayClient) Close() {
204205
c.dnsProvider.Stop()
205206
}
206207

208+
func (c *GatewayClient) PrefetchBloomBlocks(ctx context.Context, blocks []bloomshipper.BlockRef) error {
209+
if len(blocks) == 0 {
210+
return nil
211+
}
212+
213+
pos := make(map[string]int)
214+
servers := make([]addrWithBlocks, 0, len(blocks))
215+
for _, block := range blocks {
216+
addr, err := c.pool.Addr(block.String())
217+
if err != nil {
218+
level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", block, "err", err)
219+
continue
220+
}
221+
222+
if idx, found := pos[addr]; found {
223+
servers[idx].blocks = append(servers[idx].blocks, block.String())
224+
} else {
225+
pos[addr] = len(servers)
226+
servers = append(servers, addrWithBlocks{
227+
addr: addr,
228+
blocks: []string{block.String()},
229+
})
230+
}
231+
}
232+
233+
return concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
234+
rs := servers[i]
235+
return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error {
236+
req := &logproto.PrefetchBloomBlocksRequest{Blocks: rs.blocks}
237+
_, err := client.PrefetchBloomBlocks(ctx, req)
238+
if err != nil {
239+
level.Error(c.logger).Log("msg", "block prefetch failed for instance, skipping", "addr", rs.addr, "blocks", len(rs.blocks), "err", err)
240+
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeError).Inc()
241+
} else {
242+
c.metrics.clientRequests.WithLabelValues(routePrefectBlocks, typeSuccess).Inc()
243+
}
244+
return err
245+
})
246+
})
247+
}
248+
207249
// FilterChunks implements Client
208250
func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error) {
209251
// no block and therefore no series with chunks
@@ -268,10 +310,10 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
268310
"err", err,
269311
)
270312
// filter none of the results on failed request
271-
c.metrics.clientRequests.WithLabelValues(typeError).Inc()
313+
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeError).Inc()
272314
results[i] = rs.groups
273315
} else {
274-
c.metrics.clientRequests.WithLabelValues(typeSuccess).Inc()
316+
c.metrics.clientRequests.WithLabelValues(routeFilterChunks, typeSuccess).Inc()
275317
results[i] = resp.ChunkRefs
276318
}
277319

@@ -390,6 +432,11 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
390432
return err
391433
}
392434

435+
type addrWithBlocks struct {
436+
addr string
437+
blocks []string
438+
}
439+
393440
type addrWithGroups struct {
394441
addr string
395442
blocks []string

‎pkg/bloomgateway/metrics.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ type metrics struct {
1818
const (
1919
typeSuccess = "success"
2020
typeError = "error"
21+
22+
routeFilterChunks = "FilterChunks"
23+
routePrefectBlocks = "PrefetchBloomBlocks"
2124
)
2225

2326
type clientMetrics struct {
@@ -32,7 +35,7 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
3235
Subsystem: "bloom_gateway_client",
3336
Name: "requests_total",
3437
Help: "Total number of requests made to the bloom gateway",
35-
}, []string{"type"}),
38+
}, []string{"route", "type"}),
3639
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
3740
Namespace: constants.Loki,
3841
Subsystem: "bloom_gateway_client",
@@ -50,6 +53,7 @@ type serverMetrics struct {
5053
requestedChunks prometheus.Histogram
5154
filteredChunks prometheus.Histogram
5255
receivedMatchers prometheus.Histogram
56+
prefetchedBlocks *prometheus.CounterVec
5357
}
5458

5559
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -105,6 +109,12 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
105109
Help: "Number of matchers per request.",
106110
Buckets: prometheus.ExponentialBuckets(1, 2, 9), // 1 -> 256
107111
}),
112+
prefetchedBlocks: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
113+
Namespace: namespace,
114+
Subsystem: subsystem,
115+
Name: "prefetched_blocks_total",
116+
Help: "Total amount of blocks prefetched by the bloom-gateway",
117+
}, []string{"status"}),
108118
}
109119
}
110120

‎pkg/bloomgateway/querier_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ type noopClient struct {
2626
callCount int
2727
}
2828

29+
var _ Client = &noopClient{}
30+
2931
// FilterChunks implements Client.
3032
func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.Interval, blocks []blockWithSeries, _ plan.QueryPlan) (result []*logproto.GroupedChunkRefs, err error) {
3133
for _, block := range blocks {
@@ -39,6 +41,10 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In
3941
return result, c.err
4042
}
4143

44+
func (c *noopClient) PrefetchBloomBlocks(_ context.Context, _ []bloomshipper.BlockRef) error {
45+
return nil
46+
}
47+
4248
type mockBlockResolver struct{}
4349

4450
// Resolve implements BlockResolver.

0 commit comments

Comments
 (0)