Skip to content

Commit 4bfa380

Browse files
authored
revert: "perf(bloom): Compute chunkrefs for series right before sending task to builder" (#14839)
1 parent feef1d8 commit 4bfa380

File tree

13 files changed

+188
-305
lines changed

13 files changed

+188
-305
lines changed

‎pkg/bloombuild/common/tsdb.go

Lines changed: 18 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -29,10 +29,8 @@ const (
2929
gzipExtension = ".gz"
3030
)
3131

32-
type ForSeries = sharding.ForSeries
33-
3432
type ClosableForSeries interface {
35-
ForSeries
33+
sharding.ForSeries
3634
Close() error
3735
}
3836

@@ -126,21 +124,33 @@ func (b *BloomTSDBStore) LoadTSDB(
126124
return idx, nil
127125
}
128126

129-
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
127+
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
130128
// TODO(salvacorts): Create a pool
131-
series := make([]model.Fingerprint, 0, 100)
129+
series := make([]*v1.Series, 0, 100)
132130

133131
if err := f.ForSeries(
134132
ctx,
135133
user,
136134
bounds,
137135
0, math.MaxInt64,
138-
func(_ labels.Labels, fp model.Fingerprint, _ []index.ChunkMeta) (stop bool) {
136+
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
139137
select {
140138
case <-ctx.Done():
141139
return true
142140
default:
143-
series = append(series, fp)
141+
res := &v1.Series{
142+
Fingerprint: fp,
143+
Chunks: make(v1.ChunkRefs, 0, len(chks)),
144+
}
145+
for _, chk := range chks {
146+
res.Chunks = append(res.Chunks, v1.ChunkRef{
147+
From: model.Time(chk.MinTime),
148+
Through: model.Time(chk.MaxTime),
149+
Checksum: chk.Checksum,
150+
})
151+
}
152+
153+
series = append(series, res)
144154
return false
145155
}
146156
},
@@ -151,7 +161,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
151161

152162
select {
153163
case <-ctx.Done():
154-
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
164+
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
155165
default:
156166
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
157167
}

‎pkg/bloombuild/common/tsdb_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
6666
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
6767
require.NoError(t, err)
6868

69-
v1.CompareIterators(
69+
v1.EqualIterators(
7070
t,
71-
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
72-
require.Equal(t, a, b.Fingerprint)
71+
func(a, b *v1.Series) {
72+
require.Equal(t, a, b)
7373
},
7474
itr,
7575
srcItr,

‎pkg/bloombuild/planner/planner.go

Lines changed: 31 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -227,10 +227,9 @@ func (p *Planner) runOne(ctx context.Context) error {
227227
}
228228

229229
var (
230-
wg sync.WaitGroup
231-
start = time.Now()
232-
status = statusFailure
233-
openTSDBs strategies.TSDBSet
230+
wg sync.WaitGroup
231+
start = time.Now()
232+
status = statusFailure
234233
)
235234
defer func() {
236235
p.metrics.buildCompleted.WithLabelValues(status).Inc()
@@ -239,15 +238,6 @@ func (p *Planner) runOne(ctx context.Context) error {
239238
if status == statusSuccess {
240239
p.metrics.buildLastSuccess.SetToCurrentTime()
241240
}
242-
243-
// Close all open TSDBs.
244-
// These are used to get the chunkrefs for the series in the gaps.
245-
// We populate the chunkrefs when we send the task to the builder.
246-
for idx, reader := range openTSDBs {
247-
if err := reader.Close(); err != nil {
248-
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
249-
}
250-
}
251241
}()
252242

253243
p.metrics.buildStarted.Inc()
@@ -285,19 +275,7 @@ func (p *Planner) runOne(ctx context.Context) error {
285275
table: table,
286276
}
287277

288-
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
289-
if err != nil {
290-
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
291-
continue
292-
}
293-
294-
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
295-
if err != nil {
296-
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
297-
continue
298-
}
299-
300-
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
278+
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
301279
if err != nil {
302280
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
303281
continue
@@ -308,7 +286,7 @@ func (p *Planner) runOne(ctx context.Context) error {
308286

309287
now := time.Now()
310288
for _, task := range tasks {
311-
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
289+
queueTask := NewQueueTask(ctx, now, task, resultsCh)
312290
if err := p.enqueueTask(queueTask); err != nil {
313291
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
314292
continue
@@ -396,8 +374,7 @@ func (p *Planner) computeTasks(
396374
ctx context.Context,
397375
table config.DayTable,
398376
tenant string,
399-
tsdbs strategies.TSDBSet,
400-
) ([]*strategies.Task, []bloomshipper.Meta, error) {
377+
) ([]*protos.Task, []bloomshipper.Meta, error) {
401378
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
402379
if err != nil {
403380
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
@@ -425,11 +402,29 @@ func (p *Planner) computeTasks(
425402
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
426403
}
427404

405+
// Resolve TSDBs
406+
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
407+
if err != nil {
408+
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
409+
}
410+
428411
if len(tsdbs) == 0 {
429412
return nil, metas, nil
430413
}
431414

432-
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
415+
openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
416+
if err != nil {
417+
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
418+
}
419+
defer func() {
420+
for idx, reader := range openTSDBs {
421+
if err := reader.Close(); err != nil {
422+
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
423+
}
424+
}
425+
}()
426+
427+
tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
433428
if err != nil {
434429
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
435430
}
@@ -511,26 +506,18 @@ func openAllTSDBs(
511506
tenant string,
512507
store common.TSDBStore,
513508
tsdbs []tsdb.SingleTenantTSDBIdentifier,
514-
alreadyOpen strategies.TSDBSet,
515-
) (strategies.TSDBSet, error) {
516-
if len(alreadyOpen) == 0 {
517-
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
518-
}
519-
509+
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
510+
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
520511
for _, idx := range tsdbs {
521-
if _, ok := alreadyOpen[idx]; ok {
522-
continue
523-
}
524-
525-
reader, err := store.LoadTSDB(ctx, table, tenant, idx)
512+
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
526513
if err != nil {
527514
return nil, fmt.Errorf("failed to load tsdb: %w", err)
528515
}
529516

530-
alreadyOpen[idx] = reader
517+
openTSDBs[idx] = tsdb
531518
}
532519

533-
return alreadyOpen, nil
520+
return openTSDBs, nil
534521
}
535522

536523
// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
@@ -860,13 +847,8 @@ func (p *Planner) forwardTaskToBuilder(
860847
builderID string,
861848
task *QueueTask,
862849
) (*protos.TaskResult, error) {
863-
protoTask, err := task.ToProtoTask(builder.Context())
864-
if err != nil {
865-
return nil, fmt.Errorf("error converting task to proto task: %w", err)
866-
}
867-
868850
msg := &protos.PlannerToBuilder{
869-
Task: protoTask,
851+
Task: task.ToProtoTask(),
870852
}
871853

872854
if err := builder.Send(msg); err != nil {

‎pkg/bloombuild/planner/planner_test.go

Lines changed: 1 addition & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -713,21 +713,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
713713
}
714714

715715
func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
716-
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))
717-
718716
tasks := make([]*QueueTask, 0, n)
719717
// Enqueue tasks
720718
for i := 0; i < n; i++ {
721719
task := NewQueueTask(
722720
context.Background(), time.Now(),
723-
strategies.NewTask(
724-
config.NewDayTable(plannertest.TestDay, "fake"),
725-
"fakeTenant",
726-
v1.NewBounds(0, 10),
727-
plannertest.TsdbID(1),
728-
nil,
729-
),
730-
forSeries,
721+
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
731722
resultsCh,
732723
)
733724
tasks = append(tasks, task)

‎pkg/bloombuild/planner/plannertest/utils.go

Lines changed: 2 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -6,15 +6,13 @@ import (
66
"time"
77

88
"github.com/prometheus/common/model"
9-
"github.com/prometheus/prometheus/model/labels"
109

1110
"github.com/grafana/loki/v3/pkg/compression"
1211
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
1312
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
1413
"github.com/grafana/loki/v3/pkg/storage/config"
1514
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1615
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
17-
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
1816
)
1917

2018
var TestDay = ParseDayTime("2023-09-01")
@@ -89,23 +87,11 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
8987
}, nil
9088
}
9189

92-
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
90+
func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
9391
return GenSeriesWithStep(bounds, 1)
9492
}
9593

96-
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
97-
series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step)
98-
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
99-
series = append(series, i)
100-
}
101-
return series
102-
}
103-
104-
func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
105-
return GenV1SeriesWithStep(bounds, 1)
106-
}
107-
108-
func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
94+
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
10995
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
11096
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
11197
series = append(series, &v1.Series{
@@ -153,43 +139,3 @@ func ParseDayTime(s string) config.DayTime {
153139
Time: model.TimeFromUnix(t.Unix()),
154140
}
155141
}
156-
157-
type FakeForSeries struct {
158-
series []*v1.Series
159-
}
160-
161-
func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
162-
return &FakeForSeries{
163-
series: series,
164-
}
165-
}
166-
167-
func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
168-
overlapping := make([]*v1.Series, 0, len(f.series))
169-
for _, s := range f.series {
170-
if ff.Match(s.Fingerprint) {
171-
overlapping = append(overlapping, s)
172-
}
173-
}
174-
175-
for _, s := range overlapping {
176-
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
177-
for _, c := range s.Chunks {
178-
chunks = append(chunks, index.ChunkMeta{
179-
MinTime: int64(c.From),
180-
MaxTime: int64(c.Through),
181-
Checksum: c.Checksum,
182-
KB: 100,
183-
})
184-
}
185-
186-
if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
187-
break
188-
}
189-
}
190-
return nil
191-
}
192-
193-
func (f FakeForSeries) Close() error {
194-
return nil
195-
}

0 commit comments

Comments
 (0)