Skip to content

Commit 7669385

Browse files
authored
fix(blooms): Do not restart builders when planner disconnects (#14783)
1 parent d9eeed3 commit 7669385

File tree

1 file changed

+62
-17
lines changed

1 file changed

+62
-17
lines changed

‎pkg/bloombuild/builder/builder.go

+62-17
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package builder
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"os"
8+
"strings"
79
"sync"
810
"time"
911

@@ -136,16 +138,39 @@ func (b *Builder) running(ctx context.Context) error {
136138
retries := backoff.New(ctx, b.cfg.BackoffConfig)
137139
for retries.Ongoing() {
138140
err := b.connectAndBuild(ctx)
139-
if err == nil || errors.Is(err, context.Canceled) {
140-
break
141+
if err != nil {
142+
err := standardizeRPCError(err)
143+
144+
// When the builder is shutting down, we will get a context canceled error
145+
if errors.Is(err, context.Canceled) && b.State() != services.Running {
146+
level.Debug(b.logger).Log("msg", "builder is shutting down")
147+
break
148+
}
149+
150+
// If the planner disconnects while we are sending/receive a message we get an EOF error.
151+
// In this case we should reset the backoff and retry
152+
if errors.Is(err, io.EOF) {
153+
level.Error(b.logger).Log("msg", "planner disconnected. Resetting backoff and retrying", "err", err)
154+
retries.Reset()
155+
continue
156+
}
157+
158+
// Otherwise (e.g. failed to connect to the builder), we should retry
159+
code := status.Code(err)
160+
level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "retry", retries.NumRetries(), "maxRetries", b.cfg.BackoffConfig.MaxRetries, "code", code.String(), "err", err)
161+
retries.Wait()
162+
continue
141163
}
142164

143-
level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err)
144-
retries.Wait()
165+
// We shouldn't get here. If we do, we better restart the builder.
166+
// Adding a log line for visibility
167+
level.Error(b.logger).Log("msg", "unexpected end of connectAndBuild. Restarting builder")
168+
break
145169
}
146170

147171
if err := retries.Err(); err != nil {
148172
if errors.Is(err, context.Canceled) {
173+
// Edge case when the builder is shutting down while we check for retries
149174
return nil
150175
}
151176
return fmt.Errorf("failed to connect and build: %w", err)
@@ -154,6 +179,31 @@ func (b *Builder) running(ctx context.Context) error {
154179
return nil
155180
}
156181

182+
// standardizeRPCError converts some gRPC errors we want to handle differently to standard errors.
183+
// 1. codes.Canceled -> context.Canceled
184+
// 2. codes.Unavailable with EOF -> io.EOF
185+
// 3. All other errors are returned as is.
186+
func standardizeRPCError(err error) error {
187+
if err == nil {
188+
return nil
189+
}
190+
191+
if st, ok := status.FromError(err); ok {
192+
switch st.Code() {
193+
case codes.Canceled:
194+
// Happens when the builder is shutting down, and we are sending/receiving a message
195+
return context.Canceled
196+
case codes.Unavailable:
197+
// We want to handle this case as a retryable error that resets the backoff
198+
if i := strings.LastIndex(st.Message(), "EOF"); i != -1 {
199+
return io.EOF
200+
}
201+
}
202+
}
203+
204+
return err
205+
}
206+
157207
func (b *Builder) plannerAddress() string {
158208
if b.ringWatcher == nil {
159209
return b.cfg.PlannerAddress
@@ -173,8 +223,7 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
173223
return fmt.Errorf("failed to create grpc dial options: %w", err)
174224
}
175225

176-
// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
177-
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
226+
conn, err := grpc.NewClient(b.plannerAddress(), opts...)
178227
if err != nil {
179228
return fmt.Errorf("failed to dial bloom planner: %w", err)
180229
}
@@ -202,21 +251,17 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
202251
}
203252

204253
func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
254+
ctx := c.Context()
255+
205256
// Send ready message to planner
206257
if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
207258
return fmt.Errorf("failed to send ready message to planner: %w", err)
208259
}
209260

210-
for b.State() == services.Running {
211-
// When the planner connection closes, an EOF or "planner shutting down" error is returned.
212-
// When the builder is shutting down, a gRPC context canceled error is returned.
261+
// Will break when planner<->builder connection is closed or when the builder is shutting down.
262+
for ctx.Err() == nil {
213263
protoTask, err := c.Recv()
214264
if err != nil {
215-
if status.Code(err) == codes.Canceled {
216-
level.Debug(b.logger).Log("msg", "builder loop context canceled")
217-
return nil
218-
}
219-
220265
return fmt.Errorf("failed to receive task from planner: %w", err)
221266
}
222267

@@ -235,7 +280,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
235280
continue
236281
}
237282

238-
newMetas, err := b.processTask(c.Context(), task)
283+
newMetas, err := b.processTask(ctx, task)
239284
if err != nil {
240285
err = fmt.Errorf("failed to process task: %w", err)
241286
}
@@ -249,8 +294,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
249294
b.metrics.processingTask.Set(0)
250295
}
251296

252-
level.Debug(b.logger).Log("msg", "builder loop stopped")
253-
return nil
297+
level.Debug(b.logger).Log("msg", "builder loop stopped", "ctx_err", ctx.Err())
298+
return ctx.Err()
254299
}
255300

256301
func (b *Builder) logTaskCompleted(

0 commit comments

Comments
 (0)