Skip to content

Commit 66e6b1c

Browse files
authored
perf(bloom): Compute chunkrefs for series right before sending task to builder (#14808)
1 parent 3b20cb0 commit 66e6b1c

13 files changed

+304
-187
lines changed

‎pkg/bloombuild/common/tsdb.go

+7-17
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ const (
2929
gzipExtension = ".gz"
3030
)
3131

32+
type ForSeries = sharding.ForSeries
33+
3234
type ClosableForSeries interface {
33-
sharding.ForSeries
35+
ForSeries
3436
Close() error
3537
}
3638

@@ -124,9 +126,9 @@ func (b *BloomTSDBStore) LoadTSDB(
124126
return idx, nil
125127
}
126128

127-
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
129+
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
128130
// TODO(salvacorts): Create a pool
129-
series := make([]*v1.Series, 0, 100)
131+
series := make([]model.Fingerprint, 0, 100)
130132

131133
if err := f.ForSeries(
132134
ctx,
@@ -138,19 +140,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
138140
case <-ctx.Done():
139141
return true
140142
default:
141-
res := &v1.Series{
142-
Fingerprint: fp,
143-
Chunks: make(v1.ChunkRefs, 0, len(chks)),
144-
}
145-
for _, chk := range chks {
146-
res.Chunks = append(res.Chunks, v1.ChunkRef{
147-
From: model.Time(chk.MinTime),
148-
Through: model.Time(chk.MaxTime),
149-
Checksum: chk.Checksum,
150-
})
151-
}
152-
153-
series = append(series, res)
143+
series = append(series, fp)
154144
return false
155145
}
156146
},
@@ -161,7 +151,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
161151

162152
select {
163153
case <-ctx.Done():
164-
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
154+
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
165155
default:
166156
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
167157
}

‎pkg/bloombuild/common/tsdb_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
6666
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
6767
require.NoError(t, err)
6868

69-
v1.EqualIterators(
69+
v1.CompareIterators(
7070
t,
71-
func(a, b *v1.Series) {
72-
require.Equal(t, a, b)
71+
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
72+
require.Equal(t, a, b.Fingerprint)
7373
},
7474
itr,
7575
srcItr,

‎pkg/bloombuild/planner/planner.go

+49-31
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,10 @@ func (p *Planner) runOne(ctx context.Context) error {
227227
}
228228

229229
var (
230-
wg sync.WaitGroup
231-
start = time.Now()
232-
status = statusFailure
230+
wg sync.WaitGroup
231+
start = time.Now()
232+
status = statusFailure
233+
openTSDBs strategies.TSDBSet
233234
)
234235
defer func() {
235236
p.metrics.buildCompleted.WithLabelValues(status).Inc()
@@ -238,6 +239,15 @@ func (p *Planner) runOne(ctx context.Context) error {
238239
if status == statusSuccess {
239240
p.metrics.buildLastSuccess.SetToCurrentTime()
240241
}
242+
243+
// Close all open TSDBs.
244+
// These are used to get the chunkrefs for the series in the gaps.
245+
// We populate the chunkrefs when we send the task to the builder.
246+
for idx, reader := range openTSDBs {
247+
if err := reader.Close(); err != nil {
248+
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
249+
}
250+
}
241251
}()
242252

243253
p.metrics.buildStarted.Inc()
@@ -275,7 +285,19 @@ func (p *Planner) runOne(ctx context.Context) error {
275285
table: table,
276286
}
277287

278-
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
288+
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
289+
if err != nil {
290+
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
291+
continue
292+
}
293+
294+
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
295+
if err != nil {
296+
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
297+
continue
298+
}
299+
300+
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
279301
if err != nil {
280302
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
281303
continue
@@ -286,7 +308,7 @@ func (p *Planner) runOne(ctx context.Context) error {
286308

287309
now := time.Now()
288310
for _, task := range tasks {
289-
queueTask := NewQueueTask(ctx, now, task, resultsCh)
311+
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
290312
if err := p.enqueueTask(queueTask); err != nil {
291313
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
292314
continue
@@ -374,7 +396,8 @@ func (p *Planner) computeTasks(
374396
ctx context.Context,
375397
table config.DayTable,
376398
tenant string,
377-
) ([]*protos.Task, []bloomshipper.Meta, error) {
399+
tsdbs strategies.TSDBSet,
400+
) ([]*strategies.Task, []bloomshipper.Meta, error) {
378401
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
379402
if err != nil {
380403
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
@@ -402,29 +425,11 @@ func (p *Planner) computeTasks(
402425
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
403426
}
404427

405-
// Resolve TSDBs
406-
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
407-
if err != nil {
408-
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
409-
}
410-
411428
if len(tsdbs) == 0 {
412429
return nil, metas, nil
413430
}
414431

415-
openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
416-
if err != nil {
417-
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
418-
}
419-
defer func() {
420-
for idx, reader := range openTSDBs {
421-
if err := reader.Close(); err != nil {
422-
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
423-
}
424-
}
425-
}()
426-
427-
tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
432+
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
428433
if err != nil {
429434
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
430435
}
@@ -506,18 +511,26 @@ func openAllTSDBs(
506511
tenant string,
507512
store common.TSDBStore,
508513
tsdbs []tsdb.SingleTenantTSDBIdentifier,
509-
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
510-
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
514+
alreadyOpen strategies.TSDBSet,
515+
) (strategies.TSDBSet, error) {
516+
if len(alreadyOpen) == 0 {
517+
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
518+
}
519+
511520
for _, idx := range tsdbs {
512-
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
521+
if _, ok := alreadyOpen[idx]; ok {
522+
continue
523+
}
524+
525+
reader, err := store.LoadTSDB(ctx, table, tenant, idx)
513526
if err != nil {
514527
return nil, fmt.Errorf("failed to load tsdb: %w", err)
515528
}
516529

517-
openTSDBs[idx] = tsdb
530+
alreadyOpen[idx] = reader
518531
}
519532

520-
return openTSDBs, nil
533+
return alreadyOpen, nil
521534
}
522535

523536
// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
@@ -847,8 +860,13 @@ func (p *Planner) forwardTaskToBuilder(
847860
builderID string,
848861
task *QueueTask,
849862
) (*protos.TaskResult, error) {
863+
protoTask, err := task.ToProtoTask(builder.Context())
864+
if err != nil {
865+
return nil, fmt.Errorf("error converting task to proto task: %w", err)
866+
}
867+
850868
msg := &protos.PlannerToBuilder{
851-
Task: task.ToProtoTask(),
869+
Task: protoTask,
852870
}
853871

854872
if err := builder.Send(msg); err != nil {

‎pkg/bloombuild/planner/planner_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -713,12 +713,21 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
713713
}
714714

715715
func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
716+
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))
717+
716718
tasks := make([]*QueueTask, 0, n)
717719
// Enqueue tasks
718720
for i := 0; i < n; i++ {
719721
task := NewQueueTask(
720722
context.Background(), time.Now(),
721-
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
723+
strategies.NewTask(
724+
config.NewDayTable(plannertest.TestDay, "fake"),
725+
"fakeTenant",
726+
v1.NewBounds(0, 10),
727+
plannertest.TsdbID(1),
728+
nil,
729+
),
730+
forSeries,
722731
resultsCh,
723732
)
724733
tasks = append(tasks, task)

‎pkg/bloombuild/planner/plannertest/utils.go

+56-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"time"
77

88
"github.com/prometheus/common/model"
9+
"github.com/prometheus/prometheus/model/labels"
910

1011
"github.com/grafana/loki/v3/pkg/compression"
1112
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
1213
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
1314
"github.com/grafana/loki/v3/pkg/storage/config"
1415
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1516
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
17+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
1618
)
1719

1820
var TestDay = ParseDayTime("2023-09-01")
@@ -87,11 +89,23 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
8789
}, nil
8890
}
8991

90-
func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
92+
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
9193
return GenSeriesWithStep(bounds, 1)
9294
}
9395

94-
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
96+
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
97+
series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step)
98+
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
99+
series = append(series, i)
100+
}
101+
return series
102+
}
103+
104+
func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
105+
return GenV1SeriesWithStep(bounds, 1)
106+
}
107+
108+
func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
95109
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
96110
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
97111
series = append(series, &v1.Series{
@@ -139,3 +153,43 @@ func ParseDayTime(s string) config.DayTime {
139153
Time: model.TimeFromUnix(t.Unix()),
140154
}
141155
}
156+
157+
type FakeForSeries struct {
158+
series []*v1.Series
159+
}
160+
161+
func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
162+
return &FakeForSeries{
163+
series: series,
164+
}
165+
}
166+
167+
func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
168+
overlapping := make([]*v1.Series, 0, len(f.series))
169+
for _, s := range f.series {
170+
if ff.Match(s.Fingerprint) {
171+
overlapping = append(overlapping, s)
172+
}
173+
}
174+
175+
for _, s := range overlapping {
176+
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
177+
for _, c := range s.Chunks {
178+
chunks = append(chunks, index.ChunkMeta{
179+
MinTime: int64(c.From),
180+
MaxTime: int64(c.Through),
181+
Checksum: c.Checksum,
182+
KB: 100,
183+
})
184+
}
185+
186+
if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
187+
break
188+
}
189+
}
190+
return nil
191+
}
192+
193+
func (f FakeForSeries) Close() error {
194+
return nil
195+
}

0 commit comments

Comments
 (0)