Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b09553a
Add a pre-mapping logical plan processing step
bpintea Jan 17, 2025
cbddbc6
Update docs/changelog/120368.yaml
Jan 17, 2025
baec4ee
[CI] Auto commit changes from spotless
Jan 17, 2025
b17f6e6
Minor listerns refactoring
bpintea Jan 20, 2025
8b5d4a8
Merge branch 'main' into enh/mapping_pre_processor
bpintea Jan 20, 2025
8fbd0fc
Fix wrong auto-merge
bpintea Jan 20, 2025
62a5862
Have the FTFMapperPreprocessor update the plan in-place
bpintea Jan 27, 2025
6040f3a
rename if'aces back
bpintea Jan 28, 2025
865efe5
Merge branch 'main' into enh/mapping_pre_processor
bpintea Jan 28, 2025
aebbccf
[CI] Auto commit changes from spotless
Jan 28, 2025
39954c6
Drop the pre-mapping executor
bpintea Jan 30, 2025
77518cd
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 30, 2025
e08fd4d
drop wrong changelog
bpintea Jan 30, 2025
155acc7
Reintroduce a light premapping layer
bpintea Jan 30, 2025
1df65e6
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 30, 2025
affc2e4
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 30, 2025
04ef37d
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 30, 2025
d9a6807
Update some FTF verifications. Address review comments
bpintea Jan 30, 2025
fde7ea7
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 30, 2025
adada00
Update docs/changelog/121260.yaml
Jan 30, 2025
da8849c
Tests fixes
bpintea Jan 31, 2025
da56299
[ES|QL] Add aggregate metric double feature flag to its capability (#…
limotova Jan 30, 2025
9993e4a
Mute org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT test {yaml…
elasticsearchmachine Jan 30, 2025
49071e0
ReindexDataStreamIndex bug in assertion caused by reference equality …
parkertimmins Jan 30, 2025
7804f28
Mute org.elasticsearch.xpack.security.profile.ProfileIntegTests testH…
elasticsearchmachine Jan 30, 2025
26a16c7
Mute org.elasticsearch.xpack.security.profile.ProfileIntegTests testA…
elasticsearchmachine Jan 30, 2025
4d8fd09
Update ESRestTestCase's ROLLUP_REQUESTS_OPTIONS (#121335)
martijnvg Jan 30, 2025
d3ac9d8
Mute org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuit…
elasticsearchmachine Jan 30, 2025
2919025
Fix propagation of dynamic mapping parameter when applying copy_to (#…
lkts Jan 30, 2025
2008096
Integrate watsonx for re-ranking task (#117176)
saikatsarkar056 Jan 31, 2025
daddb62
revert rebasing update
bpintea Jan 31, 2025
1fac8fb
Merge branch 'main' into enh/mapping_pre_processor2
bpintea Jan 31, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121260.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121260
summary: Introduce a pre-mapping logical plan processing step
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ public void set(T value) {
this.value = value;
}

/**
* Sets a value in the holder, but only if none has already been set.
* @param value the new value to set.
*/
public void setIfAbsent(T value) {
if (this.value == null) {
this.value = value;
}
}

public T get() {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -31,9 +33,11 @@
import org.elasticsearch.geo.ShapeTestUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
Expand Down Expand Up @@ -72,8 +76,8 @@
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.versionfield.Version;
Expand Down Expand Up @@ -140,6 +144,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

public final class EsqlTestUtils {

Expand Down Expand Up @@ -360,7 +365,14 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() {

public static final Verifier TEST_VERIFIER = new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L));

public static final QueryBuilderResolver MOCK_QUERY_BUILDER_RESOLVER = new MockQueryBuilderResolver();
public static final TransportActionServices MOCK_TRANSPORT_ACTION_SERVICES = new TransportActionServices(
mock(TransportService.class),
mock(SearchService.class),
null,
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
null
);

private EsqlTestUtils() {}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void testWhereMatchWithRow() {
var error = expectThrows(ElasticsearchException.class, () -> run(query));
assertThat(
error.getMessage(),
containsString("[MATCH] function cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
containsString("line 2:15: [MATCH] function cannot operate on [content], which is not a field from an index mapping")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testWhereMatchWithRow() {
var error = expectThrows(ElasticsearchException.class, () -> run(query));
assertThat(
error.getMessage(),
containsString("[:] operator cannot operate on [\"a brown fox\"], which is not a field from an index mapping")
containsString("line 2:9: [:] operator cannot operate on [content], which is not a field from an index mapping")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.QueryBuilderResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.telemetry.Metrics;
import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void esql(
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
EsqlSession.PlanRunner planRunner,
QueryBuilderResolver queryBuilderResolver,
TransportActionServices services,
ActionListener<Result> listener
) {
final PlanTelemetry planTelemetry = new PlanTelemetry(functionRegistry);
Expand All @@ -78,7 +78,7 @@ public void esql(
verifier,
planTelemetry,
indicesExpressionGrouper,
queryBuilderResolver
services
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator;
import org.elasticsearch.compute.lucene.LuceneQueryExpressionEvaluator.ShardConfig;
import org.elasticsearch.compute.operator.EvalOperator;
Expand Down Expand Up @@ -110,11 +110,7 @@ public Expression query() {
*/
public Object queryAsObject() {
Object queryAsObject = query().fold(FoldContext.small() /* TODO remove me */);
if (queryAsObject instanceof BytesRef bytesRef) {
return bytesRef.utf8ToString();
}

return queryAsObject;
return BytesRefs.toString(queryAsObject);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware;
import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
Expand All @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.DataTypeConverter;
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
import org.elasticsearch.xpack.esql.core.util.Check;
import org.elasticsearch.xpack.esql.core.util.NumericUtils;
import org.elasticsearch.xpack.esql.expression.function.Example;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.querydsl.query.MatchQuery;
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
Expand All @@ -48,6 +50,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;

import static java.util.Map.entry;
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
Expand Down Expand Up @@ -88,7 +91,7 @@
/**
* Full text function that performs a {@link org.elasticsearch.xpack.esql.querydsl.query.MatchQuery} .
*/
public class Match extends FullTextFunction implements OptionalArgument, PostOptimizationVerificationAware {
public class Match extends FullTextFunction implements OptionalArgument, PostAnalysisPlanVerificationAware {

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Match", Match::readFrom);
public static final Set<DataType> FIELD_DATA_TYPES = Set.of(
Expand Down Expand Up @@ -429,23 +432,23 @@ public Expression replaceQueryBuilder(QueryBuilder queryBuilder) {
}

@Override
public void postOptimizationVerification(Failures failures) {
Expression fieldExpression = field();
// Field may be converted to other data type (field_name :: data_type), so we need to check the original field
if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
fieldExpression = convertFunction.field();
}
if (fieldExpression instanceof FieldAttribute == false) {
failures.add(
Failure.fail(
field,
"[{}] {} cannot operate on [{}], which is not a field from an index mapping",
functionName(),
functionType(),
field.sourceText()
)
);
}
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
return (plan, failures) -> {
super.postAnalysisPlanVerification().accept(plan, failures);
plan.forEachExpression(Match.class, m -> {
if (m.fieldAsFieldAttribute() == null) {
failures.add(
Failure.fail(
m.field(),
"[{}] {} cannot operate on [{}], which is not a field from an index mapping",
functionName(),
functionType(),
m.field().sourceText()
)
);
}
});
};
}

@Override
Expand Down Expand Up @@ -476,22 +479,24 @@ public Object queryAsObject() {

@Override
protected Query translate(TranslatorHandler handler) {
var fieldAttribute = fieldAsFieldAttribute();
Check.notNull(fieldAttribute, "Match must have a field attribute as the first argument");
String fieldName = fieldAttribute.name();
if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
// If we have multiple field types, we allow the query to be done, but getting the underlying field name
fieldName = multiTypeEsField.getName();
}
// Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
}

private FieldAttribute fieldAsFieldAttribute() {
Expression fieldExpression = field;
// Field may be converted to other data type (field_name :: data_type), so we need to check the original field
if (fieldExpression instanceof AbstractConvertFunction convertFunction) {
fieldExpression = convertFunction.field();
}
if (fieldExpression instanceof FieldAttribute fieldAttribute) {
String fieldName = fieldAttribute.name();
if (fieldAttribute.field() instanceof MultiTypeEsField multiTypeEsField) {
// If we have multiple field types, we allow the query to be done, but getting the underlying field name
fieldName = multiTypeEsField.getName();
}
// Make query lenient so mixed field types can be queried when a field type is incompatible with the value provided
return new MatchQuery(source(), fieldName, queryAsObject(), matchQueryOptions());
}

throw new IllegalArgumentException("Match must have a field attribute as the first argument");
return fieldExpression instanceof FieldAttribute fieldAttribute ? fieldAttribute : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
* Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
* will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
* {@link QueryBuilderResolver#resolveQueryBuilders(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by
* replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
*/
public final class QueryBuilderResolver {

private QueryBuilderResolver() {}

public static void resolveQueryBuilders(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
var hasFullTextFunctions = plan.anyMatch(p -> {
Holder<Boolean> hasFullTextFunction = new Holder<>(false);
p.forEachExpression(FullTextFunction.class, unused -> hasFullTextFunction.set(true));
return hasFullTextFunction.get();
});
if (hasFullTextFunctions) {
Rewriteable.rewriteAndFetch(
new FullTextFunctionsRewritable(plan),
queryRewriteContext(services, indexNames(plan)),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
);
} else {
listener.onResponse(plan);
}
}

private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
indexNames.toArray(String[]::new),
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
services.clusterService().state(),
services.indexNameExpressionResolver(),
services.transportService().getRemoteClusterService(),
System.currentTimeMillis()
);

return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null);
}

private static Set<String> indexNames(LogicalPlan plan) {
Set<String> indexNames = new HashSet<>();
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.concreteIndices()));
return indexNames;
}

private record FullTextFunctionsRewritable(LogicalPlan plan) implements Rewriteable<QueryBuilderResolver.FullTextFunctionsRewritable> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:
While this pattern (of using record) works, it's somewhat misleading - instead use a closure to encapsulate the method directly at the calling site to reference the plan:

Rewriteable<QueryBuilderResolver.FullTextFunctionsRewritable> closure = c -> {
  ...
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nice proposal, but I'd like to keep it separated, just to maintain the legibility of that method (which would otherwise grow quite a bit).

@Override
public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
Holder<IOException> exceptionHolder = new Holder<>();
Holder<Boolean> updated = new Holder<>(false);
LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> {
QueryBuilder builder = f.queryBuilder(), initial = builder;
builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder;
try {
builder = builder.rewrite(ctx);
} catch (IOException e) {
exceptionHolder.setIfAbsent(e);
}
var rewritten = builder != initial;
updated.set(updated.get() || rewritten);
return rewritten ? f.replaceQueryBuilder(builder) : f;
});
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this;
}
}
}
Loading