Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/138228.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 138228
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
\ unexpectedly."
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc

/**
* This is the cluster state task executor for cluster state update actions.
* Visible for testing
*/
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}

@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};
@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};

@Inject
public TransportDownsampleAction(
Expand Down Expand Up @@ -1106,7 +1105,6 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
listener.onFailure(e);
}

Expand Down Expand Up @@ -1161,6 +1159,11 @@ public ClusterState execute(ClusterState currentState) {
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
final ProjectMetadata project = currentState.metadata().getProject(projectId);
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
if (downsampleIndex == null) {
throw new IllegalStateException(
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
);
}
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
return currentState;
}
Expand All @@ -1182,7 +1185,6 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
actionListener.onFailure(e);
}

Expand Down Expand Up @@ -1255,8 +1257,8 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime);
logger.debug("Downsampling measured successfully", e);
recordFailureMetrics(startTime);
logger.debug("Downsampling failure measured successfully", e);
this.actionListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -56,14 +57,14 @@
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -173,12 +174,11 @@ public void setUp() throws Exception {
doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any());
doAnswer(mockBroadcastResponse).when(indicesAdminClient).flush(any(), any());

// Mocks for updating downsampling metadata
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());

// Mocks for mapping retrieval & merging
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
Expand Down Expand Up @@ -219,11 +219,6 @@ public void testDownsampling() {

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
Expand All @@ -243,6 +238,7 @@ public void testDownsampling() {
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand Down Expand Up @@ -273,6 +269,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand All @@ -291,7 +288,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
verifyIndexFinalisation();
}

public void testDownsamplingWithShortCircuitDuringCreation() throws IOException {
public void testDownsamplingWithShortCircuitDuringCreation() {
var projectMetadata = ProjectMetadata.builder(projectId)
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
.build();
Expand All @@ -315,6 +312,7 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
)
.build()
);
assertSuccessfulUpdateDownsampleStatus(clusterService.state());

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand All @@ -333,6 +331,76 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
verifyIndexFinalisation();
}

public void testDownsamplingWhenTargetIndexGetsDeleted() {
var projectMetadata = ProjectMetadata.builder(projectId)
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
.build();

var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.putProjectMetadata(projectMetadata)
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
DownsampleShardIndexerStatus.COMPLETED,
null
);
when(task1.getState()).thenReturn(runningTaskState);
listener.onResponse(task1);
return null;
};
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
doAnswer(invocation -> {
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeHandlingResults(
clusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask),
task1 -> {},
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
IllegalStateException error = safeAwaitFailure(
IllegalStateException.class,
AcknowledgedResponse.class,
listener -> action.masterOperation(
task,
new DownsampleAction.Request(
ESTestCase.TEST_REQUEST_TIMEOUT,
sourceIndex,
targetIndex,
TimeValue.ONE_HOUR,
new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod())
),
clusterState,
listener
)
);
assertThat(
error.getMessage(),
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
);
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
verify(indicesAdminClient).refresh(any(), any());
verify(indicesAdminClient, never()).flush(any(), any());
verify(indicesAdminClient, never()).forceMerge(any(), any());
}

private void verifyIndexFinalisation() {
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
verify(indicesAdminClient).refresh(any(), any());
Expand Down Expand Up @@ -476,4 +544,21 @@ public void testGetSupportedMetrics() {
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}

private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
.build();

var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
updatedClusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask)
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}
}