Skip to content

Commit f92dde0

Browse files
authored
feat(blooms): Apply task timeout in bloom builder (#14988)
1 parent df7a8e4 commit f92dde0

File tree

3 files changed

+162
-33
lines changed

3 files changed

+162
-33
lines changed

‎pkg/bloombuild/builder/builder.go

+10
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,12 @@ func (b *Builder) processTask(
368368
logger := task.GetLogger(b.logger)
369369
level.Debug(logger).Log("msg", "task started")
370370

371+
if timeout := b.limits.BuilderResponseTimeout(task.Tenant); timeout > 0 {
372+
var cancel context.CancelFunc
373+
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
374+
defer cancel()
375+
}
376+
371377
client, err := b.bloomStore.Client(task.Table.ModelTime())
372378
if err != nil {
373379
level.Error(logger).Log("msg", "failed to get client", "err", err)
@@ -390,6 +396,10 @@ func (b *Builder) processTask(
390396
)
391397

392398
for i := range task.Gaps {
399+
if ctx.Err() != nil {
400+
return nil, ctx.Err()
401+
}
402+
393403
gap := task.Gaps[i]
394404
logger := log.With(logger, "gap", gap.Bounds.String())
395405

‎pkg/bloombuild/builder/builder_test.go

+150-33
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.uber.org/atomic"
1919
"google.golang.org/grpc"
2020

21+
"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
2122
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
2223
"github.com/grafana/loki/v3/pkg/storage"
2324
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -28,10 +29,7 @@ import (
2829
"github.com/grafana/loki/v3/pkg/storage/types"
2930
)
3031

31-
func Test_BuilderLoop(t *testing.T) {
32-
logger := log.NewNopLogger()
33-
//logger := log.NewLogfmtLogger(os.Stdout)
34-
32+
func setupBuilder(t *testing.T, plannerAddr string, limits Limits, logger log.Logger) *Builder {
3533
schemaCfg := config.SchemaConfig{
3634
Configs: []config.PeriodConfig{
3735
{
@@ -64,22 +62,8 @@ func Test_BuilderLoop(t *testing.T) {
6462
},
6563
}
6664

67-
tasks := make([]*protos.ProtoTask, 256)
68-
for i := range tasks {
69-
tasks[i] = &protos.ProtoTask{
70-
Id: fmt.Sprintf("task-%d", i),
71-
}
72-
}
73-
74-
server, err := newFakePlannerServer(tasks)
75-
require.NoError(t, err)
76-
77-
// Start the server so the builder can connect and receive tasks.
78-
server.Start()
79-
80-
limits := fakeLimits{}
8165
cfg := Config{
82-
PlannerAddress: server.Addr(),
66+
PlannerAddress: plannerAddr,
8367
BackoffConfig: backoff.Config{
8468
MinBackoff: 1 * time.Second,
8569
MaxBackoff: 10 * time.Second,
@@ -88,8 +72,48 @@ func Test_BuilderLoop(t *testing.T) {
8872
}
8973
flagext.DefaultValues(&cfg.GrpcConfig)
9074

91-
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
75+
metrics := storage.NewClientMetrics()
76+
metrics.Unregister()
77+
78+
builder, err := New(cfg, limits, schemaCfg, storageCfg, metrics, nil, fakeBloomStore{}, logger, prometheus.NewPedanticRegistry(), nil)
79+
require.NoError(t, err)
80+
81+
return builder
82+
}
83+
84+
func createTasks(n int) []*protos.ProtoTask {
85+
tasks := make([]*protos.ProtoTask, n)
86+
for i := range tasks {
87+
tasks[i] = protos.NewTask(
88+
plannertest.TestTable,
89+
"fake",
90+
v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+10)),
91+
plannertest.TsdbID(1),
92+
[]protos.Gap{
93+
{
94+
Bounds: v1.NewBounds(model.Fingerprint(i+1), model.Fingerprint(i+2)),
95+
},
96+
{
97+
Bounds: v1.NewBounds(model.Fingerprint(i+3), model.Fingerprint(i+9)),
98+
},
99+
},
100+
).ToProtoTask()
101+
}
102+
return tasks
103+
}
104+
105+
func Test_BuilderLoop(t *testing.T) {
106+
logger := log.NewNopLogger()
107+
//logger := log.NewLogfmtLogger(os.Stdout)
108+
109+
tasks := createTasks(256)
110+
server, err := newFakePlannerServer(tasks)
92111
require.NoError(t, err)
112+
113+
// Start the server so the builder can connect and receive tasks.
114+
server.Start()
115+
116+
builder := setupBuilder(t, server.Addr(), fakeLimits{}, logger)
93117
t.Cleanup(func() {
94118
err = services.StopAndAwaitTerminated(context.Background(), builder)
95119
require.NoError(t, err)
@@ -128,9 +152,71 @@ func Test_BuilderLoop(t *testing.T) {
128152
require.True(t, server.shutdownCalled)
129153
}
130154

155+
func Test_BuilderLoop_Timeout(t *testing.T) {
156+
for _, tc := range []struct {
157+
name string
158+
timeout time.Duration
159+
allTasksSucceed bool
160+
}{
161+
{
162+
name: "no timeout configured",
163+
timeout: 0,
164+
allTasksSucceed: true,
165+
},
166+
{
167+
name: "long enough timeout",
168+
timeout: 15 * time.Minute,
169+
allTasksSucceed: true,
170+
},
171+
{
172+
name: "task times out",
173+
timeout: 1 * time.Nanosecond, // Pretty much immediately.
174+
allTasksSucceed: false,
175+
},
176+
} {
177+
t.Run(tc.name, func(t *testing.T) {
178+
logger := log.NewNopLogger()
179+
//logger := log.NewLogfmtLogger(os.Stdout)
180+
181+
tasks := createTasks(256)
182+
server, err := newFakePlannerServer(tasks)
183+
require.NoError(t, err)
184+
185+
// Start the server so the builder can connect and receive tasks.
186+
server.Start()
187+
188+
limits := fakeLimits{
189+
taskTimout: tc.timeout,
190+
}
191+
builder := setupBuilder(t, server.Addr(), limits, logger)
192+
t.Cleanup(func() {
193+
err = services.StopAndAwaitTerminated(context.Background(), builder)
194+
require.NoError(t, err)
195+
196+
server.Stop()
197+
})
198+
199+
err = services.StartAndAwaitRunning(context.Background(), builder)
200+
require.NoError(t, err)
201+
202+
require.Eventually(t, func() bool {
203+
return server.CompletedTasks() >= len(tasks)
204+
}, 30*time.Second, 500*time.Millisecond)
205+
206+
erroredTasks := server.ErroredTasks()
207+
if tc.allTasksSucceed {
208+
require.Equal(t, 0, erroredTasks)
209+
} else {
210+
require.Equal(t, len(tasks), erroredTasks)
211+
}
212+
})
213+
}
214+
}
215+
131216
type fakePlannerServer struct {
132217
tasks []*protos.ProtoTask
133218
completedTasks atomic.Int64
219+
erroredTasks atomic.Int64
134220
shutdownCalled bool
135221

136222
listenAddr string
@@ -198,11 +284,18 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop
198284
if err := srv.Send(&protos.PlannerToBuilder{Task: task}); err != nil {
199285
return fmt.Errorf("failed to send task: %w", err)
200286
}
201-
if _, err := srv.Recv(); err != nil {
287+
288+
result, err := srv.Recv()
289+
if err != nil {
202290
return fmt.Errorf("failed to receive task response: %w", err)
203291
}
204-
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
292+
205293
f.completedTasks.Inc()
294+
if result.Result.Error != "" {
295+
f.erroredTasks.Inc()
296+
}
297+
298+
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
206299
}
207300

208301
// No more tasks. Wait until shutdown.
@@ -214,32 +307,36 @@ func (f *fakePlannerServer) CompletedTasks() int {
214307
return int(f.completedTasks.Load())
215308
}
216309

310+
func (f *fakePlannerServer) ErroredTasks() int {
311+
return int(f.erroredTasks.Load())
312+
}
313+
217314
// NotifyBuilderShutdown records that a shutdown notification arrived and
// answers with an empty response and no error.
//
// NOTE(review): shutdownCalled is a plain bool. If this handler runs on a
// different goroutine than the test that reads the flag, the access is a data
// race — confirm, and consider an atomic like the task counters use.
func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
	resp := new(protos.NotifyBuilderShutdownResponse)
	f.shutdownCalled = true
	return resp, nil
}
221318

222319
// fakeLimits is a test implementation of Limits. It embeds the Limits
// interface so the type satisfies it without spelling out every method; the
// tests build it without setting the embedded value, so any method that is not
// explicitly overridden on fakeLimits would be called on a nil interface.
type fakeLimits struct {
320+
Limits
321+
// taskTimout is the duration returned by BuilderResponseTimeout.
// NOTE(review): the name is a misspelling of "taskTimeout"; renaming it means
// updating the struct literal in Test_BuilderLoop_Timeout and the getter too.
taskTimout time.Duration
223322
}
224323

225-
func (f fakeLimits) BloomBlockEncoding(_ string) string {
226-
panic("implement me")
227-
}
228-
229-
func (f fakeLimits) BloomNGramLength(_ string) int {
230-
panic("implement me")
231-
}
324+
var _ Limits = fakeLimits{}
232325

233-
func (f fakeLimits) BloomNGramSkip(_ string) int {
234-
panic("implement me")
326+
func (f fakeLimits) BloomBlockEncoding(_ string) string {
327+
return "none"
235328
}
236329

237330
func (f fakeLimits) BloomMaxBlockSize(_ string) int {
238-
panic("implement me")
331+
return 0
239332
}
240333

241334
func (f fakeLimits) BloomMaxBloomSize(_ string) int {
242-
panic("implement me")
335+
return 0
336+
}
337+
338+
func (f fakeLimits) BuilderResponseTimeout(_ string) time.Duration {
339+
return f.taskTimout
243340
}
244341

245342
type fakeBloomStore struct {
@@ -250,6 +347,26 @@ func (f fakeBloomStore) BloomMetrics() *v1.Metrics {
250347
return nil
251348
}
252349

350+
func (f fakeBloomStore) Client(_ model.Time) (bloomshipper.Client, error) {
351+
return fakeBloomClient{}, nil
352+
}
353+
354+
func (f fakeBloomStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
355+
return &bloomshipper.Fetcher{}, nil
356+
}
357+
358+
// fakeBloomClient is a test stand-in for bloomshipper.Client. It embeds the
// interface so only the methods the tests need (PutBlock, PutMeta) have to be
// written out. fakeBloomStore.Client returns it without setting the embedded
// value, so any other method would be called on a nil interface.
type fakeBloomClient struct {
359+
bloomshipper.Client
360+
}
361+
362+
func (f fakeBloomClient) PutBlock(_ context.Context, _ bloomshipper.Block) error {
363+
return nil
364+
}
365+
366+
func (f fakeBloomClient) PutMeta(_ context.Context, _ bloomshipper.Meta) error {
367+
return nil
368+
}
369+
253370
func parseDayTime(s string) config.DayTime {
254371
t, err := time.Parse("2006-01-02", s)
255372
if err != nil {

‎pkg/bloombuild/builder/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package builder
33
import (
44
"flag"
55
"fmt"
6+
"time"
67

78
"github.com/grafana/dskit/backoff"
89
"github.com/grafana/dskit/grpcclient"
@@ -40,4 +41,5 @@ type Limits interface {
4041
BloomBlockEncoding(tenantID string) string
4142
BloomMaxBlockSize(tenantID string) int
4243
BloomMaxBloomSize(tenantID string) int
44+
BuilderResponseTimeout(tenantID string) time.Duration
4345
}

0 commit comments

Comments
 (0)