ESQL: Calculate concurrent node limit #124901
Conversation
```java
 * Used to avoid overloading the cluster with concurrent requests that may not be needed.
 * </p>
 *
 * @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
```
This could be -1 for no limit, to work like the pragma
I'd probably move this to the class javadoc.
```java
 * @return Null if there should be no limit, otherwise, the maximum number of nodes that should be executed concurrently.
 */
public Integer calculateNodesConcurrency(PhysicalPlan dataNodePlan, Configuration configuration) {
    // TODO: Request FoldContext or a context containing it
```
This is needed for the `limit.limit().fold(...)` call. But we can probably assert that it's a `Literal` and avoid folding.
```java
    // TODO: Do some conversion here
    return limit;
```
The logic we choose here may be quite arbitrary without some real statistics of the nodes/shards.
I would probably limit to 2 for everything up to 10. Or maybe something like `Math.max(2, log(limit))`.
```java
// # Negative cases
// - FROM | STATS: Fragment[EsRelation, Aggregate]
// - SORT: Fragment[EsRelation, TopN]
// - WHERE: Fragment[EsRelation, Filter]
```
When getting the LIMIT value:
- The WHERE is already taken into account explicitly.
- The STATS can't have a LIMIT on the data node side, so it's fine.
- The SORT shouldn't happen, as we look for a `Limit` after the `EsRelation`, and the `Limit` would be a `TopN` otherwise.

Those are mostly assumptions; there's still a lot of testing to do with different commands that could break them.
luigidellaquila left a comment
I had a first quick look and left a couple of comments.
In general, for now I think it's acceptable to have this component at this level, as it's simple enough; but on the other hand, it could benefit from some additional information (e.g. LocalPhysicalOptimizerContext and SearchStats) that is available at physical planning time.
More abstractly, this should be part of a cost-based execution planning process, but that's way too complicated a topic for now.
```java
} else if (relationFound.get() && filterFound.get() == false) {
    // We only care about the limit if there's a relation before it, and no filter in between
    if (node instanceof Limit limit) {
        assert limitValue.get() == null : "Multiple limits found in the same data node plan";
```
This could still happen, e.g. with `MV_EXPAND | LIMIT`, which becomes `LIMIT | MV_EXPAND | LIMIT`.
Removed that assertion to just use the first limit it finds, which is what makes sense in any case.
```java
logicalPlan.forEachUp(node -> {
    if (node instanceof EsRelation) {
        relationFound.set(true);
    } else if (node instanceof Filter) {
```
I'm not sure this blacklisting is safe in the long term.
I'd prefer a whitelist approach, i.e. a set of plan types that can be present after `EsRelation` and that we know are safe to ignore before a LIMIT.
Initially changed it to a whitelist, but after adding tests for every command, `Limit` is effectively pushed down always. So now it's just an "if not a relation or limit -> 💀"
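That "if not a relation or limit, bail" walk can be sketched in isolation. This is a hedged, self-contained toy: the `Node`/`EsRelation`/`Limit`/`Other` types below are stand-ins for illustration, not the real ESQL plan classes, and the real code walks a `LogicalPlan` with `forEachUp` rather than a list.

```java
import java.util.List;

// Toy model of the bottom-up plan walk: return the LIMIT value only when a
// Limit follows the EsRelation with nothing else in between; anything else
// after the relation means we can't safely bound node concurrency.
public class LimitFinder {
    sealed interface Node permits EsRelation, Limit, Other {}
    record EsRelation() implements Node {}
    record Limit(int value) implements Node {}
    record Other(String name) implements Node {} // e.g. Filter, Aggregate, TopN

    /** Returns the LIMIT value, or null if no safe limit was found. */
    static Integer findLimit(List<Node> bottomUp) {
        boolean relationFound = false;
        for (Node node : bottomUp) {
            if (node instanceof EsRelation) {
                relationFound = true;
            } else if (relationFound && node instanceof Limit limit) {
                return limit.value(); // first limit wins
            } else if (relationFound) {
                return null; // any other node between relation and limit -> bail
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // FROM | LIMIT 1024
        System.out.println(findLimit(List.of(new EsRelation(), new Limit(1024)))); // 1024
        // FROM | WHERE ... | LIMIT 10: the filter blocks the shortcut
        System.out.println(findLimit(List.of(new EsRelation(), new Other("Filter"), new Limit(10)))); // null
    }
}
```

Under this model, the `MV_EXPAND | LIMIT` case discussed above naturally takes the first (pushed-down) limit, matching the "first limit wins" behavior.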
```java
// 10     | 3
// 1000   | 9
// 100000 | 16
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
```
Let's not limit queries with limits higher than 1000 for now.
It might become slower when querying a lot of shards with a small number of concurrent nodes.
```java
// Limit  | Concurrency
// 1      | 2
// 10     | 3
// 1000   | 9
```

Above makes sense, but I would like to confirm with @costin about it.
I'm fine with this heuristic. You can always override it.
Do we get here with `| LIMIT 0`? Could you make sure we have tests for that?
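To trace the `| LIMIT 0` question concretely, here is the heuristic from the diff on its own (a standalone sketch; `concurrencyFor` is a made-up name): for a limit of 0, `Math.log(0)` is negative infinity, the `int` cast clamps that to `Integer.MIN_VALUE`, and the outer `max` still returns 2, assuming a 0 literal actually reaches this code.

```java
public class NodesConcurrency {
    // The heuristic from the diff above: max(2, floor(log2(limit)))
    static int concurrencyFor(int limit) {
        return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
    }

    public static void main(String[] args) {
        System.out.println(concurrencyFor(1));      // 2
        System.out.println(concurrencyFor(10));     // 3
        System.out.println(concurrencyFor(1000));   // 9
        System.out.println(concurrencyFor(100000)); // 16
        // LIMIT 0: Math.log(0) is -Infinity; the int cast clamps to
        // Integer.MIN_VALUE, and max(2, ...) still yields 2.
        System.out.println(concurrencyFor(0));      // 2
    }
}
```

So the heuristic itself does not misbehave on 0; whether `LIMIT 0` queries should skip this path entirely is a separate question for the tests.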
```java
| LOOKUP JOIN languages_lookup on language_code
| LIMIT 1024
""", 10);
}
```
I wish we had JUnit 5 parametrization for this: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-sources-CsvSource
I was looking for that when doing it, but we only have class-level parameterized tests...
I was also checking assertAll(), but, of course, that's JUnit 5 too 💀
Luckily there aren't that many cases now; we can refactor them if we add more and they're similar.
Pinging @elastic/es-analytical-engine (Team:Analytics)

Hi @ivancea, I've created a changelog YAML for you.
```java
// 10     | 3
// 1000   | 9
// 100000 | 16
return Math.max(2, (int) (Math.log(limit) / Math.log(2)));
```
Super-duper driveby: maybe it's simpler to take `31 - Integer.numberOfLeadingZeros(limit)` to compute the log2.
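For the sample values documented in the diff, that bit trick agrees with the floating-point version; this quick check is a sketch (class and method names are made up, not from the PR). One caveat worth a test: `Integer.numberOfLeadingZeros(0)` is 32, so the bit trick yields -1 for a limit of 0, while the floating-point path goes through negative infinity; both are then clamped by the surrounding `Math.max(2, ...)`.

```java
public class Log2Compare {
    // Floating-point floor(log2(limit)), as in the current diff
    static int floatingLog2(int limit) {
        return (int) (Math.log(limit) / Math.log(2));
    }

    // The suggested bit trick: floor(log2(limit)) with no floating point
    static int bitLog2(int limit) {
        return 31 - Integer.numberOfLeadingZeros(limit);
    }

    public static void main(String[] args) {
        for (int limit : new int[] { 10, 1000, 100000 }) {
            // Both produce the same floor(log2) for these sample values: 3, 9, 16
            System.out.println(limit + " -> " + floatingLog2(limit) + " / " + bitLog2(limit));
        }
    }
}
```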
nik9000 left a comment
LGTM so long as we have tests for `| LIMIT 0` and we're sure this doesn't break them.
It'd be cool to see this on tests for bigger clusters. I bet it'll be compelling. I'm really curious to see a follow-up that lets us apply this to things like `FROM | SORT | LIMIT` - that's trickier, but it'll be lovely one day!
```java
// 1      | 2
// 10     | 3
// 1000   | 9
// 100000 | 16
```
This example would violate the above.
Calculate the maximum concurrent nodes for a query, based on whether the data node plan has a limit or not (and no other conditions/nodes before it). The concurrency limit is calculated as `log2(limit)`.
Also, changed the query pragma to not have an upper limit, allowing users to effectively override any calculation with a bigger limit.
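The pragma interaction described here can be sketched as follows. This is an illustration only: `effectiveConcurrency` and the null/negative-value conventions are assumptions for the sketch, not the real QueryPragmas API.

```java
public class PragmaOverride {
    // Sketch: an explicit pragma overrides the calculated concurrency; since
    // the pragma no longer has an upper bound, users can raise it past any
    // calculated value. A null result means "no concurrency limit".
    static Integer effectiveConcurrency(Integer pragma, Integer calculated) {
        if (pragma != null) {
            return pragma < 0 ? null : pragma; // negative pragma: treat as unlimited
        }
        return calculated; // may itself be null, i.e. unlimited
    }

    public static void main(String[] args) {
        System.out.println(effectiveConcurrency(null, 9)); // 9 (calculated wins)
        System.out.println(effectiveConcurrency(100, 9));  // 100 (pragma overrides)
        System.out.println(effectiveConcurrency(-1, 9));   // null (unlimited)
    }
}
```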