Skip to content
3 changes: 1 addition & 2 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
Expand Down Expand Up @@ -415,7 +414,6 @@ func (b *Builder) processTask(
Bounds: gap.Bounds,
},
},
Sources: []tsdb.SingleTenantTSDBIdentifier{task.TSDB},
}

// Fetch blocks that aren't up to date but are in the desired fingerprint range
Expand Down Expand Up @@ -492,6 +490,7 @@ func (b *Builder) processTask(
level.Debug(logger).Log("msg", "uploaded block", "progress_pct", fmt.Sprintf("%.2f", pct))

meta.Blocks = append(meta.Blocks, built.BlockRef)
meta.Sources = append(meta.Sources, task.TSDB)
}

if err := newBlocks.Err(); err != nil {
Expand Down
70 changes: 45 additions & 25 deletions pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -61,36 +62,55 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter
}

func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries {
result := make([]blockWithSeries, 0, len(metas))

for _, meta := range metas {
for _, block := range meta.Blocks {
slices.SortFunc(series, func(a, b *logproto.GroupedChunkRefs) int { return int(a.Fingerprint - b.Fingerprint) })

// skip blocks that are not within time interval
if !interval.Overlaps(block.Interval()) {
continue
result := make([]blockWithSeries, 0, len(metas))
cache := make(map[bloomshipper.BlockRef]int)

// find the newest block for each series
for _, s := range series {
var b *bloomshipper.BlockRef
var newestTs time.Time

for i := range metas {
for j := range metas[i].Blocks {
block := metas[i].Blocks[j]
// To keep backwards compatibility, we can only look at the source at index 0
// because in the past the slice had always length 1, see
// https://github.com/grafana/loki/blob/b4060154d198e17bef8ba0fbb1c99bb5c93a412d/pkg/bloombuild/builder/builder.go#L418
sourceTs := metas[i].Sources[0].TS
// Newer metas have len(Sources) == len(Blocks)
if len(metas[i].Sources) > j {
sourceTs = metas[i].Sources[j].TS
}
// skip blocks that are not within time interval
if !interval.Overlaps(block.Interval()) {
continue
}
// skip blocks that do not contain the series
if block.Cmp(s.Fingerprint) != v1.Overlap {
continue
}
// only use the block if it is newer than the previous
if sourceTs.After(newestTs) {
b = &block
newestTs = sourceTs
}
}
}

min := sort.Search(len(series), func(i int) bool {
return block.Cmp(series[i].Fingerprint) > v1.Before
})

max := sort.Search(len(series), func(i int) bool {
return block.Cmp(series[i].Fingerprint) == v1.After
})

// All fingerprints fall outside of the consumer's range
if min == len(series) || max == 0 || min == max {
continue
}
if b == nil {
continue
}

// At least one fingerprint is within bounds of the blocks
// so append to results
dst := make([]*logproto.GroupedChunkRefs, max-min)
_ = copy(dst, series[min:max])
idx, ok := cache[*b]
if ok {
result[idx].series = append(result[idx].series, s)
} else {
cache[*b] = len(result)
result = append(result, blockWithSeries{
block: block,
series: dst,
block: *b,
series: []*logproto.GroupedChunkRefs{s},
})
}
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/bloomgateway/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef {
Expand All @@ -28,6 +29,9 @@ func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshi
Blocks: []bloomshipper.BlockRef{
makeBlockRef(minFp, maxFp, from, through),
},
Sources: []tsdb.SingleTenantTSDBIdentifier{
{TS: through.Time()},
},
}
}

Expand Down Expand Up @@ -100,14 +104,21 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) {

t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) {
metas := []bloomshipper.Meta{
makeMeta(0x00, 0xdf, 1000, 1999),
makeMeta(0xc0, 0xff, 1000, 1999),
// 2 series overlap
makeMeta(0x00, 0xdf, 1000, 1499), // "old" meta covers first 4 series
makeMeta(0xc0, 0xff, 1500, 1999), // "new" meta covers last 4 series
}
res := blocksMatchingSeries(metas, interval, series)
for i := range res {
t.Logf("%s", res[i].block)
for j := range res[i].series {
t.Logf(" %016x", res[i].series[j].Fingerprint)
}
}
Comment on lines +112 to +117
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mainly for debugging

expected := []blockWithSeries{
{
block: metas[0].Blocks[0],
series: series[0:4],
series: series[0:2], // series 0x00c0 and 0x00d0 are covered in the newer block
},
{
block: metas[1].Blocks[0],
Expand Down
Loading