Skip to content

Commit 1174064

Browse files
committed
Wrapped ToProtoTask()
1 parent 256d8ff commit 1174064

File tree

3 files changed

+123
-120
lines changed

3 files changed

+123
-120
lines changed

‎pkg/bloombuild/planner/strategies/task.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package strategies
22

33
import (
4+
"context"
5+
"fmt"
6+
"math"
7+
"slices"
8+
49
"github.com/prometheus/common/model"
10+
"github.com/prometheus/prometheus/model/labels"
511

612
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
713
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
814
"github.com/grafana/loki/v3/pkg/storage/config"
915
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
1016
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
17+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
18+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
1119
)
1220

1321
type Gap struct {
@@ -35,3 +43,64 @@ func NewTask(
3543
Gaps: gaps,
3644
}
3745
}
46+
47+
// ToProtoTask converts a Task to a ProtoTask.
48+
// It will use the opened TSDB to get the chunks for the series in the gaps.
49+
func (t *Task) ToProtoTask(ctx context.Context, forSeries sharding.ForSeries) (*protos.ProtoTask, error) {
50+
// Populate the gaps with the series and chunks.
51+
protoGaps := make([]protos.Gap, 0, len(t.Gaps))
52+
for _, gap := range t.Gaps {
53+
if !slices.IsSorted(gap.Series) {
54+
slices.Sort(gap.Series)
55+
}
56+
57+
series := make([]*v1.Series, 0, len(gap.Series))
58+
if err := forSeries.ForSeries(
59+
ctx,
60+
t.Tenant,
61+
gap.Bounds,
62+
0, math.MaxInt64,
63+
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
64+
select {
65+
case <-ctx.Done():
66+
return true
67+
default:
68+
// Skip this series if it's not in the gap.
69+
// Series are sorted, so we can break early.
70+
if _, found := slices.BinarySearch(gap.Series, fp); !found {
71+
return false
72+
}
73+
74+
chunks := make(v1.ChunkRefs, 0, len(chks))
75+
for _, chk := range chks {
76+
chunks = append(chunks, v1.ChunkRef{
77+
From: model.Time(chk.MinTime),
78+
Through: model.Time(chk.MaxTime),
79+
Checksum: chk.Checksum,
80+
})
81+
}
82+
83+
series = append(series, &v1.Series{
84+
Fingerprint: fp,
85+
Chunks: chunks,
86+
})
87+
return false
88+
}
89+
},
90+
labels.MustNewMatcher(labels.MatchEqual, "", ""),
91+
); err != nil {
92+
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
93+
}
94+
95+
protoGaps = append(protoGaps, protos.Gap{
96+
Bounds: gap.Bounds,
97+
Series: series,
98+
Blocks: gap.Blocks,
99+
})
100+
}
101+
102+
// Copy inner task and set gaps
103+
task := *t.Task
104+
task.Gaps = protoGaps
105+
return task.ToProtoTask(), nil
106+
}

‎pkg/bloombuild/planner/task.go

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,13 @@ package planner
22

33
import (
44
"context"
5-
"fmt"
6-
"math"
7-
"slices"
85
"time"
96

10-
"github.com/prometheus/common/model"
11-
"github.com/prometheus/prometheus/model/labels"
127
"go.uber.org/atomic"
138

149
"github.com/grafana/loki/v3/pkg/bloombuild/common"
1510
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
1611
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
17-
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
18-
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
1912
)
2013

2114
type QueueTask struct {
@@ -51,64 +44,5 @@ func NewQueueTask(
5144
// ToProtoTask converts a Task to a ProtoTask.
5245
// It will use the opened TSDB to get the chunks for the series in the gaps.
5346
func (t *QueueTask) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) {
54-
if t == nil {
55-
return nil, nil
56-
}
57-
58-
// Populate the gaps with the series and chunks.
59-
protoGaps := make([]protos.Gap, 0, len(t.Gaps))
60-
for _, gap := range t.Gaps {
61-
if !slices.IsSorted(gap.Series) {
62-
slices.Sort(gap.Series)
63-
}
64-
65-
series := make([]*v1.Series, 0, len(gap.Series))
66-
if err := t.forSeries.ForSeries(
67-
ctx,
68-
t.Tenant,
69-
gap.Bounds,
70-
0, math.MaxInt64,
71-
func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) {
72-
select {
73-
case <-ctx.Done():
74-
return true
75-
default:
76-
// Skip this series if it's not in the gap.
77-
// Series are sorted, so we can break early.
78-
if _, found := slices.BinarySearch(gap.Series, fp); !found {
79-
return false
80-
}
81-
82-
chunks := make(v1.ChunkRefs, 0, len(chks))
83-
for _, chk := range chks {
84-
chunks = append(chunks, v1.ChunkRef{
85-
From: model.Time(chk.MinTime),
86-
Through: model.Time(chk.MaxTime),
87-
Checksum: chk.Checksum,
88-
})
89-
}
90-
91-
series = append(series, &v1.Series{
92-
Fingerprint: fp,
93-
Chunks: chunks,
94-
})
95-
return false
96-
}
97-
},
98-
labels.MustNewMatcher(labels.MatchEqual, "", ""),
99-
); err != nil {
100-
return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err)
101-
}
102-
103-
protoGaps = append(protoGaps, protos.Gap{
104-
Bounds: gap.Bounds,
105-
Series: series,
106-
Blocks: gap.Blocks,
107-
})
108-
}
109-
110-
// Copy task and set gaps
111-
task := *t.Task.Task
112-
task.Gaps = protoGaps
113-
return task.ToProtoTask(), nil
47+
return t.Task.ToProtoTask(ctx, t.forSeries)
11448
}

‎pkg/bloombuild/protos/compat.go

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -49,59 +49,6 @@ func NewTask(
4949
}
5050
}
5151

52-
func (t *Task) ToProtoTask() *ProtoTask {
53-
if t == nil {
54-
return nil
55-
}
56-
57-
protoGaps := make([]*ProtoGapWithBlocks, 0, len(t.Gaps))
58-
for _, gap := range t.Gaps {
59-
blockRefs := make([]string, 0, len(gap.Blocks))
60-
for _, block := range gap.Blocks {
61-
blockRefs = append(blockRefs, block.String())
62-
}
63-
64-
// TODO(salvacorts): Cast []*v1.Series to []*ProtoSeries right away
65-
series := make([]*ProtoSeries, 0, len(gap.Series))
66-
for _, s := range gap.Series {
67-
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
68-
for _, c := range s.Chunks {
69-
chunk := logproto.ShortRef(c)
70-
chunks = append(chunks, &chunk)
71-
}
72-
73-
series = append(series, &ProtoSeries{
74-
Fingerprint: uint64(s.Fingerprint),
75-
Chunks: chunks,
76-
})
77-
}
78-
79-
protoGaps = append(protoGaps, &ProtoGapWithBlocks{
80-
Bounds: ProtoFingerprintBounds{
81-
Min: gap.Bounds.Min,
82-
Max: gap.Bounds.Max,
83-
},
84-
Series: series,
85-
BlockRef: blockRefs,
86-
})
87-
}
88-
89-
return &ProtoTask{
90-
Id: t.ID,
91-
Table: DayTable{
92-
DayTimestampMS: int64(t.Table.Time),
93-
Prefix: t.Table.Prefix,
94-
},
95-
Tenant: t.Tenant,
96-
Bounds: ProtoFingerprintBounds{
97-
Min: t.OwnershipBounds.Min,
98-
Max: t.OwnershipBounds.Max,
99-
},
100-
Tsdb: t.TSDB.Path(),
101-
Gaps: protoGaps,
102-
}
103-
}
104-
10552
func FromProtoTask(task *ProtoTask) (*Task, error) {
10653
if task == nil {
10754
return nil, nil
@@ -160,6 +107,59 @@ func FromProtoTask(task *ProtoTask) (*Task, error) {
160107
}, nil
161108
}
162109

110+
func (t *Task) ToProtoTask() *ProtoTask {
111+
if t == nil {
112+
return nil
113+
}
114+
115+
protoGaps := make([]*ProtoGapWithBlocks, 0, len(t.Gaps))
116+
for _, gap := range t.Gaps {
117+
blockRefs := make([]string, 0, len(gap.Blocks))
118+
for _, block := range gap.Blocks {
119+
blockRefs = append(blockRefs, block.String())
120+
}
121+
122+
// TODO(salvacorts): Cast []*v1.Series to []*ProtoSeries right away
123+
series := make([]*ProtoSeries, 0, len(gap.Series))
124+
for _, s := range gap.Series {
125+
chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))
126+
for _, c := range s.Chunks {
127+
chunk := logproto.ShortRef(c)
128+
chunks = append(chunks, &chunk)
129+
}
130+
131+
series = append(series, &ProtoSeries{
132+
Fingerprint: uint64(s.Fingerprint),
133+
Chunks: chunks,
134+
})
135+
}
136+
137+
protoGaps = append(protoGaps, &ProtoGapWithBlocks{
138+
Bounds: ProtoFingerprintBounds{
139+
Min: gap.Bounds.Min,
140+
Max: gap.Bounds.Max,
141+
},
142+
Series: series,
143+
BlockRef: blockRefs,
144+
})
145+
}
146+
147+
return &ProtoTask{
148+
Id: t.ID,
149+
Table: DayTable{
150+
DayTimestampMS: int64(t.Table.Time),
151+
Prefix: t.Table.Prefix,
152+
},
153+
Tenant: t.Tenant,
154+
Bounds: ProtoFingerprintBounds{
155+
Min: t.OwnershipBounds.Min,
156+
Max: t.OwnershipBounds.Max,
157+
},
158+
Tsdb: t.TSDB.Path(),
159+
Gaps: protoGaps,
160+
}
161+
}
162+
163163
func (t *Task) GetLogger(logger log.Logger) log.Logger {
164164
return log.With(logger,
165165
"task", t.ID,

0 commit comments

Comments
 (0)