Merged
Changes from all commits (32 commits)
23ff7c5
ESQL: Heuristics to pick efficient partitioning
nik9000 Mar 26, 2025
47bb86c
Update docs/changelog/125739.yaml
nik9000 Mar 26, 2025
527c69a
Fix TODO
nik9000 Mar 27, 2025
815dec5
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Mar 27, 2025
c72af75
Report partitioning strategies
nik9000 Mar 27, 2025
e2e817d
Test report
nik9000 Mar 27, 2025
a348ef9
test
nik9000 Mar 27, 2025
d201b4a
[CI] Auto commit changes from spotless
Mar 27, 2025
fd41476
Merge branch 'main' into esql_auto_partition
nik9000 Mar 27, 2025
8533d07
Merge branch 'main' into esql_auto_partition
nik9000 Mar 28, 2025
e72673c
Merge branch 'main' into esql_auto_partition
nik9000 Mar 31, 2025
05e487e
Merge branch 'main' into esql_auto_partition
nik9000 Apr 1, 2025
bc92147
[CI] Auto commit changes from spotless
Apr 1, 2025
05175e5
Cluster setting
nik9000 Apr 1, 2025
03f029c
in
nik9000 Apr 1, 2025
caf5f05
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Apr 1, 2025
5ab7032
Fix rewrite
nik9000 Apr 1, 2025
8f72b1d
Merge branch 'main' into esql_auto_partition
nik9000 Apr 2, 2025
ae48a6f
Merge branch 'main' into esql_auto_partition
nik9000 Apr 7, 2025
a4fd6c5
Merge branch 'main' into esql_auto_partition
nik9000 Apr 8, 2025
2f7ecc8
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
2ab82a8
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
c816934
Fixup merge
nik9000 Apr 9, 2025
bf3c076
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
ad9e064
Try and make this more consistent
nik9000 Apr 9, 2025
334c1bc
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
66aa691
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
409067c
Merge branch 'main' into esql_auto_partition
nik9000 Apr 9, 2025
9b7728e
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
a0ebc79
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
77608d3
Merge branch 'main' into esql_auto_partition
nik9000 Apr 10, 2025
2eb1ac7
Merge remote-tracking branch 'nik9000/esql_auto_partition' into esql_…
nik9000 Apr 10, 2025
docs/changelog/125739.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 125739
summary: Heuristics to pick efficient partitioning
area: ES|QL
type: enhancement
issues: []
@@ -217,6 +217,7 @@ static TransportVersion def(int id) {
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_00_0);
public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_00_0);
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS = def(9_049_00_0);
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_00_0);

/*
* STOP! READ THIS FIRST! No, really,
@@ -7,11 +7,39 @@

package org.elasticsearch.compute.lucene;

public enum DataPartitioning {
import org.elasticsearch.compute.operator.Driver;

/**
* How we partition the data across {@link Driver}s. Each request forks into
* {@code min(1.5 * cpus, partition_count)} threads on the data node. More partitions
allow us to bring more threads to bear on CPU-intensive data-node-side tasks.
*/
public enum DataPartitioning {
/**
* Automatically select the data partitioning based on the query and index.
* Usually that's {@link #SEGMENT}, but for small indices it's {@link #SHARD}.
When the additional overhead from {@link #DOC} is low enough, it picks
{@link #DOC}.
*/
AUTO,
/**
* Make one partition per shard. This is generally the slowest option, but it
* has the lowest CPU overhead.
*/
SHARD,

/**
Partition on segment boundaries. This doesn't allow forking to as many CPUs
as {@link #DOC}, but it has much lower overhead.
* <p>
* It packs segments smaller than {@link LuceneSliceQueue#MAX_DOCS_PER_SLICE}
* docs together into a partition. Larger segments get their own partition.
Each slice contains no more than {@link LuceneSliceQueue#MAX_SEGMENTS_PER_SLICE} segments.
*/
SEGMENT,

/**
* Partition each shard into {@code task_concurrency} partitions, splitting
* larger segments into slices. This allows bringing the most CPUs to bear on
* the problem but adds extra overhead, especially in query preparation.
*/
DOC,
}
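The heuristics behind AUTO live in the slice-queue code rather than in this enum. As a rough mental model only, with the threshold and the coreStrategy input being assumptions rather than this PR's exact logic, the selection could look like:

    // Illustrative sketch of an AUTO-style choice; not code from this PR.
    static LuceneSliceQueue.PartitioningStrategy pickAuto(
        LuceneSliceQueue.PartitioningStrategy coreStrategy, // what the query itself prefers
        long totalDocsInShard,
        int taskConcurrency
    ) {
        // A small shard can't keep many threads busy anyway, so take the
        // lowest-overhead option and scan it as a single partition.
        if (totalDocsInShard < LuceneSliceQueue.MAX_DOCS_PER_SLICE) {
            return LuceneSliceQueue.PartitioningStrategy.SHARD;
        }
        // Otherwise defer to the per-query preference: DOC when its extra
        // overhead is known to be low, SEGMENT as the general default.
        return coreStrategy;
    }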
@@ -49,7 +49,16 @@ public Factory(
int taskConcurrency,
int limit
) {
super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
false,
ScoreMode.COMPLETE_NO_SCORES
);
}

@Override
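The same constructor migration repeats in the min and max factories below: each subclass now hands the base class its raw per-shard Query function, the strategy AUTO should resolve to for that operator, and a ScoreMode, instead of a prebuilt Weight function. A sketch of the pattern for a hypothetical subclass (the match-all query is a placeholder, and the class is not part of this PR):

    // Hypothetical subclass showing the new super(...) shape.
    class ExampleFactory extends LuceneOperator.Factory {
        ExampleFactory(List<? extends ShardContext> contexts, int taskConcurrency, int limit) {
            super(
                contexts,
                ctx -> new MatchAllDocsQuery(),   // per-shard query, rewritten to a Weight later
                DataPartitioning.AUTO,
                query -> LuceneSliceQueue.PartitioningStrategy.SHARD, // what AUTO means here
                taskConcurrency,
                limit,
                false,                            // needsScore
                ScoreMode.COMPLETE_NO_SCORES
            );
        }
        // SourceOperator get(DriverContext) and String describe() omitted.
    }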
@@ -23,8 +23,6 @@
import java.util.List;
import java.util.function.Function;

import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;

/**
* Factory that generates an operator that finds the max value of a field using the {@link LuceneMinMaxOperator}.
*/
@@ -123,7 +121,16 @@ public LuceneMaxFactory(
NumberType numberType,
int limit
) {
super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.fieldName = fieldName;
this.numberType = numberType;
}
@@ -23,8 +23,6 @@
import java.util.List;
import java.util.function.Function;

import static org.elasticsearch.compute.lucene.LuceneOperator.weightFunction;

/**
* Factory that generates an operator that finds the min value of a field using the {@link LuceneMinMaxOperator}.
*/
@@ -123,7 +121,16 @@ public LuceneMinFactory(
NumberType numberType,
int limit
) {
super(contexts, weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES), dataPartitioning, taskConcurrency, limit, false);
super(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
taskConcurrency,
limit,
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.fieldName = fieldName;
this.numberType = numberType;
}
@@ -9,7 +9,6 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
@@ -37,12 +36,16 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.ESQL_REPORT_SHARD_PARTITIONING;

public abstract class LuceneOperator extends SourceOperator {
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);

@@ -93,15 +96,17 @@ public abstract static class Factory implements SourceOperator.SourceOperatorFactory
*/
protected Factory(
List<? extends ShardContext> contexts,
Function<ShardContext, Weight> weightFunction,
Function<ShardContext, Query> queryFunction,
DataPartitioning dataPartitioning,
Function<Query, LuceneSliceQueue.PartitioningStrategy> autoStrategy,
int taskConcurrency,
int limit,
boolean needsScore
boolean needsScore,
ScoreMode scoreMode
) {
this.limit = limit;
this.dataPartitioning = dataPartitioning;
this.sliceQueue = LuceneSliceQueue.create(contexts, weightFunction, dataPartitioning, taskConcurrency);
this.sliceQueue = LuceneSliceQueue.create(contexts, queryFunction, dataPartitioning, autoStrategy, taskConcurrency, scoreMode);
this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
this.needsScore = needsScore;
}
@@ -269,6 +274,7 @@ public static class Status implements Operator.Status {
private final int sliceMax;
private final int current;
private final long rowsEmitted;
private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;

private Status(LuceneOperator operator) {
processedSlices = operator.processedSlices;
@@ -294,6 +300,7 @@ private Status(LuceneOperator operator) {
}
pagesEmitted = operator.pagesEmitted;
rowsEmitted = operator.rowsEmitted;
partitioningStrategies = operator.sliceQueue.partitioningStrategies();
}

Status(
@@ -307,7 +314,8 @@ private Status(LuceneOperator operator) {
int sliceMin,
int sliceMax,
int current,
long rowsEmitted
long rowsEmitted,
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies
) {
this.processedSlices = processedSlices;
this.processedQueries = processedQueries;
@@ -320,6 +328,7 @@ private Status(LuceneOperator operator) {
this.sliceMax = sliceMax;
this.current = current;
this.rowsEmitted = rowsEmitted;
this.partitioningStrategies = partitioningStrategies;
}

Status(StreamInput in) throws IOException {
@@ -343,6 +352,9 @@ private Status(LuceneOperator operator) {
} else {
rowsEmitted = 0;
}
partitioningStrategies = in.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)
? in.readMap(LuceneSliceQueue.PartitioningStrategy::readFrom)
: Map.of();
}
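PartitioningStrategy itself isn't shown in this diff, but since it's read via PartitioningStrategy::readFrom and written via StreamOutput::writeWriteable below, it is presumably a Writeable enum. A minimal sketch of the usual Elasticsearch shape, with the constant set and wire format assumed:

    // Minimal sketch, assuming the common Writeable-enum pattern; the actual
    // constants and encoding live in LuceneSliceQueue.
    public enum PartitioningStrategy implements Writeable {
        SHARD,
        SEGMENT,
        DOC;

        public static PartitioningStrategy readFrom(StreamInput in) throws IOException {
            return in.readEnum(PartitioningStrategy.class);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeEnum(this);
        }
    }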

@Override
@@ -364,6 +376,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
out.writeVLong(rowsEmitted);
}
if (out.getTransportVersion().onOrAfter(ESQL_REPORT_SHARD_PARTITIONING)) {
out.writeMap(partitioningStrategies, StreamOutput::writeString, StreamOutput::writeWriteable);
}
}

@Override
@@ -415,6 +430,10 @@ public long rowsEmitted() {
return rowsEmitted;
}

public Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies() {
return partitioningStrategies;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -432,6 +451,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("slice_max", sliceMax);
builder.field("current", current);
builder.field("rows_emitted", rowsEmitted);
builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
return builder.endObject();
}
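Wrapping the map in a TreeMap keeps the JSON output stable regardless of the underlying map's iteration order. Illustratively, with made-up shard keys, the operator status gains a block along these lines:

    "partitioning_strategies" : {
      "my-index:0" : "SHARD",
      "my-index:1" : "SEGMENT"
    }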

@@ -450,12 +470,23 @@ public boolean equals(Object o) {
&& sliceMin == status.sliceMin
&& sliceMax == status.sliceMax
&& current == status.current
&& rowsEmitted == status.rowsEmitted;
&& rowsEmitted == status.rowsEmitted
&& partitioningStrategies.equals(status.partitioningStrategies);
}

@Override
public int hashCode() {
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
return Objects.hash(
processedSlices,
sliceIndex,
totalSlices,
pagesEmitted,
sliceMin,
sliceMax,
current,
rowsEmitted,
partitioningStrategies
);
}

@Override
@@ -468,17 +499,4 @@ public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.V_8_11_X;
}
}

static Function<ShardContext, Weight> weightFunction(Function<ShardContext, Query> queryFunction, ScoreMode scoreMode) {
return ctx -> {
final var query = queryFunction.apply(ctx);
final var searcher = ctx.searcher();
try {
Query actualQuery = scoreMode.needsScores() ? query : new ConstantScoreQuery(query);
return searcher.createWeight(searcher.rewrite(actualQuery), scoreMode, 1);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
}
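The new accessor also makes the per-shard choices available to profile consumers. A small hedged helper showing how one might surface them; this is not code from the PR:

    // Hypothetical diagnostic helper built on the new Status accessor.
    static void logPartitioning(LuceneOperator.Status status, Logger logger) {
        // Sort for stable output, mirroring the TreeMap used in toXContent.
        new TreeMap<>(status.partitioningStrategies())
            .forEach((shard, strategy) -> logger.debug("shard [{}] partitioned as [{}]", shard, strategy));
    }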