Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bcde6be
LookupJoin prejoin filter POC WIP
julian-elastic Jul 28, 2025
d24dab3
Get basic case with translatable filters to work
julian-elastic Jul 29, 2025
9603a7c
Fix failing UTs
julian-elastic Aug 13, 2025
e73996f
Fix failing UTs part 2
julian-elastic Aug 14, 2025
278877a
Add additional checks for right pushable filters
julian-elastic Aug 14, 2025
ec9817d
Merge branch 'main' into lookupPrefilter_v2
julian-elastic Aug 15, 2025
a59d0de
Update docs/changelog/132889.yaml
julian-elastic Aug 15, 2025
6e6e28e
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
018b40d
Switch to storing the filter in Join
julian-elastic Aug 15, 2025
2ee02a1
bugfix
julian-elastic Aug 16, 2025
abfe672
Limit the change to pushable filters only, make the filter optional
julian-elastic Aug 19, 2025
7f82362
Add more UTs
julian-elastic Aug 20, 2025
ba9ab52
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 20, 2025
2aa2b49
Clean up, add more UTs
julian-elastic Aug 20, 2025
0e7b3ed
Update docs/changelog/133166.yaml
julian-elastic Aug 20, 2025
66c126f
Fix a bug where a mix of pushable and non-pushable filters resulted i…
julian-elastic Aug 21, 2025
84d2dcb
Address code review comments, add UTs
julian-elastic Aug 22, 2025
6b41aa9
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 22, 2025
d32cdc4
Fix formatting for UT
julian-elastic Aug 22, 2025
68d319b
Switch to storing the optional filter in the RHS of the Join
julian-elastic Aug 26, 2025
5405235
Address code review feedback
julian-elastic Aug 26, 2025
03796e0
Fix merge errors
julian-elastic Aug 27, 2025
c15df0a
Address a missed comment
julian-elastic Aug 27, 2025
63018a7
Switch to passing local logical plan to lookup node
julian-elastic Aug 27, 2025
76b4042
Switch to passing local logical plan to lookup node
julian-elastic Aug 28, 2025
4205693
Address more code review feedback
julian-elastic Aug 28, 2025
5a2b2fd
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
3c39e90
fix failing UT
julian-elastic Aug 28, 2025
35121eb
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
40c6d7a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Aug 28, 2025
242455f
Address more code review comments
julian-elastic Aug 28, 2025
f5cb543
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 2, 2025
f7ff90e
Address code review comments
julian-elastic Sep 2, 2025
3116545
Address code review comments, part 2
julian-elastic Sep 2, 2025
7a8af28
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
c0733f7
Address more code review comments
julian-elastic Sep 3, 2025
166130f
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
0550dae
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
328af0a
Merge branch 'main' into lookupPrefilterPushable
julian-elastic Sep 3, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/133166.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 133166
summary: Improve Expanding Lookup Join performance by pushing a filter to the right
side of the lookup join
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ static TransportVersion def(int id) {
public static final TransportVersion STREAMS_ENDPOINT_PARAM_RESTRICTIONS = def(9_148_0_00);
public static final TransportVersion RESOLVE_INDEX_MODE_FILTER = def(9_149_0_00);
public static final TransportVersion SEMANTIC_QUERY_MULTIPLE_INFERENCE_IDS = def(9_150_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER = def(9_151_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public Query regexpQuery(
}

/**
* Returns a Lucine pushable Query for the current field
* Returns a Lucene pushable Query for the current field
* For now can only be AutomatonQuery or MatchAllDocsQuery() or MatchNoDocsQuery()
*/
public Query automatonQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,26 +698,42 @@ private Map<String, Object> fetchMvLongs() throws IOException {
public void testLookupExplosion() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 10000;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyFields() throws IOException {
int sensorDataCount = 400;
int lookupEntries = 1000;
int joinFieldsCount = 990;
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));
}

public void testLookupExplosionManyMatchesManyFields() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 30));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries));
}

public void testLookupExplosionManyMatches() throws IOException {
// 1500, 10000 is enough locally, but some CI machines need more.
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000, 1));
int lookupEntries = 10000;
assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries));
}

public void testLookupExplosionManyMatchesFiltered() throws IOException {
// This test will only work with the expanding join optimization
// that pushes the filter to the right side of the lookup.
// Without the optimization, it will fail with circuit_breaking_exception
int sensorDataCount = 10000;
int lookupEntries = 10000;
int reductionFactor = 1000; // reduce the number of matches by this factor
// lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value
assertTrue(0 == lookupEntries % reductionFactor);
Map<?, ?> map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor);
assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor))));

}

public void testLookupExplosionNoFetch() throws IOException {
Expand All @@ -744,7 +760,8 @@ public void testLookupExplosionBigStringManyMatches() throws IOException {
assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1));
}

private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount) throws IOException {
private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntries, int joinFieldsCount, int lookupEntriesToKeep)
throws IOException {
try {
lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount);
StringBuilder query = startQuery();
Expand All @@ -755,7 +772,14 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
}
query.append("id").append(i);
}
query.append(" | STATS COUNT(location)\"}");
if (lookupEntries != lookupEntriesToKeep) {
// add a filter to reduce the number of matches
// we add both a Lucene pushable filter and a non-pushable filter
// this is to make sure that even if there are non-pushable filters the pushable filters is still applied
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);

}
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
return responseAsMap(query(query.toString(), null));
} finally {
deleteIndex("sensor_data");
Expand Down Expand Up @@ -1038,7 +1062,8 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
createIndexBuilder.append("\"id").append(i).append("\": { \"type\": \"long\" },");
}
createIndexBuilder.append("""
"location": { "type": "geo_point" }
"location": { "type": "geo_point" },
"filter_key": { "type": "integer" }
}
}""");
CreateIndexResponse response = createIndex(
Expand All @@ -1058,7 +1083,7 @@ private void initSensorLookup(int lookupEntries, int sensorCount, IntFunction<St
data.append(String.format(Locale.ROOT, "\"id%d\":%d, ", j, sensor));
}
data.append(String.format(Locale.ROOT, """
"location": "POINT(%s)"}\n""", location.apply(sensor)));
"location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
if (i % docsPerBulk == docsPerBulk - 1) {
bulk("sensor_lookup", data.toString());
data.setLength(0);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected final Operator.OperatorFactory simple() {
/**
* Makes sure the description of {@link #simple} matches the {@link #expectedDescriptionOfSimple}.
*/
public final void testSimpleDescription() {
public void testSimpleDescription() {
Operator.OperatorFactory factory = simple();
String description = factory.describe();
assertThat(description, expectedDescriptionOfSimple());
Expand Down
Loading