ESQL: Support filters on inlinestats#132934
Conversation
…filters_on_inlinestats
…filters_on_inlinestats
…filters_on_inlinestats
35b85ac to
d852cda
Compare
…filters_on_inlinestats
…filters_on_inlinestats
…filters_on_inlinestats
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
|
Hi @astefan, I've created a changelog YAML for you. |
| 10003 |Parto |null |0 |null |M |1 |4 | ||
| ; | ||
|
|
||
| prunedInlinestatsFollowedByInlinestats_GroupByOneFieldOnSecondInlinestats-Ignore |
There was a problem hiding this comment.
This will be as a follow up investigation. I did try to fix it several times, ran out of good ideas.
| null |[0, 0, 0, 0] |[Berni, Chirstian, Amabile, Berni, Bojan, Chirstian, Anneke, Chirstian]|[Head Human Resources, Reporting Analyst, Support Engineer, Tech Lead]|10004 | ||
| ; | ||
|
|
||
| prunedInlinestatsFollowedByinlinestats-Ignore |
There was a problem hiding this comment.
This will be as a follow up investigation. I did try to fix it several times, ran out of good ideas. This is in the same ballpark failure set as the other one.
| emp_no:integer | ||
| 5 | ||
| ; | ||
| /////////////////////////// |
There was a problem hiding this comment.
Almost all the tests below have been ported from the same set of stats tests with filters and adapted to inlinestats.
| protected LogicalPlan rule(LogicalPlan plan) { | ||
| Aggregate aggregate; | ||
| Holder<InlineJoin> ij = new Holder<>(); | ||
| if (plan instanceof Aggregate a) { |
There was a problem hiding this comment.
This code block, in a nutshell, finds out if the Aggregate comes from inlinejoin or is a standalone one.
| return plan; // not an Aggregate or InlineJoin, nothing to do | ||
| } | ||
|
|
||
| if (aggregate != null) { |
There was a problem hiding this comment.
The code here is the same as before, but now wrapped in this condition related to aggregate being null or not.
| plan = new Project(aggregate.source(), plan, newProjections); | ||
| if (newEvals.isEmpty() == false) { | ||
| if (newAggs.isEmpty()) { // the Aggregate node is pruned | ||
| if (ij.get() != null) { // this is an Aggregate part of right-hand side of an InlineJoin |
There was a problem hiding this comment.
The meat of the PR. Basically, a new "branch" in dealing with pruning Aggs in InlineJoin specifically.
| // project the correct output (the one of the former inlinejoin) and remove the InlineJoin altogether, | ||
| // replacing it with its right-hand side followed by its left-hand side | ||
| plan = new Project(newIJ.source(), newIJ.right(), newIJ.output()); | ||
| } else { // this is a standalone Aggregate |
There was a problem hiding this comment.
Same code as before.
| return newPlan; | ||
| }); | ||
| } else { // this is a standalone Aggregate | ||
| plan = aggregate.with(aggregate.child(), aggregate.groupings(), newAggs); |
There was a problem hiding this comment.
Same code as before.
| plan = localRelation(aggregate.source(), newEvals); | ||
| } | ||
| } else { | ||
| if (ij.get() != null) { // this is an Aggregate part of right-hand side of an InlineJoin |
There was a problem hiding this comment.
The meat of the PR (specific handling for InlineJoin).
| assertThat(Expressions.names(agg.aggregates()), contains("sum(salary)", "sum(salary) WheRe last_name == \"Doe\"")); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Moved these to a new class, specific to ReplaceStatsFilteredAggWithEval rule.
| List<NamedExpression> newProjections = new ArrayList<>(oldAggSize); | ||
| protected LogicalPlan rule(LogicalPlan plan) { | ||
| Aggregate aggregate; | ||
| Holder<InlineJoin> ij = new Holder<>(); |
There was a problem hiding this comment.
I think you can use a simple variable here, you don't need a holder. IIUC, there's a single place where the var would be used in a lambda, you may assign it locally in the respective branch (or right after this detection block) to make it effectively final.
| Holder<InlineJoin> ij = new Holder<>(); | ||
| if (plan instanceof Aggregate a) { | ||
| aggregate = a; | ||
| } else if (plan instanceof InlineJoin) { |
There was a problem hiding this comment.
| } else if (plan instanceof InlineJoin) { | |
| } else if (plan instanceof InlineJoin inlineJoin) { |
to no longer need to cast.
| ij.get().right().forEachDown(p -> { | ||
| if (aggHolder.get() == null && p instanceof Aggregate a) { | ||
| aggHolder.set(a); | ||
| } | ||
| }); |
There was a problem hiding this comment.
| ij.get().right().forEachDown(p -> { | |
| if (aggHolder.get() == null && p instanceof Aggregate a) { | |
| aggHolder.set(a); | |
| } | |
| }); | |
| inlineJoin.right().forEachDown(Aggregate.class, aggHolder::setIfAbsent); |
| if (newAggs.isEmpty()) { // the Aggregate node is pruned | ||
| if (ij.get() != null) { // this is an Aggregate part of right-hand side of an InlineJoin | ||
| // replace the right hand side Aggregate with an Eval | ||
| InlineJoin newIJ = new InlineJoin(ij.get().source(), ij.get().left(), ij.get().right().transformDown(p -> { |
There was a problem hiding this comment.
| InlineJoin newIJ = new InlineJoin(ij.get().source(), ij.get().left(), ij.get().right().transformDown(p -> { | |
| InlineJoin newIJ = inlineJoin.replaceRight(inlineJoin.right().transformDown(p -> { |
with inlineJoin the effectively final var.
| } else { | ||
| if (ij.get() != null) { // this is an Aggregate part of right-hand side of an InlineJoin | ||
| // only update the Aggregate and add an Eval for the removed aggregations | ||
| plan = ij.get().transformUp(Aggregate.class, agg -> { |
There was a problem hiding this comment.
Minor: I think this'll walk the entire (sub)tree of InlineJoin, while an update is really only possible on the right hand-side. Would it be worth applying the change there only?
| | inlinestats max = max(salary), max_a = max(salary) where null, | ||
| min = min(salary), min_a = min(salary) where to_string(null) == "abc" |
There was a problem hiding this comment.
Aesthetic:
| | inlinestats max = max(salary), max_a = max(salary) where null, | |
| min = min(salary), min_a = min(salary) where to_string(null) == "abc" | |
| | inlinestats max = max(salary), | |
| max_a = max(salary) where null, | |
| min = min(salary), | |
| min_a = min(salary) where to_string(null) == "abc" |
| as(limit.child(), EsRelation.class); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
| /** | |
| /* |
to prevent having the block be interpreted as javadoc. Here and above, where there's no @code wrapping (which isn't ideal, as it's wrapped in IntelliJ; but that was just copied).
There was a problem hiding this comment.
It's great to start new files (started working on splitting the mighty x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java, but it'd be good to allocate a packet for it, since there'll be many "logical plan optimiser tests" (vs local ~ etc.).
But anyways, this can be added as is and I'll incorporate this later in that work.
There was a problem hiding this comment.
Note though that this approach (separate package only for `local) is the same in the non-test package.
| }), ij.get().config()); | ||
| // project the correct output (the one of the former inlinejoin) and remove the InlineJoin altogether, | ||
| // replacing it with its right-hand side followed by its left-hand side | ||
| plan = new Project(newIJ.source(), newIJ.right(), newIJ.output()); |
There was a problem hiding this comment.
This Project is what's basically returned. Can't the entire InlineJoin transformation be skipped and only transform with the right hand-side of the "old" InlineJoin?
| } else if (p instanceof StubRelation) { | ||
| // Remove the StubRelation since the right-hand side of the join is now part of the main plan | ||
| // and it won't be executed separately by the EsqlSession inlinestats planning. | ||
| p = ij.get().left(); |
There was a problem hiding this comment.
Not quite positive, but this might be the issue for a stack overflow with:
from employees
| inlinestats count = count(*) where false
| inlinestats cc = count_distinct(emp_no) where false
There was a problem hiding this comment.
Nice catch @bpintea.
This raises an interesting discussion point: the fact that the right-hand side of a InlineJoin must contain a single StubRelation and that anything that has to do with StubRelations has to happen only once per InlineJoin right-hand side. I've made some changes to InlineJoin because of this.
…filters_on_inlinestats
…filters_on_inlinestats
…filters_on_inlinestats
| doneReplacing.set(true); | ||
| return up.replaceChild(stubReplacement); | ||
| } | ||
| assert false : "Expected to replace a single StubRelation in the plan, but found more than one"; |
There was a problem hiding this comment.
Better throw here:
- this will bring down the whole node in CI (and fail many other unrelated tests).
- it will do nothing in production: better fail the query.
| @@ -80,17 +80,26 @@ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) { | |||
| * NOTE: this will replace all {@link StubRelation}s found with the source and the method is meant to be used to replace one node only | |||
There was a problem hiding this comment.
| * NOTE: this will replace all {@link StubRelation}s found with the source and the method is meant to be used to replace one node only | |
| * NOTE: this will replace the first {@link StubRelation}s found with the source and the method is meant to be used to replace one node only |
| return stubbed.transformUp(UnaryPlan.class, up -> { | ||
| Holder<Boolean> doneReplacing = new Holder<>(false); | ||
| var result = stubbedPlan.transformUp(UnaryPlan.class, up -> { | ||
| if (up.child() instanceof StubRelation) { |
There was a problem hiding this comment.
This is not a change strictly related to the PR, but this assumes there will never be an InlineJoin having a right child be a StubRelation. I'm not sure if this can occur now, but ideally we wouldn't rely on it, an intermediary state of the optimisations could theoretically produce it.
I.e. it's not an invariant.
| return up; | ||
| }); | ||
|
|
||
| assert doneReplacing.get() : "Expected to replace a single StubRelation in the plan, but none found"; |
There was a problem hiding this comment.
Maybe same here, we could throw, instead of an assert?
| var newRight = ij.right().transformDown(Aggregate.class, agg -> { | ||
| LogicalPlan p = agg; | ||
| if (agg == aggregate) { | ||
| // the aggregate becomes a simple Eval since it's not needed anymore (it was replaced with Literals) | ||
| p = new Eval(aggregate.source(), aggregate.child(), newEvals); | ||
| } | ||
| return p; | ||
| }); |
There was a problem hiding this comment.
Optional, but would be symmetrical to the other branch:
| var newRight = ij.right().transformDown(Aggregate.class, agg -> { | |
| LogicalPlan p = agg; | |
| if (agg == aggregate) { | |
| // the aggregate becomes a simple Eval since it's not needed anymore (it was replaced with Literals) | |
| p = new Eval(aggregate.source(), aggregate.child(), newEvals); | |
| } | |
| return p; | |
| }); | |
| // the aggregate becomes a simple Eval since it's not needed anymore (it was replaced with Literals) | |
| var newRight = ij.right() | |
| .transformDown( | |
| Aggregate.class, | |
| agg -> agg == aggregate ? new Eval(aggregate.source(), aggregate.child(), newEvals) : agg | |
| ); |
| 10010 |null |4 |6.949207097931448 |7.127229475750027|27921.220736207077|10 | ||
| ; | ||
|
|
||
| twoConsecutiveInlinestatsWithFalseFiltes |
There was a problem hiding this comment.
I'd maybe add this part of the unit tests (ReplaceStatsFilteredAggWithEvalTests). It's interesting, as it produces this plan:
EsqlProject[[emp_no{f}#9, count{r}#5, cc{r}#8]]
\_Eval[[0[LONG] AS count#5, 0[LONG] AS cc#8]]
\_TopN[[Order[emp_no{f}#9,ASC,LAST]],3[INTEGER]]
\_Limit[3[INTEGER],false]
\_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
The TopN-Limit is probably avoidable, though it's not produced part of this change, it's the way we handle limit | sort | limit. But up to you.
…filters_on_inlinestats
Addresses #124741