Skip to content

Commit 9d6870f

Browse files
committed
Wrap ToProtoTask
1 parent 4ace777 commit 9d6870f

File tree

4 files changed

+125
-87
lines changed

4 files changed

+125
-87
lines changed

‎pkg/bloombuild/planner/planner.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func (p *Planner) runOne(ctx context.Context) error {
240240
p.metrics.buildLastSuccess.SetToCurrentTime()
241241
}
242242

243-
// Close all open TSDBs
243+
// Close all open TSDBs.
244244
// These are used to get the chunkrefs for the series in the gaps.
245245
// We populate the chunkrefs when we send the task to the builder.
246246
for idx, reader := range openTSDBs {
@@ -285,14 +285,12 @@ func (p *Planner) runOne(ctx context.Context) error {
285285
table: table,
286286
}
287287

288-
// Resolve TSDBs
289288
tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant)
290289
if err != nil {
291290
level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err)
292291
continue
293292
}
294293

295-
// Open new TSDBs
296294
openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs)
297295
if err != nil {
298296
level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err)

‎pkg/bloombuild/planner/strategies/task.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package strategies
22

33
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"slices"
8+
49
"github.com/prometheus/common/model"
10+
"github.com/prometheus/prometheus/model/labels"
511

612
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
713
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
814
"github.com/grafana/loki/v3/pkg/storage/config"
915
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1016
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
17+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
18+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
1119
)
1220

1321
type Gap struct {
@@ -35,3 +43,64 @@ func NewTask(
3543
Gaps: gaps,
3644
}
3745
}
46+
47+
// ToProtoTask converts a Task to a ProtoTask.
48+
// It will use the opened TSDB to get the chunks for the series in the gaps.
49+
func (t *Task) ToProtoTask(ctx context.Context, forSeries sharding.ForSeries) (*protos.ProtoTask, error) {
50+
// Populate the gaps with the series and chunks.
51+
protoGaps := make([]protos.Gap, 0, len(t.Gaps))
52+
for _, gap := range t.Gaps {
53+
if !slices.IsSorted(gap.Series) {
54+
slices.Sort(gap.Series)
55+
}
56+
57+
series := make([]*v1.Series, 0, len(gap.Series))
58+
if err := forSeries.ForSeries(
59+
ctx,
60+
t.Tenant,
61+
gap.Bounds,
62+
0, math.MaxInt64,
63+
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
64+
select {
65+
case <-ctx.Done():
66+
return true
67+
default:
68+
// Skip this series if it's not in the gap.
69+
// Series are sorted, so we can break early.
70+
if _, found := slices.BinarySearch(gap.Series, fp); !found {
71+
return false
72+
}
73+
74+
chunks := make(v1.ChunkRefs, 0, len(chks))
75+
for _, chk := range chks {
76+
chunks = append(chunks, v1.ChunkRef{
77+
From: model.Time(chk.MinTime),
78+
Through: model.Time(chk.MaxTime),
79+
Checksum: chk.Checksum,
80+
})
81+
}
82+
83+
series = append(series, &v1.Series{
84+
Fingerprint: fp,
85+
Chunks: chunks,
86+
})
87+
return false
88+
}
89+
},
90+
labels.MustNewMatcher(labels.MatchEqual, "", ""),
91+
); err != nil {
92+
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
93+
}
94+
95+
protoGaps = append(protoGaps, protos.Gap{
96+
Bounds: gap.Bounds,
97+
Series: series,
98+
Blocks: gap.Blocks,
99+
})
100+
}
101+
102+
// Copy inner task and set gaps
103+
task := *t.Task
104+
task.Gaps = protoGaps
105+
return task.ToProtoTask(), nil
106+
}

‎pkg/bloombuild/planner/task.go

Lines changed: 1 addition & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,13 @@ package planner
22

33
import (
44
"context"
5-
"fmt"
6-
"math"
7-
"slices"
85
"time"
96

10-
"github.com/prometheus/common/model"
11-
"github.com/prometheus/prometheus/model/labels"
127
"go.uber.org/atomic"
138

149
"github.com/grafana/loki/v3/pkg/bloombuild/common"
1510
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
1611
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
17-
"github.com/grafana/loki/v3/pkg/logproto"
18-
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
1912
)
2013

2114
type QueueTask struct {
@@ -51,81 +44,5 @@ func NewQueueTask(
5144
// ToProtoTask converts a Task to a ProtoTask.
5245
// It will use the opened TSDB to get the chunks for the series in the gaps.
5346
func (t *QueueTask) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) {
54-
if t == nil {
55-
return nil, nil
56-
}
57-
58-
protoGaps := make([]*protos.ProtoGapWithBlocks, 0, len(t.Gaps))
59-
for _, gap := range t.Gaps {
60-
blockRefs := make([]string, 0, len(gap.Blocks))
61-
for _, block := range gap.Blocks {
62-
blockRefs = append(blockRefs, block.String())
63-
}
64-
65-
if !slices.IsSorted(gap.Series) {
66-
slices.Sort(gap.Series)
67-
}
68-
69-
series := make([]*protos.ProtoSeries, 0, len(gap.Series))
70-
if err := t.forSeries.ForSeries(
71-
ctx,
72-
t.Tenant,
73-
gap.Bounds,
74-
0, math.MaxInt64,
75-
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
76-
select {
77-
case <-ctx.Done():
78-
return true
79-
default:
80-
// Skip this series if it's not in the gap.
81-
// Series are sorted, so we can break early.
82-
if _, found := slices.BinarySearch(gap.Series, fp); !found {
83-
return false
84-
}
85-
86-
chunks := make([]*logproto.ShortRef, 0, len(chks))
87-
for _, chk := range chks {
88-
chunks = append(chunks, &logproto.ShortRef{
89-
From: model.Time(chk.MinTime),
90-
Through: model.Time(chk.MaxTime),
91-
Checksum: chk.Checksum,
92-
})
93-
}
94-
95-
series = append(series, &protos.ProtoSeries{
96-
Fingerprint: uint64(fp),
97-
Chunks: chunks,
98-
})
99-
return false
100-
}
101-
},
102-
labels.MustNewMatcher(labels.MatchEqual, "", ""),
103-
); err != nil {
104-
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
105-
}
106-
107-
protoGaps = append(protoGaps, &protos.ProtoGapWithBlocks{
108-
Bounds: protos.ProtoFingerprintBounds{
109-
Min: gap.Bounds.Min,
110-
Max: gap.Bounds.Max,
111-
},
112-
Series: series,
113-
BlockRef: blockRefs,
114-
})
115-
}
116-
117-
return &protos.ProtoTask{
118-
Id: t.ID,
119-
Table: protos.DayTable{
120-
DayTimestampMS: int64(t.Table.Time),
121-
Prefix: t.Table.Prefix,
122-
},
123-
Tenant: t.Tenant,
124-
Bounds: protos.ProtoFingerprintBounds{
125-
Min: t.OwnershipBounds.Min,
126-
Max: t.OwnershipBounds.Max,
127-
},
128-
Tsdb: t.TSDB.Path(),
129-
Gaps: protoGaps,
130-
}, nil
47+
return t.Task.ToProtoTask(ctx, t.forSeries)
13148
}

‎pkg/bloombuild/protos/compat.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/pkg/errors"
88
"github.com/prometheus/common/model"
99

10+
"github.com/grafana/loki/v3/pkg/logproto"
1011
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
1112
"github.com/grafana/loki/v3/pkg/storage/config"
1213
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -106,6 +107,59 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
106107
}, nil
107108
}
108109

110+
func (t *Task) ToProtoTask() *ProtoTask {
111+
if t == nil {
112+
return nil
113+
}
114+
115+
protoGaps := make([]*ProtoGapWithBlocks, 0, len(t.Gaps))
116+
for _, gap := range t.Gaps {
117+
blockRefs := make([]string, 0, len(gap.Blocks))
118+
for _, block := range gap.Blocks {
119+
blockRefs = append(blockRefs, block.String())
120+
}
121+
122+
// TODO(salvacorts): Cast []*v1.Series to []*ProtoSeries right away
123+
series := make([]*ProtoSeries, 0, len(gap.Series))
124+
for _, s := range gap.Series {
125+
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
126+
for _, c := range s.Chunks {
127+
chunk := logproto.ShortRef(c)
128+
chunks = append(chunks, &chunk)
129+
}
130+
131+
series = append(series, &ProtoSeries{
132+
Fingerprint: uint64(s.Fingerprint),
133+
Chunks: chunks,
134+
})
135+
}
136+
137+
protoGaps = append(protoGaps, &ProtoGapWithBlocks{
138+
Bounds: ProtoFingerprintBounds{
139+
Min: gap.Bounds.Min,
140+
Max: gap.Bounds.Max,
141+
},
142+
Series: series,
143+
BlockRef: blockRefs,
144+
})
145+
}
146+
147+
return &ProtoTask{
148+
Id: t.ID,
149+
Table: DayTable{
150+
DayTimestampMS: int64(t.Table.Time),
151+
Prefix: t.Table.Prefix,
152+
},
153+
Tenant: t.Tenant,
154+
Bounds: ProtoFingerprintBounds{
155+
Min: t.OwnershipBounds.Min,
156+
Max: t.OwnershipBounds.Max,
157+
},
158+
Tsdb: t.TSDB.Path(),
159+
Gaps: protoGaps,
160+
}
161+
}
162+
109163
func (t *Task) GetLogger(logger log.Logger) log.Logger {
110164
return log.With(logger,
111165
"task", t.ID,

0 commit comments

Comments
 (0)