Skip to content

Commit 5f476a3

Browse files
fix: Fix goroutine leak in queryrange downstreamer (#15665)
1 parent e347eb7 commit 5f476a3

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

‎pkg/querier/queryrange/downstreamer.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -170,18 +170,20 @@ func (in instance) For(
170170
go func() {
171171
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
172172
res, err := fn(queries[i])
173+
if err != nil {
174+
return err
175+
}
173176
response := logql.Resp{
174177
I: i,
175178
Res: res,
176-
Err: err,
177179
}
178180

179181
// Feed the result into the channel unless the work has completed.
180182
select {
181183
case <-ctx.Done():
182184
case ch <- response:
183185
}
184-
return err
186+
return nil
185187
})
186188
if err != nil {
187189
ch <- logql.Resp{
@@ -192,15 +194,19 @@ func (in instance) For(
192194
close(ch)
193195
}()
194196

197+
var err error
195198
for resp := range ch {
196-
if resp.Err != nil {
197-
return nil, resp.Err
199+
if err != nil {
200+
continue
198201
}
199-
if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil {
200-
return nil, err
202+
if resp.Err != nil {
203+
err = resp.Err
204+
continue
201205
}
206+
err = acc.Accumulate(ctx, resp.Res, resp.I)
202207
}
203-
return acc.Result(), nil
208+
209+
return acc.Result(), err
204210
}
205211

206212
// convert to matrix

0 commit comments

Comments
 (0)