
perf(bloom): Compute chunkrefs for series right before sending task to builder #14808

Merged: 6 commits, Nov 7, 2024
Do not store chunkrefs in queue task. Defer populating chunks to just before sending to builder
salvacorts committed Nov 7, 2024
commit 2ac8f3f17ebbe9f41cd6b04bf1a720eb0b4356ad
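The gist of the change: while a task sits in the planner queue it no longer carries materialized `v1.ChunkRef`s for every series, only the series fingerprints plus a handle to the TSDB's `ForSeries` lookup; the chunk refs are resolved in `ToProtoTask` right before the task is shipped to a builder. Below is a minimal sketch of that shape with simplified stand-in types (`ChunkRef`, `ForSeries`, `Task`, `ProtoGap` and their methods are illustrative, not the actual Loki types or signatures):

```go
package sketch

import (
	"context"
	"fmt"
)

// ChunkRef loosely mirrors the shape of a chunk reference.
type ChunkRef struct {
	From, Through int64
	Checksum      uint32
}

// ForSeries stands in for the TSDB lookup the task keeps a handle to
// instead of pre-resolved chunk refs.
type ForSeries interface {
	ChunksFor(ctx context.Context, fp uint64) ([]ChunkRef, error)
}

// Task is what sits in the planner queue: fingerprints only, no chunk refs.
type Task struct {
	Fingerprints []uint64
	Index        ForSeries
}

// ProtoGap is a simplified stand-in for the payload sent to a builder.
type ProtoGap struct {
	Fingerprint uint64
	Chunks      []ChunkRef
}

// ToProtoTask resolves chunk refs lazily, just before the task is sent to a
// builder, so chunk metadata is never held in memory while the task is queued.
func (t *Task) ToProtoTask(ctx context.Context) ([]ProtoGap, error) {
	gaps := make([]ProtoGap, 0, len(t.Fingerprints))
	for _, fp := range t.Fingerprints {
		chks, err := t.Index.ChunksFor(ctx, fp)
		if err != nil {
			return nil, fmt.Errorf("resolving chunks for fingerprint %d: %w", fp, err)
		}
		gaps = append(gaps, ProtoGap{Fingerprint: fp, Chunks: chks})
	}
	return gaps, nil
}
```

This trades queue-time memory for an index lookup at send time, which is why `forwardTaskToBuilder` in the planner diff below now has to handle an error from `ToProtoTask`.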
20 changes: 4 additions & 16 deletions pkg/bloombuild/common/tsdb.go
@@ -124,9 +124,9 @@ func (b *BloomTSDBStore) LoadTSDB(
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)
series := make([]model.Fingerprint, 0, 100)

if err := f.ForSeries(
ctx,
@@ -138,19 +138,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return true
default:
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
series = append(series, fp)
return false
}
},
@@ -161,7 +149,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
default:
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
6 changes: 3 additions & 3 deletions pkg/bloombuild/common/tsdb_test.go
@@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.EqualIterators(
v1.CompareIterators(
t,
func(a, b *v1.Series) {
require.Equal(t, a, b)
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
require.Equal(t, a, b.Fingerprint)
},
itr,
srcItr,
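The test above now compares an iterator of `model.Fingerprint` against an iterator of `*v1.Series`, so `EqualIterators` (which assumes one element type) is replaced with `CompareIterators` plus an explicit comparison callback. A generic sketch of that lockstep-comparison pattern, with a simplified iterator interface standing in for Loki's `iter/v2` one:

```go
package sketch

// Iterator is a simplified stand-in for the iter/v2 iterator interface.
type Iterator[T any] interface {
	Next() bool
	At() T
}

// compareIterators walks two iterators of different element types in lockstep,
// calling cmp on each pair, and reports whether both yielded the same number
// of elements.
func compareIterators[A, B any](cmp func(a A, b B), a Iterator[A], b Iterator[B]) bool {
	for a.Next() {
		if !b.Next() {
			return false // left iterator produced more elements
		}
		cmp(a.At(), b.At())
	}
	return !b.Next() // both must be exhausted together
}
```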
9 changes: 7 additions & 2 deletions pkg/bloombuild/planner/planner.go
@@ -374,7 +374,7 @@ func (p *Planner) computeTasks(
ctx context.Context,
table config.DayTable,
tenant string,
) ([]*protos.Task, []bloomshipper.Meta, error) {
) ([]*strategies.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
@@ -847,8 +847,13 @@ func (p *Planner) forwardTaskToBuilder(
builderID string,
task *QueueTask,
) (*protos.TaskResult, error) {
protoTask, err := task.ToProtoTask(builder.Context())
if err != nil {
return nil, fmt.Errorf("error converting task to proto task: %w", err)
}

msg := &protos.PlannerToBuilder{
Task: task.ToProtoTask(),
Task: protoTask,
}

if err := builder.Send(msg); err != nil {
11 changes: 10 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -713,12 +713,21 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))

tasks := make([]*QueueTask, 0, n)
// Enqueue tasks
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
strategies.NewTask(
config.NewDayTable(plannertest.TestDay, "fake"),
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
forSeries,
nil,
),
resultsCh,
)
tasks = append(tasks, task)
58 changes: 56 additions & 2 deletions pkg/bloombuild/planner/plannertest/utils.go
@@ -6,13 +6,15 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var TestDay = ParseDayTime("2023-09-01")
@@ -87,11 +89,23 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}

func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
Reviewer comment (Member): nit: Should this be GenFingerprint/GenFingerprintWithStep?

return GenSeriesWithStep(bounds, 1)
}

func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, i)
}
return series
}

func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
return GenV1SeriesWithStep(bounds, 1)
}

func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
@@ -139,3 +153,43 @@ func ParseDayTime(s string) config.DayTime {
Time: model.TimeFromUnix(t.Unix()),
}
}

type FakeForSeries struct {
series []*v1.Series
}

func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
return &FakeForSeries{
series: series,
}
}

func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}

for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
KB: 100,
})
}

if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}

func (f FakeForSeries) Close() error {
return nil
}
82 changes: 26 additions & 56 deletions pkg/bloombuild/planner/strategies/chunksize.go
@@ -12,7 +12,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
@@ -50,7 +49,7 @@ func (s *ChunkSizeStrategy) Plan(
tenant string,
tsdbs TSDBSet,
metas []bloomshipper.Meta,
) ([]*protos.Task, error) {
) ([]*Task, error) {
targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant)

logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant)
@@ -73,29 +72,29 @@ func (s *ChunkSizeStrategy) Plan(
return nil, fmt.Errorf("failed to get sized series iter: %w", err)
}

tasks := make([]*protos.Task, 0, iterSize)
tasks := make([]*Task, 0, iterSize)
for sizedIter.Next() {
series := sizedIter.At()
if series.Len() == 0 {
batch := sizedIter.At()
if batch.Len() == 0 {
// This should never happen, but just in case.
level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name())
level.Warn(logger).Log("msg", "got empty series batch", "tsdb", batch.TSDB().Name())
continue
}

bounds := series.Bounds()
bounds := batch.Bounds()

blocks, err := getBlocksMatchingBounds(metas, bounds)
if err != nil {
return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err)
}

planGap := protos.Gap{
planGap := Gap{
Bounds: bounds,
Series: series.V1Series(),
Series: batch.series,
Blocks: blocks,
}

tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap}))
tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), tsdbs[batch.TSDB()], []Gap{planGap}))
}
if err := sizedIter.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over sized series: %w", err)
@@ -155,20 +154,16 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou
return deduped, nil
}

type seriesWithChunks struct {
tsdb tsdb.SingleTenantTSDBIdentifier
fp model.Fingerprint
chunks []index.ChunkMeta
}

type seriesBatch struct {
series []seriesWithChunks
tsdb tsdb.SingleTenantTSDBIdentifier
series []model.Fingerprint
size uint64
}

func newSeriesBatch() seriesBatch {
func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch {
return seriesBatch{
series: make([]seriesWithChunks, 0, 100),
tsdb: tsdb,
series: make([]model.Fingerprint, 0, 100),
}
}

@@ -179,32 +174,11 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds {

// We assume that the series are sorted by fingerprint.
// This is guaranteed since series are iterated in order by the TSDB.
return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp)
}

func (b *seriesBatch) V1Series() []*v1.Series {
series := make([]*v1.Series, 0, len(b.series))
for _, s := range b.series {
res := &v1.Series{
Fingerprint: s.fp,
Chunks: make(v1.ChunkRefs, 0, len(s.chunks)),
}
for _, chk := range s.chunks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
}

return series
return v1.NewBounds(b.series[0], b.series[len(b.series)-1])
}

func (b *seriesBatch) Append(s seriesWithChunks, size uint64) {
b.series = append(b.series, s)
func (b *seriesBatch) Append(series model.Fingerprint, size uint64) {
b.series = append(b.series, series)
b.size += size
}

@@ -217,10 +191,7 @@ func (b *seriesBatch) Size() uint64 {
}

func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier {
if len(b.series) == 0 {
return tsdb.SingleTenantTSDBIdentifier{}
}
return b.series[0].tsdb
return b.tsdb
}

func (s *ChunkSizeStrategy) sizedSeriesIter(
@@ -230,9 +201,12 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
targetTaskSizeBytes uint64,
) (iter.Iterator[seriesBatch], int, error) {
batches := make([]seriesBatch, 0, 100)
currentBatch := newSeriesBatch()
var currentBatch seriesBatch

for _, idx := range tsdbsWithGaps {
// We cut a new batch for each TSDB.
currentBatch = newSeriesBatch(idx.tsdbIdentifier)

for _, gap := range idx.gaps {
if err := idx.tsdb.ForSeries(
ctx,
Expand All @@ -253,14 +227,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
// AND Adding this series to the batch would exceed the target task size.
if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}

currentBatch.Append(seriesWithChunks{
tsdb: idx.tsdbIdentifier,
fp: fp,
chunks: chks,
}, seriesSize)
currentBatch.Append(fp, seriesSize)
return false
}
},
@@ -269,10 +239,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter(
return nil, 0, err
}

// Add the last batch for this TSDB if it's not empty.
// Add the last batch for this gap if it's not empty.
if currentBatch.Len() > 0 {
batches = append(batches, currentBatch)
currentBatch = newSeriesBatch()
currentBatch = newSeriesBatch(idx.tsdbIdentifier)
}
}
}
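The batching logic above keeps only fingerprints per batch and cuts a new batch either when a new TSDB starts or when adding the next series would push the accumulated chunk size past the per-task target. A self-contained sketch of that cutting rule (the names and the `sizeOf` callback are illustrative, not the real strategy code):

```go
package sketch

// batch accumulates fingerprints for a single TSDB up to a target byte size.
type batch struct {
	tsdb string
	fps  []uint64
	size uint64
}

// batchBySize groups fingerprints per TSDB, starting a new batch whenever
// adding the next series would exceed targetBytes; a non-empty batch is always
// flushed before moving on to the next TSDB.
func batchBySize(tsdbs map[string][]uint64, sizeOf func(fp uint64) uint64, targetBytes uint64) []batch {
	var out []batch
	for name, fps := range tsdbs {
		cur := batch{tsdb: name}
		for _, fp := range fps {
			s := sizeOf(fp)
			// Cut the batch if it is non-empty and the next series would overflow it.
			if len(cur.fps) > 0 && cur.size+s > targetBytes {
				out = append(out, cur)
				cur = batch{tsdb: name}
			}
			cur.fps = append(cur.fps, fp)
			cur.size += s
		}
		if len(cur.fps) > 0 {
			out = append(out, cur)
		}
	}
	return out
}
```

Because a batch never spans two TSDBs, `seriesBatch` can store a single TSDB identifier up front instead of one per series, which is exactly what the `newSeriesBatch(tsdb)` change above does.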