ESQL: CATEGORIZE as a BlockHash #114317
Conversation
Pinging @elastic/es-analytical-engine (Team:Analytics)
Hi @nik9000, I've created a changelog YAML for you. |
somehow
This makes them easier to test.
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ElementType.java
...c/main/java/org/elasticsearch/compute/aggregation/blockhash/AbstractCategorizeBlockHash.java
x-pack/plugin/esql/qa/testFixtures/src/main/resources/categorize.csv-spec
alex-spies
left a comment
Heya, let's get this green and into main. The only work before merging is, IMO, getting the mutes/capabilities right and having correct expectations for the csv test with nulls - even if that means it needs muting, see below.
There is some immediate follow-up work that needs to be done, but that can be in subsequent PRs. I'm summarizing it here because the many comments are hard to navigate.
Remaining csv test cases:
- `STATS a = c, b = c BY c = CATEGORIZE(message)`
- `from test | STATS MV_COUNT(cat), COUNT(*) BY cat = CATEGORIZE(first_name)`
- `| stats mv_count(categorize(message)) by categorize(message)`
Correct hashing of multivalues (+ test against regressions), see #114317 (comment).
Block hash tests:
- more cases #114317 (comment)
- stronger assertion #114317 (comment)
FoldNull: check if this change is necessary and add a comment as to why if that's still the case #114317 (comment)
Ideally:
- simplify the changes to CombineProjections #114317 (comment)
```
FROM sample_data
| EVAL x = null
| STATS COUNT() BY category=CATEGORIZE(x)
| SORT category
;

COUNT():long | category:keyword
;
```
Let's mark this with -Ignore but let's put the correct expectation here - and in the test below it.
```java
 * Base BlockHash implementation for {@code Categorize} grouping function.
 */
public abstract class AbstractCategorizeBlockHash extends BlockHash {
    // TODO: this should probably also take an emitBatchSize
```
TLDR: It's probably not important for single-element BlockHash implementations like this one.

So emitBatchSize is a request to call AddInput#add every emitBatchSize entries. It's designed to prevent building a huge page of ordinals when processing STATS BY a, b where a and b are not single valued - especially if they are both multivalued. There, the contract is to emit a row for every combination of a and b values. Since that can explode into a huge number of rows, we batch it.

This is much, much less important for single-element BlockHash implementations. They don't change the number of output rows. That's true even for CATEGORIZE. And if the incoming page already "wasn't too big", then the page of ordinals passed to the aggs can't be that big either.

When I first built this I thought I might apply it to single-valued BlockHash implementations as well. It'd be consistent, and it's lame to ignore the request. But it isn't important, so I never got to it.
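To make the combinatorial blowup concrete, here is a toy sketch of what multivalued grouping keys imply (the class and method names are invented for illustration; the real BlockHash API is different): a row with m values in `a` and n values in `b` expands into m * n group ordinals, and emitBatchSize caps how many are buffered before flushing a batch to the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy sketch, not the real BlockHash API. Grouping on two multivalued
// keys produces one group ordinal per combination of values; batches of
// at most emitBatchSize keys are flushed to the consumer (the analogue
// of AddInput#add).
class MultivalueGrouping {
    static int addRow(List<String> a, List<String> b, int emitBatchSize, Consumer<List<String>> addInput) {
        List<String> batch = new ArrayList<>();
        int emitted = 0;
        for (String x : a) {
            for (String y : b) {
                batch.add(x + "|" + y); // composite group key for this combination
                if (batch.size() == emitBatchSize) {
                    addInput.accept(List.copyOf(batch)); // flush a bounded batch
                    emitted += batch.size();
                    batch.clear();
                }
            }
        }
        if (!batch.isEmpty()) {
            addInput.accept(List.copyOf(batch)); // flush the remainder
            emitted += batch.size();
        }
        return emitted;
    }
}
```

A single-element BlockHash never multiplies rows this way, which is why ignoring emitBatchSize is tolerable there.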
```java
out.writeVInt(categorizer.getCategoryCount());
for (SerializableTokenListCategory category : categorizer.toCategoriesById()) {
    category.writeTo(out);
}
// We're returning a block with N positions just because the Page must have all blocks with the same position count!
return blockFactory.newConstantBytesRefBlockWith(out.bytes().toBytesRef(), categorizer.getCategoryCount());
```
Do we really need to write the vInt and the Page positions hack? Can't we just write a position per category, to be more like ESQL?
Not sure exactly what you mean. The number of categories is not equal to the number of input texts, meaning you still have a mismatch in the number of positions.
We're building the intermediate state here to pass to the CategorizeIntermediateHashBlock, with 1 row/position per category. So I imagine we can do this in 2 ways.

The current one: serialize into a single BytesRef containing
- an int (# of categories)
- every category

and send this BytesRef in a block with N "simulated" rows.

Instead, do: write a block with one category (serialized in a BytesRef) per position/row.

That way we don't simulate anything, and we could even consume just 2 categories later and discard the rest (maybe? Just a _funny_ possibility, not sure if it's possible).
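The two layouts being compared can be sketched with plain `java.io` (this is illustrative only: the real code uses Elasticsearch's StreamOutput/BytesRef and ML's category serialization, and the method names here are invented):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Toy sketch of the two intermediate-state layouts under discussion.
class IntermediateStateLayouts {
    // Current approach: one blob = [count][category...], carried in a
    // block whose position count merely mirrors the category count.
    static byte[] singleBlob(List<String> categories) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(categories.size());   // # of categories
            for (String c : categories) {
                out.writeUTF(c);               // each serialized category
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen on a byte array
        }
    }

    // Alternative: one serialized category per block position, so no
    // count prefix and no "simulated" positions are needed.
    static byte[][] perPosition(List<String> categories) {
        byte[][] blocks = new byte[categories.size()][];
        for (int i = 0; i < categories.size(); i++) {
            blocks[i] = categories.get(i).getBytes(StandardCharsets.UTF_8);
        }
        return blocks;
    }
}
```

The per-position layout only works if each category's state is self-contained; as noted below, the categorizer state may be one shared blob, in which case the single-blob layout is the natural fit.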
My memory was that the state was one blob of bytes and not a blob per category. There's, like, shared state. But it's been a month since I thought a lot about this. And I'm wrong about lots of stuff.
```java
) {
    if (groups.stream().anyMatch(GroupSpec::isCategorize)) {
        if (groups.size() != 1) {
            throw new IllegalArgumentException("only a single CATEGORIZE group can used");
```
Typo. Maybe also something like:

```diff
- throw new IllegalArgumentException("only a single CATEGORIZE group can used");
+ throw new IllegalArgumentException("if a CATEGORIZE group is present, no other groups are allowed");
```
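The check discussed above boils down to: if any grouping is a CATEGORIZE, it must be the only grouping. A minimal standalone sketch of that rule (GroupSpec is replaced here by a generic element plus a predicate; names are invented for illustration):

```java
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in for the validation in BlockHash.build: a
// CATEGORIZE grouping may not be combined with any other grouping.
class CategorizeValidation {
    static <G> void validate(List<G> groups, Predicate<G> isCategorize) {
        if (groups.stream().anyMatch(isCategorize) && groups.size() != 1) {
            throw new IllegalArgumentException("if a CATEGORIZE group is present, no other groups are allowed");
        }
    }
}
```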
```java
int end = first + count;
for (int i = first; i < end; i++) {
    result.appendInt(process(vBlock.getBytesRef(i, vScratch)));
}
```
Indeed, it's broken. We didn't see it because all of our tests with multivalues use COUNT(), which just increments by 1 and doesn't read any other field 💀

Fixing now; added extra tests and functions.
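A toy illustration of why COUNT-only tests masked the bug (invented names, not the real aggregator code): COUNT only increments per row and never reads the extracted values, so a broken multivalue extractor that drops values still passes; an aggregate that consumes the values, like SUM, exposes it immediately.

```java
import java.util.List;

// Toy sketch: COUNT cannot detect a broken value extractor, SUM can.
class CountMasksBug {
    // Buggy "extractor": treats a multivalued position as single valued.
    static List<Integer> buggyExtract(List<Integer> multiValue) {
        return List.of(multiValue.get(0)); // silently drops all but the first value
    }

    static int count(List<Integer> values) {
        return 1; // COUNT per position: ignores the values entirely
    }

    static int sum(List<Integer> values) {
        int s = 0;
        for (int v : values) s += v; // actually consumes the values
        return s;
    }
}
```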
```diff
  return new HashAggregationOperator(
      aggregators,
-     () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
+     () -> BlockHash.build(groups, aggregatorMode, driverContext.blockFactory(), maxPageSize, false),
```
We should probably make this change in the follow-up!
Copilot reviewed 20 out of 35 changed files in this pull request and generated no suggestions.
Files not reviewed (15)
- docs/reference/esql/functions/kibana/definition/categorize.json: Language not supported
- docs/reference/esql/functions/types/categorize.asciidoc: Language not supported
- x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-mv_sample_data.json: Language not supported
- x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_sample_data.csv: Language not supported
- muted-tests.yml: Evaluated as low risk
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java: Evaluated as low risk
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java: Evaluated as low risk
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Categorize.java: Evaluated as low risk
- x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java: Evaluated as low risk
- x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java: Evaluated as low risk
- x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java: Evaluated as low risk
- x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java: Evaluated as low risk
- x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java: Evaluated as low risk
- x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java: Evaluated as low risk
- x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java: Evaluated as low risk
💔 Backport failed
You can use sqren/backport to manually backport by running
…lastic#117367) Set/Collection#add() is supposed to return `true` if the collection changed (i.e. if it actually added something). In this case, it must return whether the old value was null. Extracted from elastic#114317 (where it's being used)
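The contract this fix restores can be shown with a minimal map-backed set (a sketch, not the actual class from the PR): `add` must return true exactly when the element was newly inserted, i.e. when the previous mapping was null.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the Set#add contract for a map-backed set: return
// true iff the collection changed, which for Map#put means the old
// value was null.
class MapBackedSet<E> {
    private final Map<E, Boolean> map = new HashMap<>();

    boolean add(E e) {
        return map.put(e, Boolean.TRUE) == null; // changed only if there was no old value
    }
}
```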
Re-implement `CATEGORIZE` in a way that works for multi-node clusters.

This requires that data is first categorized on each data node in a first pass, then the categorizers from each data node are merged on the coordinator node and previously categorized rows are re-categorized.

BlockHashes, used in HashAggregations, already work in a very similar way. E.g. for queries like `... | STATS ... BY field1, field2` they map values for `field1` and `field2` to unique integer ids that are then passed to the actual aggregate functions to identify which "bucket" a row belongs to. When passed from the data nodes to the coordinator, the BlockHashes are also merged to obtain unique ids for every value in `field1, field2` that is seen on the coordinator (not only on the local data nodes).

Therefore, we re-implement `CATEGORIZE` as a special BlockHash.

To choose the correct BlockHash when a query plan is mapped to physical operations, the `AggregateExec` query plan node needs to know that we will be categorizing the field `message` in a query containing `... | STATS ... BY c = CATEGORIZE(message)`. For this reason, _we do not extract the expression_ `c = CATEGORIZE(message)` into an `EVAL` node, in contrast to e.g. `STATS ... BY b = BUCKET(field, 10)`. The expression `c = CATEGORIZE(message)` simply remains inside the `AggregateExec`'s groupings.

**Important limitation:** For now, to use `CATEGORIZE` in a `STATS` command, there can be only 1 grouping (the `CATEGORIZE`) overall.
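The two-pass scheme described above can be sketched as follows (all names here are invented; the real implementation uses ML's TokenListCategorizer and BlockHash ordinals): each data node builds a local table mapping categories to local ids, and the coordinator merges the per-node tables into one global id space before re-categorizing.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of per-node categorization followed by a coordinator merge.
class TwoPassCategorize {
    // First pass, per data node: assign local ids in discovery order.
    static Map<String, Integer> localCategorize(List<String> messages) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (String m : messages) {
            table.putIfAbsent(category(m), table.size());
        }
        return table;
    }

    // Second pass, on the coordinator: merge node tables into global ids.
    static Map<String, Integer> merge(List<Map<String, Integer>> nodeTables) {
        Map<String, Integer> global = new LinkedHashMap<>();
        for (Map<String, Integer> table : nodeTables) {
            for (String cat : table.keySet()) {
                global.putIfAbsent(cat, global.size());
            }
        }
        return global;
    }

    // Stand-in for real categorization: strip digits, so messages that
    // differ only in numbers fall into the same category.
    static String category(String message) {
        return message.replaceAll("\\d+", "<n>");
    }
}
```

Two nodes that each saw "connected to 10.1.0.x" messages end up sharing one global category id after the merge, which is exactly what per-node hashing alone could not guarantee.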