Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/132833.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132833
summary: Adding simulate ingest effective mapping
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2182,3 +2182,131 @@ setup:
- match: { docs.0.doc._index: "test" }
- match: { docs.0.doc._source.foo: "bar" }
- match: { docs.0.doc.error.type: "document_parsing_exception" }

---
"Test effective mapping":

# This creates two templates, where the first reroutes to the second. Then we simulate ingesting and make sure that
# the effective_mapping is for the index where the document eventually would land. Also, the second index is really
# a data stream, so we expect to see a @timestamp field.

- skip:
features:
- headers
- allowed_warnings

- do:
headers:
Content-Type: application/json
ingest.put_pipeline:
id: "reroute-pipeline"
body: >
{
"processors": [
{
"reroute": {
"destination": "second-index"
}
}
]
}
- match: { acknowledged: true }

- do:
allowed_warnings:
- "index template [first-index-template] has index patterns [first-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [first-index-template] will take precedence during new index creation"
indices.put_index_template:
name: first-index-template
body:
index_patterns: first-index*
template:
settings:
default_pipeline: "reroute-pipeline"
mappings:
dynamic: strict
properties:
foo:
type: text

- do:
allowed_warnings:
- "index template [second-index-template] has index patterns [second-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [second-index-template] will take precedence during new index creation"
indices.put_index_template:
name: second-index-template
body:
index_patterns: second-index*
template:
mappings:
dynamic: strict
properties:
bar:
type: text

- do:
indices.put_index_template:
name: second-index-template
body:
index_patterns: second-index*
template:
lifecycle:
data_retention: "7d"
mappings:
dynamic: strict
properties:
bar:
type: text
data_stream: {}

- do:
indices.create_data_stream:
name: second-index
- is_true: acknowledged

- do:
cluster.health:
wait_for_status: yellow

- do:
indices.put_data_stream_mappings:
name: second-index
body:
properties:
foo:
type: boolean

- match: { data_streams.0.applied_to_data_stream: true }

# Here is the meat of the test. We simulate ingesting into first-index, knowing it will be rerouted to second-index,
# which is actually a data stream. So we expect the effective_mapping to contain the fields from second-index
# (including the implicit @timestamp field) and not the fields from first-index. Plus, it ought to include fields
# from the mapping_addition that we pass in.
- do:
headers:
Content-Type: application/json
simulate.ingest:
body: >
{
"docs": [
{
"_index": "first-index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
],
"mapping_addition": {
"dynamic": "strict",
"properties": {
"baz": {
"type": "keyword"
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "second-index" }
- not_exists: docs.0.doc.effective_mapping._doc.properties.foo
- match: { docs.0.doc.effective_mapping._doc.properties.@timestamp.type: "date" }
- match: { docs.0.doc.effective_mapping._doc.properties.bar.type: "text" }
- match: { docs.0.doc.effective_mapping._doc.properties.baz.type: "keyword" }
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ static TransportVersion def(int id) {
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexSettingProvider;
Expand Down Expand Up @@ -144,14 +143,13 @@ protected void doInternalExecute(
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
IndexRequest request = (IndexRequest) docRequest;
Tuple<Collection<String>, Exception> validationResult = validateMappings(
ValidationResult validationResult = validateMappings(
componentTemplateSubstitutions,
indexTemplateSubstitutions,
mappingAddition,
request,
mappingMergeReason
);
Exception mappingValidationException = validationResult.v2();
responses.set(
i,
BulkItemResponse.success(
Expand All @@ -164,8 +162,9 @@ protected void doInternalExecute(
request.source(),
request.getContentType(),
request.getExecutedPipelines(),
validationResult.v1(),
mappingValidationException
validationResult.ignoredFields,
validationResult.validationException,
validationResult.effectiveMapping
)
)
);
Expand Down Expand Up @@ -193,7 +192,7 @@ private MapperService.MergeReason getMergeReason(String mergeType) {
 * @return a ValidationResult containing: (1) the effective mapping that would be applied to the document's target index, (2) the
 * mapping exception if the source does not match the mappings (otherwise null), and (3) the names of any fields that would be
 * ignored upon indexing
 */
private Tuple<Collection<String>, Exception> validateMappings(
private ValidationResult validateMappings(
Map<String, ComponentTemplate> componentTemplateSubstitutions,
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
Map<String, Object> mappingAddition,
Expand All @@ -211,6 +210,7 @@ private Tuple<Collection<String>, Exception> validateMappings(
);

ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
CompressedXContent effectiveMapping = null;
Exception mappingValidationException = null;
Collection<String> ignoredFields = List.of();
IndexAbstraction indexAbstraction = project.getIndicesLookup().get(request.index());
Expand All @@ -222,8 +222,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
*/
IndexMetadata imd = project.getIndexSafe(indexAbstraction.getWriteIndex(request, project));
CompressedXContent mappings = Optional.ofNullable(imd.mapping()).map(MappingMetadata::source).orElse(null);
CompressedXContent mergedMappings = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index did not exist, or we have component template substitutions, so we put together the mappings from existing
Expand Down Expand Up @@ -281,8 +281,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
indexSettingProviders
);
CompressedXContent mappings = template.mappings();
CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappings(mappings, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedProjectMetadata, request.index(), false);
if (matchingTemplates.isEmpty() == false) {
Expand All @@ -295,23 +295,27 @@ private Tuple<Collection<String>, Exception> validateMappings(
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
xContentRegistry
);
final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index matched no templates and had no mapping of its own. If there were component template substitutions
* or index template substitutions, they didn't match anything. So just apply the mapping addition if it exists,
* and validate.
*/
final CompressedXContent combinedMappings = mergeMappings(null, mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(null, mappingAddition);
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
}
}
}
} catch (Exception e) {
mappingValidationException = e;
}
return Tuple.tuple(ignoredFields, mappingValidationException);
return new ValidationResult(effectiveMapping, mappingValidationException, ignoredFields);
}

/**
 * Holds the outcome of mapping validation for a single simulated index request.
 *
 * @param effectiveMapping the merged mapping that would apply to the document's target index, or null if none could be computed
 * @param validationException the exception raised while validating the document against the mappings, or null if validation passed
 * @param ignoredFields names of fields that would be ignored upon indexing (empty if none)
 */
private record ValidationResult(CompressedXContent effectiveMapping, Exception validationException, Collection<String> ignoredFields) {

}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand All @@ -26,6 +27,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* This is an IndexResponse that is specifically for simulate requests. Unlike typical IndexResponses, we need to include the original
Expand All @@ -37,6 +39,7 @@ public class SimulateIndexResponse extends IndexResponse {
private final XContentType sourceXContentType;
private final Collection<String> ignoredFields;
private final Exception exception;
private final CompressedXContent effectiveMapping;

@SuppressWarnings("this-escape")
public SimulateIndexResponse(StreamInput in) throws IOException {
Expand All @@ -54,6 +57,15 @@ public SimulateIndexResponse(StreamInput in) throws IOException {
} else {
this.ignoredFields = List.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
if (in.readBoolean()) {
this.effectiveMapping = CompressedXContent.readCompressedString(in);
} else {
this.effectiveMapping = null;
}
} else {
effectiveMapping = null;
}
}

@SuppressWarnings("this-escape")
Expand All @@ -65,7 +77,8 @@ public SimulateIndexResponse(
XContentType sourceXContentType,
List<String> pipelines,
Collection<String> ignoredFields,
@Nullable Exception exception
@Nullable Exception exception,
@Nullable CompressedXContent effectiveMapping
) {
// We don't actually care about most of the IndexResponse fields:
super(
Expand All @@ -83,6 +96,7 @@ public SimulateIndexResponse(
setShardInfo(ShardInfo.EMPTY);
this.ignoredFields = ignoredFields;
this.exception = exception;
this.effectiveMapping = effectiveMapping;
}

@Override
Expand All @@ -108,6 +122,14 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
ElasticsearchException.generateThrowableXContent(builder, params, exception);
builder.endObject();
}
if (effectiveMapping == null) {
builder.field("effective_mapping", Map.of());
} else {
builder.field(
"effective_mapping",
XContentHelper.convertToMap(effectiveMapping.uncompressed(), true, builder.contentType()).v2()
);
}
return builder;
}

Expand All @@ -127,6 +149,12 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_IGNORED_FIELDS)) {
out.writeStringCollection(ignoredFields);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
out.writeBoolean(effectiveMapping != null);
if (effectiveMapping != null) {
effectiveMapping.writeTo(out);
}
}
}

public Exception getException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public void onResponse(BulkResponse response) {
"_index": "%s",
"_version": -3,
"_source": %s,
"executed_pipelines": [%s]
"executed_pipelines": [%s],
"effective_mapping":{}
}""",
indexRequest.id(),
indexRequest.index(),
Expand Down Expand Up @@ -319,7 +320,8 @@ public void onResponse(BulkResponse response) {
"_version": -3,
"_source": %s,
"executed_pipelines": [%s],
"error":{"type":"exception","reason":"invalid mapping"}
"error":{"type":"exception","reason":"invalid mapping"},
"effective_mapping":{"_doc":{"dynamic":"strict"}}
}""",
indexRequest.id(),
indexName,
Expand All @@ -346,7 +348,8 @@ public void onResponse(BulkResponse response) {
"_index": "%s",
"_version": -3,
"_source": %s,
"executed_pipelines": [%s]
"executed_pipelines": [%s],
"effective_mapping":{"_doc":{"dynamic":"strict"}}
}""",
indexRequest.id(),
indexName,
Expand All @@ -373,7 +376,9 @@ public void onFailure(Exception e) {
};
when(indicesService.withTempIndexService(any(), any())).thenAnswer((Answer<?>) invocation -> {
IndexMetadata imd = invocation.getArgument(0);
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())) {
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())
// We only want to throw exceptions inside TransportSimulateBulkAction:
&& invocation.getArgument(1).getClass().getSimpleName().contains(TransportSimulateBulkAction.class.getSimpleName())) {
throw new ElasticsearchException("invalid mapping");
} else {
// we don't actually care what is returned, as long as no exception is thrown the request is considered valid:
Expand Down
Loading