
Commit 70d9587

feat: Use context propagation to call the same ingester in GetChunksID as Query (#15186)
Co-authored-by: Ben Clive <ben.clive@grafana.com>
1 parent 532bdbc commit 70d9587
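
In short, the first ingester query records which ingesters it reached in a PartitionContext stored on the request context, and GetChunkIDs later reuses exactly that set. Below is a minimal sketch of the intended call flow, assuming it sits inside the querier package and that QueryPartitionIngesters is enabled; the actual wiring of NewPartitionContext into the query path lives in the other changed files, which are not shown in this excerpt, and queryThenFetchChunkIDs is a hypothetical helper, not code from the commit.

func queryThenFetchChunkIDs(ctx context.Context, q *IngesterQuerier, params logql.SelectLogParams, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
    // Attach a PartitionContext so forAllIngesters can record which ingesters it queries.
    ctx = NewPartitionContext(ctx)

    // First call: with QueryPartitionIngesters enabled, forAllIngesters marks the context
    // as partitioned and records the client and address of every ingester that answered.
    if _, err := q.SelectLogs(ctx, params); err != nil {
        return nil, err
    }

    // Second call on the same ctx: GetChunkIDs sees IsPartitioned() and fans out only to
    // the recorded ingesters (via forQueriedIngesters) instead of re-resolving the ring.
    return q.GetChunkIDs(ctx, from, through, matchers...)
}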

File tree: 4 files changed (+267, -42 lines)


‎pkg/querier/ingester_querier.go

+108 -29
@@ -5,6 +5,7 @@ import (
    "net/http"
    "slices"
    "strings"
+   "sync"
    "time"

    "github.com/go-kit/log"
@@ -82,10 +83,91 @@ func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring
    return &iq, nil
}

+type ctxKeyType string
+
+const (
+   partitionCtxKey ctxKeyType = "partitionCtx"
+)
+
+type PartitionContext struct {
+   isPartitioned bool
+   ingestersUsed map[string]PartitionIngesterUsed
+   mtx           sync.Mutex
+}
+
+type PartitionIngesterUsed struct {
+   client logproto.QuerierClient
+   addr   string
+}
+
+func (p *PartitionContext) AddClient(client logproto.QuerierClient, addr string) {
+   p.mtx.Lock()
+   defer p.mtx.Unlock()
+   if !p.isPartitioned {
+       return
+   }
+   p.ingestersUsed[addr] = PartitionIngesterUsed{client: client, addr: addr}
+}
+
+func (p *PartitionContext) RemoveClient(addr string) {
+   p.mtx.Lock()
+   defer p.mtx.Unlock()
+   if !p.isPartitioned {
+       return
+   }
+   delete(p.ingestersUsed, addr)
+}
+
+func (p *PartitionContext) SetIsPartitioned(isPartitioned bool) {
+   p.mtx.Lock()
+   defer p.mtx.Unlock()
+   p.isPartitioned = isPartitioned
+}
+
+func (p *PartitionContext) IsPartitioned() bool {
+   return p.isPartitioned
+}
+
+func (p *PartitionContext) forQueriedIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+   p.mtx.Lock()
+   defer p.mtx.Unlock()
+
+   ingestersUsed := make([]PartitionIngesterUsed, 0, len(p.ingestersUsed))
+   for _, ingester := range p.ingestersUsed {
+       ingestersUsed = append(ingestersUsed, ingester)
+   }
+
+   return concurrency.ForEachJobMergeResults(ctx, ingestersUsed, 0, func(ctx context.Context, job PartitionIngesterUsed) ([]responseFromIngesters, error) {
+       resp, err := f(ctx, job.client)
+       if err != nil {
+           return nil, err
+       }
+       return []responseFromIngesters{{addr: job.addr, response: resp}}, nil
+   })
+}
+
+// NewPartitionContext creates a new partition context
+// This is used to track which ingesters were used in the query and reuse the same ingesters for consecutive queries
+func NewPartitionContext(ctx context.Context) context.Context {
+   return context.WithValue(ctx, partitionCtxKey, &PartitionContext{
+       ingestersUsed: make(map[string]PartitionIngesterUsed),
+   })
+}
+
+func ExtractPartitionContext(ctx context.Context) *PartitionContext {
+   v, ok := ctx.Value(partitionCtxKey).(*PartitionContext)
+   if !ok {
+       return &PartitionContext{
+           ingestersUsed: make(map[string]PartitionIngesterUsed),
+       }
+   }
+   return v
+}
+
// forAllIngesters runs f, in parallel, for all ingesters
-// waitForAllResponses param can be used to require results from all ingesters in the replication set. If this is set to false, the call will return as soon as we have a quorum by zone. Only valid for partition-ingesters.
-func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllResponses bool, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
    if q.querierConfig.QueryPartitionIngesters {
+       ExtractPartitionContext(ctx).SetIsPartitioned(true)
        tenantID, err := user.ExtractOrgID(ctx)
        if err != nil {
            return nil, err
@@ -99,7 +181,7 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
        if err != nil {
            return nil, err
        }
-       return q.forGivenIngesterSets(ctx, waitForAllResponses, replicationSets, f)
+       return q.forGivenIngesterSets(ctx, replicationSets, f)
    }

    replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
@@ -111,19 +193,13 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, waitForAllRespons
}

// forGivenIngesterSets runs f, in parallel, for given ingester sets
-// waitForAllResponses param can be used to require results from all ingesters in all replication sets. If this is set to false, the call will return as soon as we have a quorum by zone.
-func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, waitForAllResponses bool, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
+func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
    // Enable minimize requests if we can, so we initially query a single ingester per replication set, as each replication-set is one partition.
    // Ingesters must supply zone information for this to have an effect.
    config := ring.DoUntilQuorumConfig{
-       MinimizeRequests: !waitForAllResponses,
+       MinimizeRequests: true,
    }
    return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
-       if waitForAllResponses {
-           // Tell the ring we need to return all responses from all zones
-           set.MaxErrors = 0
-           set.MaxUnavailableZones = 0
-       }
        return q.forGivenIngesters(ctx, set, config, f)
    })
}
@@ -135,17 +211,16 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
        if err != nil {
            return responseFromIngesters{addr: ingester.Addr}, err
        }
-
        resp, err := f(ctx, client.(logproto.QuerierClient))
        if err != nil {
            return responseFromIngesters{addr: ingester.Addr}, err
        }

+       ExtractPartitionContext(ctx).AddClient(client.(logproto.QuerierClient), ingester.Addr)
        return responseFromIngesters{ingester.Addr, resp}, nil
-   }, func(responseFromIngesters) {
-       // Nothing to do
+   }, func(cleanup responseFromIngesters) {
+       ExtractPartitionContext(ctx).RemoveClient(cleanup.addr)
    })
-
    if err != nil {
        return nil, err
    }
@@ -157,7 +232,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet
}

func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) ([]iter.EntryIterator, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
        stats.FromContext(ctx).AddIngesterReached(1)
        return client.Query(ctx, params.QueryRequest)
    })
@@ -173,7 +248,7 @@ func (q *IngesterQuerier) SelectLogs(ctx context.Context, params logql.SelectLog
}

func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectSampleParams) ([]iter.SampleIterator, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
        stats.FromContext(ctx).AddIngesterReached(1)
        return client.QuerySample(ctx, params.SampleQueryRequest)
    })
@@ -189,7 +264,7 @@ func (q *IngesterQuerier) SelectSample(ctx context.Context, params logql.SelectS
}

func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest) ([][]string, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
        return client.Label(ctx, req)
    })
    if err != nil {
@@ -205,7 +280,7 @@ func (q *IngesterQuerier) Label(ctx context.Context, req *logproto.LabelRequest)
}

func (q *IngesterQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (map[string]logproto.Querier_TailClient, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
        return client.Tail(ctx, req)
    })
    if err != nil {
@@ -270,7 +345,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

func (q *IngesterQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) ([][]logproto.SeriesIdentifier, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
        return client.Series(ctx, req)
    })
    if err != nil {
@@ -325,15 +400,22 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
}

func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
-   // We must wait for all responses when using partition-ingesters to avoid a race between Query and GetChunkIDs calls.
-   // This occurs if call Query on an ingester after a recent flush then call GetChunkIDs on a different, unflushed ingester in the same partition.
-   resps, err := q.forAllIngesters(ctx, q.querierConfig.QueryPartitionIngesters, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+   ingesterQueryFn := q.forAllIngesters
+
+   partitionCtx := ExtractPartitionContext(ctx)
+   if partitionCtx.IsPartitioned() {
+       // We need to query the same ingesters as the previous query
+       ingesterQueryFn = partitionCtx.forQueriedIngesters
+   }
+
+   resps, err := ingesterQueryFn(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
        return querierClient.GetChunkIDs(ctx, &logproto.GetChunkIDsRequest{
            Matchers: convertMatchersToString(matchers),
            Start:    from.Time(),
            End:      through.Time(),
        })
    })
+
    if err != nil {
        return nil, err
    }
@@ -347,14 +429,13 @@ func (q *IngesterQuerier) GetChunkIDs(ctx context.Context, from, through model.T
}

func (q *IngesterQuerier) Stats(ctx context.Context, _ string, from, through model.Time, matchers ...*labels.Matcher) (*index_stats.Stats, error) {
-   resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
        return querierClient.GetStats(ctx, &logproto.IndexStatsRequest{
            From:     from,
            Through:  through,
            Matchers: syntax.MatchersString(matchers),
        })
    })
-
    if err != nil {
        if isUnimplementedCallError(err) {
            // Handle communication with older ingesters gracefully
@@ -378,7 +459,7 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
        matcherString = syntax.MatchersString(matchers)
    }

-   resps, err := q.forAllIngesters(ctx, false, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
+   resps, err := q.forAllIngesters(ctx, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
        return querierClient.GetVolume(ctx, &logproto.VolumeRequest{
            From:     from,
            Through:  through,
@@ -388,7 +469,6 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
            AggregateBy: aggregateBy,
        })
    })
-
    if err != nil {
        if isUnimplementedCallError(err) {
            // Handle communication with older ingesters gracefully
@@ -407,10 +487,9 @@ func (q *IngesterQuerier) Volume(ctx context.Context, _ string, from, through mo
}

func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) {
-   ingesterResponses, err := q.forAllIngesters(ctx, false, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
+   ingesterResponses, err := q.forAllIngesters(ctx, func(ctx context.Context, client logproto.QuerierClient) (interface{}, error) {
        return client.GetDetectedLabels(ctx, req)
    })
-
    if err != nil {
        level.Error(q.logger).Log("msg", "error getting detected labels", "err", err)
        return nil, err
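
The earlier waitForAllResponses workaround (removed above) existed to avoid a race where Query and GetChunkIDs hit different ingesters in the same partition; with this change GetChunkIDs simply reuses the ingesters recorded by the previous query. The bookkeeping is also opt-in: nothing is tracked until SetIsPartitioned(true) is called, and extracting from a context without a PartitionContext yields a usable, non-partitioned value, so non-partitioned deployments behave as before. A small illustrative sketch, written as if it were a testify-style test in the same package (the test name, the nil client, and the ingester address are made up for illustration, not part of the commit):

func TestPartitionContextBookkeeping(t *testing.T) {
    // Without NewPartitionContext, extraction still returns a non-partitioned value, so
    // callers such as GetChunkIDs simply fall back to querying via the ring.
    plain := ExtractPartitionContext(context.Background())
    require.False(t, plain.IsPartitioned())

    ctx := NewPartitionContext(context.Background())
    pc := ExtractPartitionContext(ctx)

    // AddClient is a no-op until the context is marked as partitioned.
    pc.AddClient(nil, "ingester-zone-a-0:9095")
    require.Empty(t, pc.ingestersUsed)

    // forAllIngesters flips this flag when QueryPartitionIngesters is enabled; from then on
    // queried ingesters are tracked, and the cleanup callback removes them again.
    pc.SetIsPartitioned(true)
    pc.AddClient(nil, "ingester-zone-a-0:9095")
    require.Len(t, pc.ingestersUsed, 1)
    pc.RemoveClient("ingester-zone-a-0:9095")
    require.Empty(t, pc.ingestersUsed)
}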
