
perf(bloom): Compute chunkrefs for series right before sending task to builder #14808


Merged · 6 commits · Nov 7, 2024
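At a high level, this diff moves chunk-ref resolution out of the planning loop: NewTSDBSeriesIter now yields bare fingerprints, the planner keeps its TSDB readers open for the whole run, and chunk refs for a task's series are only materialized when the task is forwarded to a builder. A minimal, hypothetical Go sketch of that lazy-resolution pattern (the types and function names below are illustrative stand-ins, not the actual Loki API):

package sketch

import (
	"context"
	"fmt"
)

// Fingerprint and ChunkRef stand in for model.Fingerprint and v1.ChunkRef.
type Fingerprint uint64

type ChunkRef struct {
	From, Through int64
	Checksum      uint32
}

// SeriesReader stands in for an open TSDB index (roughly common.ClosableForSeries).
type SeriesReader interface {
	ChunkRefs(ctx context.Context, fp Fingerprint) ([]ChunkRef, error)
	Close() error
}

// Task carries only fingerprints while it waits in the planner queue.
type Task struct {
	Fingerprints []Fingerprint
}

// resolveChunkRefs runs right before a task is sent to a builder, so chunk
// refs for queued tasks are never held in memory ahead of time.
func resolveChunkRefs(ctx context.Context, t Task, reader SeriesReader) (map[Fingerprint][]ChunkRef, error) {
	out := make(map[Fingerprint][]ChunkRef, len(t.Fingerprints))
	for _, fp := range t.Fingerprints {
		refs, err := reader.ChunkRefs(ctx, fp)
		if err != nil {
			return nil, fmt.Errorf("resolving chunk refs for fingerprint %d: %w", fp, err)
		}
		out[fp] = refs
	}
	return out, nil
}

The trade-off is memory for deferred work: a queued task stays small, at the cost of reading the index again when the task is dispatched.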
20 changes: 4 additions & 16 deletions pkg/bloombuild/common/tsdb.go
@@ -124,9 +124,9 @@ func (b *BloomTSDBStore) LoadTSDB(
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)
series := make([]model.Fingerprint, 0, 100)

if err := f.ForSeries(
ctx,
@@ -138,19 +138,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return true
default:
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
series = append(series, fp)
return false
}
},
@@ -161,7 +149,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
default:
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
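Since the iterator now yields only model.Fingerprint values, callers that previously read series.Chunks off each element have to resolve chunks elsewhere. A rough consumption sketch, assuming an iterator with the usual Next/At/Err shape (defined locally here rather than importing the Loki iter package):

package sketch

// iterator mirrors the Next/At/Err shape of iter.Iterator; it is an assumption, not an import.
type iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

// collectFingerprints drains a fingerprint iterator; note there are no chunk
// refs to copy anymore, only the fingerprints themselves.
func collectFingerprints(it iterator[uint64]) ([]uint64, error) {
	var fps []uint64
	for it.Next() {
		fps = append(fps, it.At())
	}
	return fps, it.Err()
}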
6 changes: 3 additions & 3 deletions pkg/bloombuild/common/tsdb_test.go
@@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators(
v1.CompareIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
require.Equal(t, a, b.Fingerprint)
},
itr,
srcItr,
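The test switches from v1.EqualIterators to v1.CompareIterators because the two iterators no longer share an element type: one yields model.Fingerprint, the other *v1.Series. A hedged local sketch of what such a comparison helper looks like (an illustration of the pattern, not the v1 package's implementation):

package sketch

import "testing"

type iterator[T any] interface {
	Next() bool
	At() T
}

// compareIterators walks two iterators of different element types in lock
// step and lets the callback assert how paired elements relate.
func compareIterators[A, B any](t *testing.T, cmp func(*testing.T, A, B), a iterator[A], b iterator[B]) {
	t.Helper()
	for a.Next() {
		if !b.Next() {
			t.Fatal("second iterator is shorter than the first")
		}
		cmp(t, a.At(), b.At())
	}
	if b.Next() {
		t.Fatal("second iterator is longer than the first")
	}
}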
80 changes: 49 additions & 31 deletions pkg/bloombuild/planner/planner.go
@@ -227,9 +227,10 @@ func (p *Planner) runOne(ctx context.Context) error {
}

var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
wg sync.WaitGroup
start = time.Now()
status = statusFailure
openTSDBs strategies.TSDBSet
)
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
@@ -238,6 +239,15 @@ func (p *Planner) runOne(ctx context.Context) error {
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs.
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
}
}
}()

p.metrics.buildStarted.Inc()
@@ -275,7 +285,19 @@ func (p *Planner) runOne(ctx context.Context) error {
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
continue
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
@@ -286,7 +308,7 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
@@ -374,7 +396,8 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
tsdbs strategies.TSDBSet,
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
@@ -402,29 +425,11 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
@@ -506,18 +511,26 @@ func openAllTSDBs(
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
alreadyOpen strategies.TSDBSet,
) (strategies.TSDBSet, error) {
if len(alreadyOpen) == 0 {
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
}

for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if _, ok := alreadyOpen[idx]; ok {
continue
}
Review comment on lines +521 to +523 (Member): Nice catch :)

reader, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
alreadyOpen[idx] = reader
}

return openTSDBs, nil
return alreadyOpen, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
@@ -847,8 +860,13 @@ func (p *Planner) forwardTaskToBuilder(
builderID string,
task *QueueTask,
) (*protos.TaskResult, error) {
protoTask, err := task.ToProtoTask(builder.Context())
if err != nil {
return nil, fmt.Errorf("error converting task to proto task: %w", err)
}

msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: protoTask,
}

if err := builder.Send(msg); err != nil {
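openAllTSDBs now threads an alreadyOpen set through the per-table, per-tenant loop so readers are opened once per run and reused, and runOne closes all of them in one deferred block. A minimal sketch of that reuse-then-close pattern, with local stand-in types (Store, Reader, and ID are assumptions, not the Loki types):

package sketch

import (
	"context"
	"log"
)

type ID string

type Reader interface{ Close() error }

type Store interface {
	Load(ctx context.Context, id ID) (Reader, error)
}

// openAll loads only the readers that are not already open, mirroring the
// alreadyOpen check added to openAllTSDBs.
func openAll(ctx context.Context, store Store, ids []ID, alreadyOpen map[ID]Reader) (map[ID]Reader, error) {
	if alreadyOpen == nil {
		alreadyOpen = make(map[ID]Reader, len(ids))
	}
	for _, id := range ids {
		if _, ok := alreadyOpen[id]; ok {
			continue // reuse a reader opened for an earlier table or tenant
		}
		r, err := store.Load(ctx, id)
		if err != nil {
			return nil, err
		}
		alreadyOpen[id] = r
	}
	return alreadyOpen, nil
}

// closeAll is what the deferred block in runOne amounts to: every reader
// opened during the run is closed exactly once, at the end.
func closeAll(open map[ID]Reader) {
	for id, r := range open {
		if err := r.Close(); err != nil {
			log.Printf("failed to close tsdb %s: %v", id, err)
		}
	}
}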
11 changes: 10 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -713,12 +713,21 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))

tasks := make([]*QueueTask, 0, n)
// Enqueue tasks
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
strategies.NewTask(
config.NewDayTable(plannertest.TestDay, "fake"),
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
nil,
),
forSeries,
resultsCh,
)
tasks = append(tasks, task)
58 changes: 56 additions & 2 deletions pkg/bloombuild/planner/plannertest/utils.go
@@ -6,13 +6,15 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var TestDay = ParseDayTime("2023-09-01")
@@ -87,11 +89,23 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}

func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
Review comment (Member): nit: Should this be GenFingerprint/GenFingerprintWithStep?

return GenSeriesWithStep(bounds, 1)
}

func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, i)
}
return series
}

func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
return GenV1SeriesWithStep(bounds, 1)
}

func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
@@ -139,3 +153,43 @@ func ParseDayTime(s string) config.DayTime {
Time: model.TimeFromUnix(t.Unix()),
}
}

type FakeForSeries struct {
series []*v1.Series
}

func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
return &FakeForSeries{
series: series,
}
}

func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}

for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
KB: 100,
})
}

if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}

func (f FakeForSeries) Close() error {
return nil
}
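One behavioral detail worth noting in FakeForSeries: ForSeries stops visiting as soon as the callback returns true, which is the stop contract the planner-side iteration relies on. A small self-contained sketch of that contract (local types only, not the Loki index API):

package sketch

// visitFn mirrors the stop-on-true contract of the ForSeries callback.
type visitFn func(fp uint64) (stop bool)

// visit calls fn for each fingerprint until fn asks to stop.
func visit(fps []uint64, fn visitFn) {
	for _, fp := range fps {
		if fn(fp) {
			return
		}
	}
}

// Example: collect at most two fingerprints, then stop early.
func firstTwo(fps []uint64) []uint64 {
	var got []uint64
	visit(fps, func(fp uint64) bool {
		got = append(got, fp)
		return len(got) == 2 // returning true stops the iteration
	})
	return got
}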