Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Revert "perf(bloom): Compute chunkrefs for series right before sendin…
…g task t…"

This reverts commit 66e6b1c.
  • Loading branch information
salvacorts authored Nov 8, 2024
commit f6029a0701b0f9ba284bdd0183857cfb368039ce
24 changes: 17 additions & 7 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ const (
gzipExtension = ".gz"
)

type ForSeries = sharding.ForSeries

type ClosableForSeries interface {
ForSeries
sharding.ForSeries
Close() error
}

Expand Down Expand Up @@ -126,9 +124,9 @@ func (b *BloomTSDBStore) LoadTSDB(
return idx, nil
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]model.Fingerprint, 0, 100)
series := make([]*v1.Series, 0, 100)

if err := f.ForSeries(
ctx,
Expand All @@ -140,7 +138,19 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
case <-ctx.Done():
return true
default:
series = append(series, fp)
res := &v1.Series{
Fingerprint: fp,
Chunks: make(v1.ChunkRefs, 0, len(chks)),
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}

series = append(series, res)
return false
}
},
Expand All @@ -151,7 +161,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return iter.NewEmptyIter[model.Fingerprint](), ctx.Err()
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
default:
return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloombuild/common/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) {
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

v1.CompareIterators(
v1.EqualIterators(
t,
func(t *testing.T, a model.Fingerprint, b *v1.Series) {
require.Equal(t, a, b.Fingerprint)
func(a, b *v1.Series) {
require.Equal(t, a, b)
},
itr,
srcItr,
Expand Down
80 changes: 31 additions & 49 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"

Check failure on line 19 in pkg/bloombuild/planner/planner.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

could not import github.com/grafana/loki/v3/pkg/bloombuild/common (-: # github.com/grafana/loki/v3/pkg/bloombuild/common
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
Expand Down Expand Up @@ -227,10 +227,9 @@
}

var (
wg sync.WaitGroup
start = time.Now()
status = statusFailure
openTSDBs strategies.TSDBSet
wg sync.WaitGroup
start = time.Now()
status = statusFailure
)
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
Expand All @@ -239,15 +238,6 @@
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}

// Close all open TSDBs.
// These are used to get the chunkrefs for the series in the gaps.
// We populate the chunkrefs when we send the task to the builder.
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err)
}
}
}()

p.metrics.buildStarted.Inc()
Expand Down Expand Up @@ -285,19 +275,7 @@
table: table,
}

tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
continue
}

openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
if err != nil {
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)
continue
}

tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs)
tasks, existingMetas, err := p.computeTasks(ctx, table, tenant)
if err != nil {
level.Error(logger).Log("msg", "failed to compute tasks", "err", err)
continue
Expand All @@ -308,7 +286,7 @@

now := time.Now()
for _, task := range tasks {
queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh)
queueTask := NewQueueTask(ctx, now, task, resultsCh)
if err := p.enqueueTask(queueTask); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
Expand Down Expand Up @@ -396,8 +374,7 @@
ctx context.Context,
table config.DayTable,
tenant string,
tsdbs strategies.TSDBSet,
) ([]*strategies.Task, []bloomshipper.Meta, error) {
) ([]*protos.Task, []bloomshipper.Meta, error) {
strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger)
if err != nil {
return nil, nil, fmt.Errorf("error creating strategy: %w", err)
Expand Down Expand Up @@ -425,11 +402,29 @@
return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err)
}

// Resolve TSDBs
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err)
}

if len(tsdbs) == 0 {
return nil, metas, nil
}

tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas)
openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs)
if err != nil {
return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err)
}
defer func() {
for idx, reader := range openTSDBs {
if err := reader.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name())
}
}
}()

tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas)
if err != nil {
return nil, nil, fmt.Errorf("failed to plan tasks: %w", err)
}
Expand Down Expand Up @@ -511,26 +506,18 @@
tenant string,
store common.TSDBStore,
tsdbs []tsdb.SingleTenantTSDBIdentifier,
alreadyOpen strategies.TSDBSet,
) (strategies.TSDBSet, error) {
if len(alreadyOpen) == 0 {
alreadyOpen = make(strategies.TSDBSet, len(tsdbs))
}

) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) {
openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs))
for _, idx := range tsdbs {
if _, ok := alreadyOpen[idx]; ok {
continue
}

reader, err := store.LoadTSDB(ctx, table, tenant, idx)
tsdb, err := store.LoadTSDB(ctx, table, tenant, idx)
if err != nil {
return nil, fmt.Errorf("failed to load tsdb: %w", err)
}

alreadyOpen[idx] = reader
openTSDBs[idx] = tsdb
}

return alreadyOpen, nil
return openTSDBs, nil
}

// deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store.
Expand Down Expand Up @@ -860,13 +847,8 @@
builderID string,
task *QueueTask,
) (*protos.TaskResult, error) {
protoTask, err := task.ToProtoTask(builder.Context())
if err != nil {
return nil, fmt.Errorf("error converting task to proto task: %w", err)
}

msg := &protos.PlannerToBuilder{
Task: protoTask,
Task: task.ToProtoTask(),
}

if err := builder.Send(msg); err != nil {
Expand Down
11 changes: 1 addition & 10 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,21 +713,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask {
forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100)))

tasks := make([]*QueueTask, 0, n)
// Enqueue tasks
for i := 0; i < n; i++ {
task := NewQueueTask(
context.Background(), time.Now(),
strategies.NewTask(
config.NewDayTable(plannertest.TestDay, "fake"),
"fakeTenant",
v1.NewBounds(0, 10),
plannertest.TsdbID(1),
nil,
),
forSeries,
protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil),
resultsCh,
)
tasks = append(tasks, task)
Expand Down
58 changes: 2 additions & 56 deletions pkg/bloombuild/planner/plannertest/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var TestDay = ParseDayTime("2023-09-01")
Expand Down Expand Up @@ -89,23 +87,11 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
}, nil
}

func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint {
func GenSeries(bounds v1.FingerprintBounds) []*v1.Series {
return GenSeriesWithStep(bounds, 1)
}

func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint {
series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, i)
}
return series
}

func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series {
return GenV1SeriesWithStep(bounds, 1)
}

func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series {
series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step)
for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) {
series = append(series, &v1.Series{
Expand Down Expand Up @@ -153,43 +139,3 @@ func ParseDayTime(s string) config.DayTime {
Time: model.TimeFromUnix(t.Unix()),
}
}

type FakeForSeries struct {
series []*v1.Series
}

func NewFakeForSeries(series []*v1.Series) *FakeForSeries {
return &FakeForSeries{
series: series,
}
}

func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error {
overlapping := make([]*v1.Series, 0, len(f.series))
for _, s := range f.series {
if ff.Match(s.Fingerprint) {
overlapping = append(overlapping, s)
}
}

for _, s := range overlapping {
chunks := make([]index.ChunkMeta, 0, len(s.Chunks))
for _, c := range s.Chunks {
chunks = append(chunks, index.ChunkMeta{
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
KB: 100,
})
}

if fn(labels.EmptyLabels(), s.Fingerprint, chunks) {
break
}
}
return nil
}

func (f FakeForSeries) Close() error {
return nil
}
Loading
Loading