Skip to content

Commit bd46e4c

Browse files
authored
fix: Mitigate ingester race between Query & GetChunkIDs (#15178)
1 parent f2c2a22 commit bd46e4c

File tree

2 files changed

+42
-30
lines changed

2 files changed

+42
-30
lines changed

‎pkg/querier/ingester_querier.go

+31-24
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ import (
3535
util_log "github.com/grafana/loki/v3/pkg/util/log"
3636
)
3737

38+
var defaultQuorumConfig = ring.DoUntilQuorumConfig{
39+
// Nothing here
40+
}
41+
3842
type responseFromIngesters struct {
3943
addr string
4044
response interface{}
@@ -79,7 +83,8 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
7983
}
8084

8185
// forAllIngesters runs f, in parallel, for all ingesters
82-
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
86+
// waitForAllResponses param can be used to require results from all ingesters in the replication set. If this is set to false, the call will return as soon as we have a quorum by zone. Only valid for partition-ingesters.
87+
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
8388
if q.querierConfig.QueryPartitionIngesters {
8489
tenantID, err := user.ExtractOrgID(ctx)
8590
if err != nil {
@@ -94,36 +99,36 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Co
9499
if err != nil {
95100
return nil, err
96101
}
97-
return q.forGivenIngesterSets(ctx, replicationSets, f)
102+
return q.forGivenIngesterSets(ctx, waitForAllResponses, replicationSets, f)
98103
}
99104

100105
replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
101106
if err != nil {
102107
return nil, err
103108
}
104109

105-
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
110+
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig, f)
106111
}
107112

108113
// forGivenIngesterSets runs f, in parallel, for given ingester sets
109-
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
110-
// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
114+
// waitForAllResponses param can be used to require results from all ingesters in all replication sets. If this is set to false, the call will return as soon as we have a quorum by zone.
115+
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, waitForAllResponses bool, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
116+
// Enable minimize requests if we can, so we initially query a single ingester per replication set, as each replication-set is one partition.
111117
// Ingesters must supply zone information for this to have an effect.
112118
config := ring.DoUntilQuorumConfig{
113-
MinimizeRequests: true,
119+
MinimizeRequests: !waitForAllResponses,
114120
}
115121
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
122+
if waitForAllResponses {
123+
// Tell the ring we need to return all responses from all zones
124+
set.MaxErrors = 0
125+
set.MaxUnavailableZones = 0
126+
}
116127
return q.forGivenIngesters(ctx, set, config, f)
117128
})
118129
}
119130

120-
func defaultQuorumConfig() ring.DoUntilQuorumConfig {
121-
return ring.DoUntilQuorumConfig{
122-
// Nothing here
123-
}
124-
}
125-
126-
// forGivenIngesters runs f, in parallel, for given ingesters
131+
// forGivenIngesters runs f, in parallel, for given ingesters until a quorum of responses are received
127132
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
128133
results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
129134
client, err := q.pool.GetClientFor(ingester.Addr)
@@ -152,7 +157,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
152157
}
153158

154159
func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ([]iter.EntryIterator, error) {
155-
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
160+
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
156161
stats.FromContext(ctx).AddIngesterReached(1)
157162
return client.Query(ctx, params.QueryRequest)
158163
})
@@ -168,7 +173,7 @@ func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLog
168173
}
169174

170175
func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectSampleParams) ([]iter.SampleIterator, error) {
171-
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
176+
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
172177
stats.FromContext(ctx).AddIngesterReached(1)
173178
return client.QuerySample(ctx, params.SampleQueryRequest)
174179
})
@@ -184,7 +189,7 @@ func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectS
184189
}
185190

186191
func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest) ([][]string, error) {
187-
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
192+
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
188193
return client.Label(ctx, req)
189194
})
190195
if err != nil {
@@ -200,7 +205,7 @@ func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest)
200205
}
201206

202207
func (q *IngesterQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) {
203-
resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
208+
resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
204209
return client.Tail(ctx, req)
205210
})
206211
if err != nil {
@@ -249,7 +254,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
249254
}
250255

251256
// Instance a tail client for each ingester to re(connect)
252-
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
257+
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
253258
return client.Tail(ctx, req)
254259
})
255260
if err != nil {
@@ -265,7 +270,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
265270
}
266271

267272
func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) ([][]logproto.SeriesIdentifier, error) {
268-
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
273+
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
269274
return client.Series(ctx, req)
270275
})
271276
if err != nil {
@@ -297,7 +302,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
297302
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
298303
}
299304

300-
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
305+
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
301306
resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
302307
if err != nil {
303308
return nil, err
@@ -320,7 +325,9 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
320325
}
321326

322327
func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
323-
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
328+
// We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
329+
// This occurs if we call Query on an ingester after a recent flush, then call GetChunkIDs on a different, unflushed ingester in the same partition.
330+
resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
324331
return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
325332
Matchers: convertMatchersToString(matchers),
326333
Start: from.Time(),
@@ -340,7 +347,7 @@ func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.T
340347
}
341348

342349
func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) {
343-
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
350+
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
344351
return querierClient.GetStats(ctx, &logproto.IndexStatsRequest{
345352
From: from,
346353
Through: through,
@@ -371,7 +378,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
371378
matcherString = syntax.MatchersString(matchers)
372379
}
373380

374-
resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
381+
resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
375382
return querierClient.GetVolume(ctx, &logproto.VolumeRequest{
376383
From: from,
377384
Through: through,
@@ -400,7 +407,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
400407
}
401408

402409
func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
403-
ingesterResponses, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
410+
ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
404411
return client.GetDetectedLabels(ctx, req)
405412
})
406413

‎pkg/querier/ingester_querier_test.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,11 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
241241
}
242242

243243
tests := map[string]struct {
244-
method string
245-
testFn func(*IngesterQuerier) error
246-
retVal interface{}
247-
shards int
244+
method string
245+
testFn func(*IngesterQuerier) error
246+
retVal interface{}
247+
shards int
248+
expectAllResponses bool
248249
}{
249250
"label": {
250251
method: "Label",
@@ -268,7 +269,8 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
268269
_, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0))
269270
return err
270271
},
271-
retVal: new(logproto.GetChunkIDsResponse),
272+
retVal: new(logproto.GetChunkIDsResponse),
273+
expectAllResponses: true,
272274
},
273275
"select_logs": {
274276
method: "Query",
@@ -314,7 +316,7 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
314316
select {
315317
case <-ctx.Done():
316318
// should not be cancelled by the tracker
317-
require.NoError(t, ctx.Err())
319+
require.NoErrorf(t, ctx.Err(), "tracker should not cancel ctx: %v", context.Cause(ctx))
318320
default:
319321
cnt.Add(1)
320322
}
@@ -340,6 +342,9 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
340342
testData.shards = partitions
341343
}
342344
expectedCalls := min(testData.shards, partitions)
345+
if testData.expectAllResponses {
346+
expectedCalls = expectedCalls * ingestersPerPartition
347+
}
343348
// Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones.
344349
// If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions.
345350
require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond")

0 commit comments

Comments
 (0)