Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/113462.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 113462
summary: Suppress merge-on-recovery for older indices
area: CRUD
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.index.engine.Engine;
Expand Down Expand Up @@ -120,6 +121,7 @@
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.engine.MockEngineSupport;
import org.elasticsearch.test.index.IndexVersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TestTransportChannel;
Expand Down Expand Up @@ -2036,6 +2038,69 @@ public Settings onNodeStopped(String nodeName) {
assertBusy(() -> assertThat(searchableSegmentCountSupplier.getAsLong(), lessThan((long) initialSegmentCount)));
}

/**
 * Opts this test class out of the private-index-settings restriction. The tests here have to set
 * {@code index.version.created} explicitly in order to exercise the different behaviour on older indices.
 *
 * @return always {@code false}
 */
@Override
protected boolean forbidPrivateIndexSettings() {
    return false;
}

/**
 * Checks that no post-recovery merge happens on an index whose creation version is older than
 * {@link IndexVersions#MERGE_ON_RECOVERY_VERSION}: after a node restart with merges re-enabled, the merge stats must still
 * be zero and the searchable segment count must be unchanged.
 */
public void testPostRecoveryMergeDisabledOnOlderIndices() throws Exception {
    internalCluster().startMasterOnlyNode();
    final var dataNodeName = internalCluster().startDataOnlyNode();
    final var indexName = randomIdentifier();

    // single shard, no replicas, merges disabled, and a creation version strictly before MERGE_ON_RECOVERY_VERSION
    final var settingsBuilder = indexSettings(1, 0).put(INDEX_MERGE_ENABLED, false);
    final var olderIndexVersion = IndexVersionUtils.randomVersionBetween(
        random(),
        IndexVersionUtils.getFirstVersion(),
        IndexVersionUtils.getPreviousVersion(IndexVersions.MERGE_ON_RECOVERY_VERSION)
    );
    createIndex(indexName, settingsBuilder.put(IndexMetadata.SETTING_VERSION_CREATED, olderIndexVersion).build());

    final var initialSegmentCount = 20;
    for (int docId = 0; docId < initialSegmentCount; docId++) {
        indexDoc(indexName, Integer.toString(docId), "f", randomAlphaOfLength(10));
        // refreshing after every document forces a one-doc segment
        refresh(indexName);
    }
    // commit all the one-doc segments
    flush(indexName);

    // counts the segments of the index's first shard copy that are flagged as searchable
    final LongSupplier countSearchableSegments = () -> {
        final var segmentsResponse = indicesAdmin().prepareSegments(indexName).get(SAFE_AWAIT_TIMEOUT);
        final var shardSegments = segmentsResponse.getIndices().get(indexName).getShards().get(0).shards()[0];
        return shardSegments.getSegments().stream().filter(Segment::isSearch).count();
    };

    assertEquals(initialSegmentCount, countSearchableSegments.getAsLong());

    // restarting the data node forces a recovery; merges are re-enabled while the node is down
    internalCluster().restartNode(dataNodeName, new InternalTestCluster.RestartCallback() {
        @Override
        public Settings onNodeStopped(String nodeName) {
            final var reenableMerges = Settings.builder().putNull(INDEX_MERGE_ENABLED).build();
            final var updateSettingsRequest = new UpdateSettingsRequest(reenableMerges, indexName);
            updateSettingsRequest.reopen(true);
            safeGet(indicesAdmin().updateSettings(updateSettingsRequest));
            return Settings.EMPTY;
        }
    });

    ensureGreen(indexName);

    final var statsResponse = indicesAdmin().prepareStats(indexName).clear().setMerge(true).get();
    final var mergeStats = statsResponse.getIndex(indexName).getShards()[0].getStats().getMerge();

    // nothing is merging, nothing has merged, and every original segment is still present
    assertEquals(0, mergeStats.getCurrent());
    assertEquals(0, mergeStats.getTotal());
    assertEquals(initialSegmentCount, countSearchableSegments.getAsLong());
}

private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
assertThat(nodes, is(not(empty())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private static IndexVersion def(int id, Version luceneVersion) {
public static final IndexVersion INDEX_SORTING_ON_NESTED = def(8_512_00_0, Version.LUCENE_9_11_1);
public static final IndexVersion LENIENT_UPDATEABLE_SYNONYMS = def(8_513_00_0, Version.LUCENE_9_11_1);
public static final IndexVersion ENABLE_IGNORE_MALFORMED_LOGSDB = def(8_514_00_0, Version.LUCENE_9_11_1);
// Indices created before this version are not merged after recovery; see the creation-version check in PostRecoveryMerger.
public static final IndexVersion MERGE_ON_RECOVERY_VERSION = def(8_515_00_0, Version.LUCENE_9_11_1);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ public void createShard(
indexShard.startRecovery(
recoveryState,
recoveryTargetService,
postRecoveryMerger.maybeMergeAfterRecovery(shardRouting, recoveryListener),
postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener),
repositoriesService,
(mapping, listener) -> {
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardLongFieldRange;
Expand Down Expand Up @@ -44,14 +46,14 @@ class PostRecoveryMerger {
private static final boolean TRIGGER_MERGE_AFTER_RECOVERY;

static {
final var propertyValue = System.getProperty("es.trigger_merge_after_recovery");
final var propertyValue = System.getProperty("es.trigger_merge_after_recovery_8_515_00_0");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming the kill-switch here to nullify the overrides that are in place today and automatically re-enable this feature when upgrading. We can rename it back again later.

if (propertyValue == null) {
TRIGGER_MERGE_AFTER_RECOVERY = true;
} else if ("false".equals(propertyValue)) {
TRIGGER_MERGE_AFTER_RECOVERY = false;
} else {
throw new IllegalStateException(
"system property [es.trigger_merge_after_recovery] may only be set to [false], but was [" + propertyValue + "]"
"system property [es.trigger_merge_after_recovery_8_515_00_0] may only be set to [false], but was [" + propertyValue + "]"
);
}
}
Expand Down Expand Up @@ -81,6 +83,7 @@ class PostRecoveryMerger {
}

PeerRecoveryTargetService.RecoveryListener maybeMergeAfterRecovery(
IndexMetadata indexMetadata,
ShardRouting shardRouting,
PeerRecoveryTargetService.RecoveryListener recoveryListener
) {
Expand All @@ -92,6 +95,10 @@ PeerRecoveryTargetService.RecoveryListener maybeMergeAfterRecovery(
return recoveryListener;
}

if (indexMetadata.getCreationVersion().before(IndexVersions.MERGE_ON_RECOVERY_VERSION)) {
return recoveryListener;
}

final var shardId = shardRouting.shardId();
return new PeerRecoveryTargetService.RecoveryListener() {
@Override
Expand Down