Skip to content

perf(bloom): Compute chunkrefs for series right before sending task to builder #14808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 7, 2024
Prev Previous commit
Next Next commit
ToProto implemented in planner.Task
  • Loading branch information
salvacorts committed Nov 7, 2024
commit 0d1e61069eec810fa0601d6b354c571db4323c0d
73 changes: 44 additions & 29 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ func (p *Planner) runOne(ctx context.Context) error {
}

var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
wg sync.WaitGroup
start = time.Now()
status = statusFailure
openTSDBs strategies.TSDBSet
)
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
Expand All @@ -238,6 +239,15 @@ func (p *Planner) runOne(ctx context.Context) error {
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
}
}
}()

p.metrics.buildStarted.Inc()
Expand Down Expand Up @@ -275,7 +285,21 @@ func (p *Planner) runOne(ctx context.Context) error {
table: table,
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

// Open new TSDBs
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
continue
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
Expand All @@ -286,7 +310,7 @@ func (p *Planner) runOne(ctx context.Context) error {

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, resultsCh)
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
Expand Down Expand Up @@ -374,6 +398,7 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
tsdbs strategies.TSDBSet,
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
Expand Down Expand Up @@ -402,29 +427,11 @@ func (p *Planner) computeTasks(
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
Expand Down Expand Up @@ -506,18 +513,26 @@ func openAllTSDBs(
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
alreadyOpen strategies.TSDBSet,
) (strategies.TSDBSet, error) {
if len(alreadyOpen) == 0 {
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
}

for _, idx := range tsdbs {
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if _, ok := alreadyOpen[idx]; ok {
continue
}
Comment on lines +521 to +523
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch :)


reader, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

openTSDBs[idx] = tsdb
alreadyOpen[idx] = reader
}

return openTSDBs, nil
return alreadyOpen, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,9 @@ func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
forSeries,
nil,
),
forSeries,
resultsCh,
)
tasks = append(tasks, task)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/strategies/chunksize.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *ChunkSizeStrategy) Plan(
Blocks: blocks,
}

tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), tsdbs[batch.TSDB()], []Gap{planGap}))
tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), []Gap{planGap}))
}
if err := sizedIter.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
Expand Down
37 changes: 18 additions & 19 deletions pkg/bloombuild/planner/strategies/chunksize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, forSeries common.ClosableForSeries, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *Task {
return NewTask(plannertest.TestTable, "fake", bounds, tsdb, forSeries, []Gap{
func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *Task {
return NewTask(plannertest.TestTable, "fake", bounds, tsdb, []Gap{
{
Bounds: bounds,
Series: plannertest.GenSeriesWithStep(bounds, 10),
Expand Down Expand Up @@ -45,12 +44,12 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 6 tasks: 5 with 2 series each, plus a last one with a single series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(0, 10), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(40, 50), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(60, 70), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(80, 90), nil),
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(100, 100), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(0, 10), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(40, 50), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(60, 70), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(80, 90), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(100, 100), nil),
},
},
{
Expand Down Expand Up @@ -128,7 +127,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 1 task for the missing series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(0), forSeries, v1.NewBounds(20, 30), nil),
taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil),
},
},
{
Expand Down Expand Up @@ -157,27 +156,27 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 6 tasks: 5 with 2 series each, plus a last one with a single series
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(0, 10), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
plannertest.GenBlockRef(0, 0),
plannertest.GenBlockRef(10, 10),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(20, 30), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
plannertest.GenBlockRef(20, 20),
plannertest.GenBlockRef(30, 30),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(40, 50), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 50), []bloomshipper.BlockRef{
plannertest.GenBlockRef(40, 40),
plannertest.GenBlockRef(50, 50),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(60, 70), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(60, 70), []bloomshipper.BlockRef{
plannertest.GenBlockRef(60, 60),
plannertest.GenBlockRef(70, 70),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(80, 90), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(80, 90), []bloomshipper.BlockRef{
plannertest.GenBlockRef(80, 80),
plannertest.GenBlockRef(90, 90),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(100, 100), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(100, 100), []bloomshipper.BlockRef{
plannertest.GenBlockRef(100, 100),
}),
},
Expand Down Expand Up @@ -212,15 +211,15 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) {

// We expect 5 tasks, each with 2 series each
expectedTasks: []*Task{
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(0, 10), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{
plannertest.GenBlockRef(0, 0),
plannertest.GenBlockRef(10, 10),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(20, 30), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(20, 30), []bloomshipper.BlockRef{
plannertest.GenBlockRef(20, 20),
plannertest.GenBlockRef(30, 30),
}),
taskForGap(plannertest.TsdbID(1), forSeries, v1.NewBounds(40, 40), []bloomshipper.BlockRef{
taskForGap(plannertest.TsdbID(1), v1.NewBounds(40, 40), []bloomshipper.BlockRef{
plannertest.GenBlockRef(40, 40),
}),
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/strategies/splitkeyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *SplitKeyspaceStrategy) Plan(
}

for _, gap := range gaps {
tasks = append(tasks, NewTask(table, tenant, ownershipRange, gap.tsdb, tsdbs[gap.tsdb], gap.gaps))
tasks = append(tasks, NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
}
}

Expand Down
98 changes: 2 additions & 96 deletions pkg/bloombuild/planner/strategies/task.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
package strategies

import (
"context"
"fmt"
"math"
"slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

type Gap struct {
Expand All @@ -31,8 +23,7 @@ type Task struct {
Table config.DayTable
Tenant string
OwnershipBounds v1.FingerprintBounds
tsdbIdentifier tsdb.SingleTenantTSDBIdentifier
forSeries common.ClosableForSeries
TSDB tsdb.SingleTenantTSDBIdentifier
Gaps []Gap
}

Expand All @@ -41,99 +32,14 @@ func NewTask(
tenant string,
bounds v1.FingerprintBounds,
tsdb tsdb.SingleTenantTSDBIdentifier,
forSeries common.ClosableForSeries,
gaps []Gap,
) *Task {
return &Task{
ID: fmt.Sprintf("%s-%s-%s-%d", table.Addr(), tenant, bounds.String(), len(gaps)),
Table: table,
Tenant: tenant,
OwnershipBounds: bounds,
tsdbIdentifier: tsdb,
forSeries: forSeries,
TSDB: tsdb,
Gaps: gaps,
}
}

// ToProtoTask converts a Task to a ProtoTask.
//
// For every gap it iterates the opened TSDB (t.forSeries) over the gap bounds
// and collects the chunk references of the series that are listed in the gap.
// Block refs of the gap are sent in their string form.
//
// A nil Task converts to a nil ProtoTask and a nil error. An error is returned
// if iterating the TSDB fails, or if ctx is done before the series of a gap
// were fully collected.
//
// TODO: move to planner.Task and pass forSeries coming from the planner.
func (t *Task) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) {
	if t == nil {
		return nil, nil
	}

	protoGaps := make([]*protos.ProtoGapWithBlocks, 0, len(t.Gaps))
	for _, gap := range t.Gaps {
		blockRefs := make([]string, 0, len(gap.Blocks))
		for _, block := range gap.Blocks {
			blockRefs = append(blockRefs, block.String())
		}

		// The membership test in the callback is a binary search, which needs
		// sorted input. Sorting is done in place, so it also reorders the
		// Series slice held by the task's gap.
		if !slices.IsSorted(gap.Series) {
			slices.Sort(gap.Series)
		}

		series := make([]*protos.ProtoSeries, 0, len(gap.Series))
		if err := t.forSeries.ForSeries(
			ctx,
			t.Tenant,
			gap.Bounds,
			// NOTE(review): presumably the from/through time range, i.e. no
			// time restriction — confirm against ForSeries.
			0, math.MaxInt64,
			func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
				// Stop iterating as soon as the context is done.
				select {
				case <-ctx.Done():
					return true
				default:
				}

				// Skip this series if it's not in the gap.
				// Series are sorted, so membership is a binary search.
				if _, found := slices.BinarySearch(gap.Series, fp); !found {
					return false
				}

				chunks := make([]*logproto.ShortRef, 0, len(chks))
				for _, chk := range chks {
					chunks = append(chunks, &logproto.ShortRef{
						From:     model.Time(chk.MinTime),
						Through:  model.Time(chk.MaxTime),
						Checksum: chk.Checksum,
					})
				}

				series = append(series, &protos.ProtoSeries{
					Fingerprint: uint64(fp),
					Chunks:      chunks,
				})
				return false
			},
			// NOTE(review): the empty name/value equality matcher presumably
			// selects every series — confirm.
			labels.MustNewMatcher(labels.MatchEqual, "", ""),
		); err != nil {
			return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
		}

		// The callback reports cancellation by asking the iteration to stop,
		// which is not an error by itself. Check the context explicitly so a
		// cancelled run never yields a task with a truncated series list.
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("loading series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
		}

		protoGaps = append(protoGaps, &protos.ProtoGapWithBlocks{
			Bounds: protos.ProtoFingerprintBounds{
				Min: gap.Bounds.Min,
				Max: gap.Bounds.Max,
			},
			Series:   series,
			BlockRef: blockRefs,
		})
	}

	return &protos.ProtoTask{
		Id: t.ID,
		Table: protos.DayTable{
			DayTimestampMS: int64(t.Table.Time),
			Prefix:         t.Table.Prefix,
		},
		Tenant: t.Tenant,
		Bounds: protos.ProtoFingerprintBounds{
			Min: t.OwnershipBounds.Min,
			Max: t.OwnershipBounds.Max,
		},
		Tsdb: t.tsdbIdentifier.Path(),
		Gaps: protoGaps,
	}, nil
}
Loading
Loading