Skip to content
Merged
6 changes: 6 additions & 0 deletions docs/changelog/125636.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 125636
summary: Make `numberOfChannels` consistent with layout map by removing duplicated
`ChannelSet`
area: ES|QL
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,32 @@ private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilde
assertEquals(List.of(List.of(false, 9.1), List.of(true, 8.1)), result.get("values"));
}

public void testMultipleBatchesWithLookupJoin() throws IOException {
assumeTrue(
"Makes numberOfChannels consistent with layout map for join with multiple batches",
EsqlCapabilities.Cap.MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT.isEnabled()
);
// Create more than 10 indices to trigger multiple batches of data node execution.
// The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout
for (int i = 1; i <= 20; i++) {
createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}");
}
bulkLoadTestDataLookupMode(10);
// lookup join with and without sort
for (String sort : List.of("", "| sort integer")) {
var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort));
Map<String, Object> result = runEsql(query);
var columns = as(result.get("columns"), List.class);
assertEquals(21, columns.size());
var values = as(result.get("values"), List.class);
assertEquals(10, values.size());
}
// clean up
for (int i = 1; i <= 20; i++) {
assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true));
}
}

public void testErrorMessageForLiteralDateMathOverflow() throws IOException {
List<String> dateMathOverflowExpressions = List.of(
"2147483647 day + 1 day",
Expand Down Expand Up @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) {
return "[" + value + ", " + value + "]";
}

private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException {
Request request = new Request("PUT", "/" + indexName);
String settings = "\"settings\" : {\"mode\" : \"lookup\"}, ";
request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}");
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
}

public static RequestObjectBuilder requestObjectBuilder() throws IOException {
return new RequestObjectBuilder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword
10092 | 1 | English
10093 | 3 | Spanish
;

multipleBatchesWithSort
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| sort language_code, birth_date
| keep language_code
| limit 1
;

language_code:integer
1
;

multipleBatchesWithMvExpand
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| mv_expand birth_date
| sort birth_date, language_code
| limit 1
;

birth_date:datetime |language_code:integer
1952-02-27T00:00:00.000Z |null
;

multipleBatchesWithAggregate1
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats x=max(birth_date), y=min(language_code)
;

x:datetime |y:integer
1965-01-03T00:00:00.000Z |1
;

multipleBatchesWithAggregate2
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(birth_date) by language_code
| sort language_code
| limit 1
;

m:datetime |language_code:integer
null |1
;

multipleBatchesWithAggregate3
required_capability: join_lookup_v12
required_capability: remove_redundant_sort
required_capability: make_number_of_channels_consistent_with_layout

from *
| rename city.country.continent.planet.name as message
| lookup join message_types_lookup on message
| keep birth_date, language_code
| stats m=min(language_code) by birth_date
| sort birth_date
| limit 1
;

m:integer |birth_date:datetime
null |1952-02-27T00:00:00.000Z
;
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,12 @@ public enum Cap {
/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()),

/**
* Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
*/
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,20 @@ public Layout build() {
Map<NameId, ChannelAndType> layout = new HashMap<>();
int numberOfChannels = 0;
for (ChannelSet set : channels) {
int channel = numberOfChannels++;
boolean createNewChannel = true;
int channel = 0;
for (NameId id : set.nameIds) {
if (layout.containsKey(id)) {
// If a NameId already exists in the map, do not increase the numberOfChannels, it can cause inverse() to create
// a null in the list of channels, and NullPointerException when build() is called.
// TODO avoid adding duplicated attributes with the same id in the plan, ReplaceMissingFieldWithNull may add nulls
// with the same ids as the missing field ids.
continue;
}
if (createNewChannel) {
channel = numberOfChannels++;
createNewChannel = false;
}
ChannelAndType next = new ChannelAndType(channel, set.type);
ChannelAndType prev = layout.put(id, next);
// Do allow multiple name to point to the same channel - see https://github.com/elastic/elasticsearch/pull/100238
Expand Down
Loading