Skip to content

Commit 6d691ac

Browse files
authored
fix(approx_topk): Map approx_topk operation in all cases (#16131)
1 parent 8bd6400 commit 6d691ac

File tree

2 files changed

+188
-61
lines changed

2 files changed

+188
-61
lines changed

‎pkg/logql/shardmapper.go

Lines changed: 74 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -293,66 +293,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
293293
return nil, 0, fmt.Errorf("approx_topk is not enabled. See -limits.shard_aggregations")
294294
}
295295

296-
// TODO(owen-d): integrate bounded sharding with approx_topk
297-
// I'm not doing this now because it uses a separate code path and may not handle
298-
// bounded shards in the same way
299-
shards, bytesPerShard, err := m.shards.Resolver().Shards(expr)
300-
if err != nil {
301-
return nil, 0, err
302-
}
303-
304-
// approx_topk(k, inner) ->
305-
// topk(
306-
// k,
307-
// eval_cms(
308-
// __count_min_sketch__(inner, shard=1) ++ __count_min_sketch__(inner, shard=2)...
309-
// )
310-
// )
311-
312-
countMinSketchExpr := syntax.MustClone(expr)
313-
countMinSketchExpr.Operation = syntax.OpTypeCountMinSketch
314-
countMinSketchExpr.Params = 0
315-
316-
// Even if this query is not sharded the user wants an approximation. This is helpful if some
317-
// inferred label has a very high cardinality. Note that the querier does not support CountMinSketchEvalExpr
318-
// which is why it's evaluated on the front end.
319-
if shards == 0 {
320-
return &syntax.VectorAggregationExpr{
321-
Left: &CountMinSketchEvalExpr{
322-
downstreams: []DownstreamSampleExpr{{
323-
SampleExpr: countMinSketchExpr,
324-
}},
325-
},
326-
Grouping: expr.Grouping,
327-
Operation: syntax.OpTypeTopK,
328-
Params: expr.Params,
329-
}, bytesPerShard, nil
330-
}
331-
332-
downstreams := make([]DownstreamSampleExpr, 0, shards)
333-
for shard := 0; shard < shards; shard++ {
334-
s := NewPowerOfTwoShard(index.ShardAnnotation{
335-
Shard: uint32(shard),
336-
Of: uint32(shards),
337-
})
338-
downstreams = append(downstreams, DownstreamSampleExpr{
339-
shard: &ShardWithChunkRefs{
340-
Shard: s,
341-
},
342-
SampleExpr: countMinSketchExpr,
343-
})
344-
}
345-
346-
sharded := &CountMinSketchEvalExpr{
347-
downstreams: downstreams,
348-
}
349-
350-
return &syntax.VectorAggregationExpr{
351-
Left: sharded,
352-
Grouping: expr.Grouping,
353-
Operation: syntax.OpTypeTopK,
354-
Params: expr.Params,
355-
}, bytesPerShard, nil
296+
return m.mapApproxTopk(expr, false)
356297
default:
357298
// this should not be reachable. If an operation is shardable it should
358299
// have an optimization listed. Nonetheless, we log this as a warning
@@ -367,6 +308,16 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
367308
}
368309
return expr, exprStats.Bytes, nil
369310
}
311+
} else {
312+
// if this AST contains unshardable operations, we still need to rewrite some operations (e.g. approx_topk) as if it had 0 shards as they are not supported on the querier
313+
switch expr.Operation {
314+
case syntax.OpTypeApproxTopK:
315+
level.Error(util_log.Logger).Log(
316+
"msg", "encountered unshardable approx_topk operation",
317+
"operation", expr.Operation,
318+
)
319+
return m.mapApproxTopk(expr, true)
320+
}
370321
}
371322

372323
// if this AST contains unshardable operations, don't shard this at this level,
@@ -388,6 +339,69 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
388339
}, bytesPerShard, nil
389340
}
390341

342+
func (m ShardMapper) mapApproxTopk(expr *syntax.VectorAggregationExpr, forceNoShard bool) (*syntax.VectorAggregationExpr, uint64, error) {
343+
// TODO(owen-d): integrate bounded sharding with approx_topk
344+
// I'm not doing this now because it uses a separate code path and may not handle
345+
// bounded shards in the same way
346+
shards, bytesPerShard, err := m.shards.Resolver().Shards(expr)
347+
if err != nil {
348+
return nil, 0, err
349+
}
350+
351+
// approx_topk(k, inner) ->
352+
// topk(
353+
// k,
354+
// eval_cms(
355+
// __count_min_sketch__(inner, shard=1) ++ __count_min_sketch__(inner, shard=2)...
356+
// )
357+
// )
358+
359+
countMinSketchExpr := syntax.MustClone(expr)
360+
countMinSketchExpr.Operation = syntax.OpTypeCountMinSketch
361+
countMinSketchExpr.Params = 0
362+
363+
// Even if this query is not sharded the user wants an approximation. This is helpful if some
364+
// inferred label has a very high cardinality. Note that the querier does not support CountMinSketchEvalExpr
365+
// which is why it's evaluated on the front end.
366+
if shards == 0 || forceNoShard {
367+
return &syntax.VectorAggregationExpr{
368+
Left: &CountMinSketchEvalExpr{
369+
downstreams: []DownstreamSampleExpr{{
370+
SampleExpr: countMinSketchExpr,
371+
}},
372+
},
373+
Grouping: expr.Grouping,
374+
Operation: syntax.OpTypeTopK,
375+
Params: expr.Params,
376+
}, bytesPerShard, nil
377+
}
378+
379+
downstreams := make([]DownstreamSampleExpr, 0, shards)
380+
for shard := 0; shard < shards; shard++ {
381+
s := NewPowerOfTwoShard(index.ShardAnnotation{
382+
Shard: uint32(shard),
383+
Of: uint32(shards),
384+
})
385+
downstreams = append(downstreams, DownstreamSampleExpr{
386+
shard: &ShardWithChunkRefs{
387+
Shard: s,
388+
},
389+
SampleExpr: countMinSketchExpr,
390+
})
391+
}
392+
393+
sharded := &CountMinSketchEvalExpr{
394+
downstreams: downstreams,
395+
}
396+
397+
return &syntax.VectorAggregationExpr{
398+
Left: sharded,
399+
Grouping: expr.Grouping,
400+
Operation: syntax.OpTypeTopK,
401+
Params: expr.Params,
402+
}, bytesPerShard, nil
403+
}
404+
391405
func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder, topLevel bool) (syntax.SampleExpr, uint64, error) {
392406
subMapped, bytesPerShard, err := m.Map(expr.Left, r, topLevel)
393407
if err != nil {

‎pkg/logql/shardmapper_test.go

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func TestMappingStrings(t *testing.T) {
245245
sum by (foo_extracted) (
246246
downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
247247
)
248-
/
248+
/
249249
sum by (foo_extracted) (
250250
downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
251251
)
@@ -652,6 +652,52 @@ func TestMapping(t *testing.T) {
652652
},
653653
},
654654
},
655+
{
656+
in: `rate({foo="bar"}[5m]) > 3`,
657+
expr: &syntax.BinOpExpr{
658+
Op: syntax.OpTypeGT,
659+
SampleExpr: &ConcatSampleExpr{
660+
DownstreamSampleExpr: DownstreamSampleExpr{
661+
shard: NewPowerOfTwoShard(index.ShardAnnotation{
662+
Shard: 0,
663+
Of: 2,
664+
}).Bind(nil),
665+
SampleExpr: &syntax.RangeAggregationExpr{
666+
Operation: syntax.OpRangeTypeRate,
667+
Left: &syntax.LogRangeExpr{
668+
Left: &syntax.MatchersExpr{
669+
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
670+
},
671+
Interval: 5 * time.Minute,
672+
},
673+
},
674+
},
675+
next: &ConcatSampleExpr{
676+
DownstreamSampleExpr: DownstreamSampleExpr{
677+
shard: NewPowerOfTwoShard(index.ShardAnnotation{
678+
Shard: 1,
679+
Of: 2,
680+
}).Bind(nil),
681+
SampleExpr: &syntax.RangeAggregationExpr{
682+
Operation: syntax.OpRangeTypeRate,
683+
Left: &syntax.LogRangeExpr{
684+
Left: &syntax.MatchersExpr{
685+
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
686+
},
687+
Interval: 5 * time.Minute,
688+
},
689+
},
690+
},
691+
next: nil,
692+
},
693+
},
694+
RHS: &syntax.LiteralExpr{Val: 3},
695+
Opts: &syntax.BinOpOptions{
696+
ReturnBool: false,
697+
VectorMatching: &syntax.VectorMatching{},
698+
},
699+
},
700+
},
655701
{
656702
in: `count_over_time({foo="bar"}[5m])`,
657703
expr: &ConcatSampleExpr{
@@ -833,6 +879,73 @@ func TestMapping(t *testing.T) {
833879
},
834880
},
835881
},
882+
{
883+
in: `approx_topk(3, rate({foo="bar"}[5m]) > 3)`,
884+
expr: &syntax.VectorAggregationExpr{
885+
Grouping: &syntax.Grouping{},
886+
Params: 3,
887+
Operation: syntax.OpTypeTopK,
888+
Left: &CountMinSketchEvalExpr{
889+
downstreams: []DownstreamSampleExpr{
890+
{
891+
shard: nil,
892+
SampleExpr: &syntax.VectorAggregationExpr{
893+
Operation: syntax.OpTypeCountMinSketch,
894+
Left: &syntax.BinOpExpr{
895+
Op: syntax.OpTypeGT,
896+
SampleExpr: &syntax.RangeAggregationExpr{
897+
Operation: syntax.OpRangeTypeRate,
898+
Left: &syntax.LogRangeExpr{
899+
Left: &syntax.MatchersExpr{
900+
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
901+
},
902+
Interval: 5 * time.Minute,
903+
},
904+
},
905+
RHS: &syntax.LiteralExpr{Val: 3},
906+
Opts: &syntax.BinOpOptions{
907+
ReturnBool: false,
908+
VectorMatching: &syntax.VectorMatching{},
909+
},
910+
},
911+
Grouping: &syntax.Grouping{},
912+
},
913+
},
914+
},
915+
},
916+
},
917+
},
918+
{
919+
// A contrived query that is not shardable: but we must rewrite `approx_topk` as if it had 0 shards because the approx_topk operation is not supported on the querier
920+
in: `approx_topk(3, topk(5, rate({foo="bar"}[5m])))`,
921+
expr: &syntax.VectorAggregationExpr{
922+
Left: &CountMinSketchEvalExpr{
923+
downstreams: []DownstreamSampleExpr{{
924+
SampleExpr: &syntax.VectorAggregationExpr{
925+
Operation: syntax.OpTypeCountMinSketch,
926+
Left: &syntax.VectorAggregationExpr{Operation: syntax.OpTypeTopK,
927+
Params: 5,
928+
Left: &syntax.RangeAggregationExpr{
929+
Operation: syntax.OpRangeTypeRate,
930+
Left: &syntax.LogRangeExpr{
931+
Left: &syntax.MatchersExpr{
932+
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
933+
},
934+
Interval: 5 * time.Minute,
935+
},
936+
},
937+
Grouping: &syntax.Grouping{},
938+
},
939+
Grouping: &syntax.Grouping{},
940+
},
941+
shard: nil,
942+
}},
943+
},
944+
Grouping: &syntax.Grouping{},
945+
Operation: syntax.OpTypeTopK,
946+
Params: 3,
947+
},
948+
},
836949
{
837950
in: `count(rate({foo="bar"}[5m]))`,
838951
expr: &syntax.VectorAggregationExpr{

0 commit comments

Comments
 (0)