Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/135011.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135011
summary: Handle right hand side of Inline Stats coming optimized with `LocalRelation`
shortcut
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -1808,9 +1808,9 @@ from employees
10003 |Parto |null |0 |null |M |1 |4
;

prunedInlinestatsFollowedByInlinestats_GroupByOneFieldOnSecondInlinestats-Ignore
// values doesn't end up as null
prunedInlinestatsFollowedByInlinestats_GroupByOneFieldOnSecondInlinestats
required_capability: inline_stats
required_capability: inline_stats_fix_pruning_null_filter
from employees
| eval my_length = length(concat(first_name, null))
| inline stats count = count(my_length) where false,
Expand Down Expand Up @@ -1988,9 +1988,9 @@ null |0 |[Gino, Heping]
null |[0, 0, 0, 0] |[Berni, Chirstian, Amabile, Berni, Bojan, Chirstian, Anneke, Chirstian]|[Head Human Resources, Reporting Analyst, Support Engineer, Tech Lead]|10004
;

prunedInlinestatsFollowedByinlinestats-Ignore
// values doesn't end up as null
prunedInlinestatsFollowedByInlinestats
required_capability: inline_stats
required_capability: inline_stats_fix_pruning_null_filter
from employees
| eval my_length = length(concat(first_name, null))
| inline stats count = count(my_length) where false,
Expand Down Expand Up @@ -3877,3 +3877,39 @@ city:keyword|c:long
Raleigh |1
;

inlinestatsOptimizedAsLocalRelation_before_EsqlSession1
required_capability: inline_stats_fix_optimized_as_local_relation

from employees
| keep first_name, emp_no
| eval my_length = length(concat(first_name, null))
| inline stats count = count(my_length) where my_length > 0
| sort first_name
| limit 5
;

first_name:s | emp_no:i | my_length:i | count:l
Alejandro |10059 |null |0
Amabile |10091 |null |0
Anneke |10006 |null |0
Anoosh |10062 |null |0
Arumugam |10094 |null |0
;

inlinestatsOptimizedAsLocalRelation_before_EsqlSession2
required_capability: inline_stats_fix_optimized_as_local_relation

from employees
| eval my_length = concat(first_name, null)
| inline stats count = count(my_length) where my_length is not null
| keep emp_no, count, my_length
| sort emp_no
| limit 3
;

emp_no:i | count:l | my_length:s
10001 |0 |null
10002 |0 |null
10003 |0 |null
;

Original file line number Diff line number Diff line change
Expand Up @@ -1543,9 +1543,15 @@ public enum Cap {
*/
TS_COMMAND_V0(),

FIX_ALIAS_ID_WHEN_DROP_ALL_AGGREGATES
FIX_ALIAS_ID_WHEN_DROP_ALL_AGGREGATES,

;
/**
* INLINE STATS fix incorrect prunning of null filtering
* https://github.com/elastic/elasticsearch/pull/135011
*/
INLINE_STATS_FIX_PRUNING_NULL_FILTER(INLINESTATS_V11.enabled),

INLINE_STATS_FIX_OPTIMIZED_AS_LOCAL_RELATION(INLINESTATS_V11.enabled);

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.optimizer.rules.PruneInlineJoinOnEmptyRightSide;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanFunctionEqualsElimination;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.BooleanSimplification;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.CombineBinaryComparisons;
Expand Down Expand Up @@ -205,7 +206,8 @@ protected static Batch<LogicalPlan> operators(boolean local) {
new PushDownAndCombineOrderBy(),
new PruneRedundantOrderBy(),
new PruneRedundantSortClauses(),
new PruneLeftJoinOnNullMatchingField()
new PruneLeftJoinOnNullMatchingField(),
new PruneInlineJoinOnEmptyRightSide()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules;

import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;

public final class PruneInlineJoinOnEmptyRightSide extends OptimizerRules.OptimizerRule<InlineJoin> {

@Override
protected LogicalPlan rule(InlineJoin plan) {
return plan.right() instanceof LocalRelation lr ? InlineJoin.inlineData(plan, lr) : plan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -34,6 +34,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans.skipPlan;

/**
* Remove unused columns created in the plan, in fields inside eval or aggregations inside stats.
*/
Expand Down Expand Up @@ -193,9 +195,9 @@ private static LogicalPlan pruneColumnsInEsRelation(EsRelation esr, AttributeSet
return p;
}

private static LogicalPlan emptyLocalRelation(LogicalPlan plan) {
private static LogicalPlan emptyLocalRelation(UnaryPlan plan) {
// create an empty local relation with no attributes
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
return skipPlan(plan);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to the meat of the PR

}

private static boolean isLocalEmptyRelation(LogicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
Expand Down Expand Up @@ -142,15 +143,29 @@ public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan orig
* \_Limit[1000[INTEGER],false]
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
*/
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan, Set<LocalRelation> subPlansResults) {
Holder<LogicalPlanTuple> subPlan = new Holder<>();
// Collect the first inlinejoin (bottom up in the tree)
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
// extract the right side of the plan and replace its source
if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
var p = replaceStub(ij.left(), ij.right());
p.setOptimized();
subPlan.set(new LogicalPlanTuple(p, ij.right()));
if (subPlan.get() == null) {
if (ij.right().anyMatch(p -> p instanceof StubRelation)) {
var p = replaceStub(ij.left(), ij.right());
p.setOptimized();
subPlan.set(new LogicalPlanTuple(p, ij.right()));
} else if (ij.right() instanceof LocalRelation relation
&& (subPlansResults.isEmpty() || subPlansResults.contains(relation) == false)
|| ij.right() instanceof LocalRelation == false && ij.right().anyMatch(p -> p instanceof LocalRelation)) {
Comment on lines +156 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth extracting this check into a method, it's a bit hard to read.
I tried to simplify it a bit, but we run into block tracking issues, so I guess this could probably be simplified, with some extra accounting.
But that can be a followup.

// In case the plan was optimized further and the StubRelation was replaced with a LocalRelation
// or the right hand side became a LocalRelation alltogether, there is no need to replace the source of the
// right-hand side anymore.
var p = ij.right();
p.setOptimized();
subPlan.set(new LogicalPlanTuple(p, ij.right()));
Comment on lines +162 to +164
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know if it'd be worth creating a light static method to do this here and the similar thing above.

// TODO: INLINE STATS this is essentially an optimization similar to the one in PruneInlineJoinOnEmptyRightSide
// this further supports the idea of running the optimization step again after the substitutions (see EsqlSession
// executeSubPlan() method where we could run the optimizer after the results are replaced in place).
}
}
});
return subPlan.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.xpack.esql.analysis.Verifier;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
Expand Down Expand Up @@ -82,6 +83,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -252,7 +254,8 @@ private void executeSubPlans(
EsqlQueryRequest request,
ActionListener<Result> listener
) {
var subPlan = firstSubPlan(optimizedPlan);
var subPlansResults = new HashSet<LocalRelation>();
var subPlan = firstSubPlan(optimizedPlan, subPlansResults);

// TODO: merge into one method
if (subPlan != null) {
Expand All @@ -264,6 +267,7 @@ private void executeSubPlans(
executionInfo,
runner,
request,
subPlansResults,
// Ensure we don't have subplan flag stuck in there on failure
ActionListener.runAfter(listener, executionInfo::finishSubPlans)
);
Expand All @@ -281,6 +285,7 @@ private void executeSubPlan(
EsqlExecutionInfo executionInfo,
PlanRunner runner,
EsqlQueryRequest request,
Set<LocalRelation> subPlansResults,
ActionListener<Result> listener
) {
LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan());
Expand All @@ -297,6 +302,7 @@ private void executeSubPlan(
LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result);
localRelationBlocks.set(resultWrapper.supplier().get());
var releasingNext = ActionListener.runAfter(next, () -> releaseLocalRelationBlocks(localRelationBlocks));
subPlansResults.add(resultWrapper);

// replace the original logical plan with the backing result
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
Expand All @@ -307,9 +313,10 @@ private void executeSubPlan(
);
// TODO: INLINE STATS can we do better here and further optimize the plan AFTER one of the subplans executed?
newLogicalPlan.setOptimized();
LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
LOGGER.trace("Main plan change after previous subplan execution:\n{}", NodeUtils.diffString(optimizedPlan, newLogicalPlan));

// look for the next inlinejoin plan
var newSubPlan = firstSubPlan(newLogicalPlan);
var newSubPlan = firstSubPlan(newLogicalPlan, subPlansResults);

if (newSubPlan == null) {// run the final "main" plan
executionInfo.finishSubPlans();
Expand All @@ -322,7 +329,16 @@ private void executeSubPlan(
);
}));
} else {// continue executing the subplans
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, releasingNext);
executeSubPlan(
completionInfoAccumulator,
newLogicalPlan,
newSubPlan,
executionInfo,
runner,
request,
subPlansResults,
releasingNext
);
}
} catch (Exception e) {
// safely release the blocks in case an exception occurs either before, but also after the "final" runner.run() forks off
Expand Down