Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b336579
Retry unsuccessful snapshot deletions
nicktindall Sep 20, 2024
8ab9763
Disable AWS client retries
nicktindall Sep 20, 2024
01fd52c
Update docs/changelog/113237.yaml
nicktindall Sep 20, 2024
77273f8
Make delay and retries configurable, re-acquire client before each retry
nicktindall Sep 23, 2024
24bfb25
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Sep 23, 2024
7c3d9f9
Use client-reference-per-batch, remove test for aborting on closed repo
nicktindall Sep 23, 2024
17118de
Add test for retry delete count histogram
nicktindall Sep 24, 2024
2773f44
Tidy
nicktindall Sep 24, 2024
5b940b3
Record metrics when retries are exhausted
nicktindall Sep 24, 2024
87d37bd
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Sep 24, 2024
b70a1c9
Update changelog
nicktindall Sep 24, 2024
4103273
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Sep 24, 2024
704c499
Merge branch 'main' into bugfix/ES-8562_retry_unsuccessful_snapshot_d…
nicktindall Sep 25, 2024
f8e610e
Use new BackoffPolicy package
nicktindall Sep 25, 2024
53dad4b
Assert interrupt is preserved
nicktindall Oct 9, 2024
52a315b
Be specific about the retries that are being aborted
nicktindall Oct 9, 2024
c579c56
Make S3ErrorResponse a record
nicktindall Oct 9, 2024
b8467af
Fix naming
nicktindall Oct 9, 2024
04dd6b2
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Oct 9, 2024
3ed3bc4
Use linear back-off, document back-off parameters, add details to tim…
nicktindall Oct 11, 2024
882d6c0
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Oct 11, 2024
df5585e
Grammar fix(?)
nicktindall Oct 11, 2024
75c4674
Update docs/reference/snapshot-restore/repository-s3.asciidoc
nicktindall Oct 11, 2024
f186512
Update modules/repository-s3/src/main/java/org/elasticsearch/reposito…
nicktindall Oct 11, 2024
e6b4940
Update modules/repository-s3/src/main/java/org/elasticsearch/reposito…
nicktindall Oct 11, 2024
8868f2b
Change default for maximum delay
nicktindall Oct 11, 2024
c1c74e2
Update docs/reference/snapshot-restore/repository-s3.asciidoc
nicktindall Oct 11, 2024
ab10b44
Add test for unlimited linear backoff
nicktindall Oct 11, 2024
1e207e0
Change unit for maximum delay
nicktindall Oct 11, 2024
7c5a2d8
Merge remote-tracking branch 'origin/main' into bugfix/ES-8562_retry_…
nicktindall Oct 11, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/113237.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 113237
summary: Retry throttled snapshot deletions
area: Snapshot/Restore
type: bug
issues: []
14 changes: 14 additions & 0 deletions docs/reference/snapshot-restore/repository-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,20 @@ include::repository-shared-settings.asciidoc[]
`1000` which is the maximum number supported by the https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up dangling multipart uploads.

`throttled_delete_retry.delay_increment`::

(<<time-units,time value>>) This value is used as the delay before the first retry and the amount the delay is incremented by on each subsequent retry. Default is 50ms, minimum is 0ms.

`throttled_delete_retry.maximum_delay`::

(<<time-units,time value>>) This is the upper bound on how long the delays between retries will grow to. Default is 5s, minimum is 0ms.

`throttled_delete_retry.maximum_number_of_retries`::

(integer) Sets the number of times to retry a throttled snapshot deletion. Defaults to `10`, minimum value is `0` which
will disable retries altogether. Note that if retries are enabled in the AWS SDK client, each of these retries
comprises that many client-level retries.

NOTE: The option of defining client settings in the repository settings as
documented below is considered deprecated, and will be removed in a future
version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -31,13 +32,16 @@
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.RepositoriesMetrics.HTTP_REQUEST_TIME_IN_MILLIS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_EXCEPTIONS_HISTOGRAM;
Expand All @@ -48,9 +52,11 @@
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_TOTAL;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_UNSUCCESSFUL_OPERATIONS_TOTAL;
import static org.elasticsearch.repositories.s3.S3RepositoriesMetrics.METRIC_DELETE_RETRIES_HISTOGRAM;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -61,14 +67,22 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {

private final Queue<RestStatus> errorStatusQueue = new LinkedBlockingQueue<>();
private static final S3ErrorResponse S3_SLOW_DOWN_RESPONSE = new S3ErrorResponse(SERVICE_UNAVAILABLE, """
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>SlowDown</Code>
<Message>This is a throttling message</Message>
<Resource>/bucket/</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""");
private final Queue<S3ErrorResponse> errorResponseQueue = new LinkedBlockingQueue<>();

// Always create erroneous handler
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap(
"/bucket",
new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorStatusQueue))
new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorResponseQueue))
);
}

Expand Down Expand Up @@ -244,8 +258,74 @@ public void testMetricsForRequestRangeNotSatisfied() {
}
}

public void testRetrySnapshotDeleteMetricsOnEventualSuccess() throws IOException {
final int maxRetries = 5;
final String repositoryName = randomRepositoryName();
// Disable retries in the client for this repo
createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
Copy link
Contributor Author

@nicktindall nicktindall Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems as though putting client settings in the repository settings like this is deprecated, and/or I've done it wrong. Any advice on how to override AWS client internal retries on a per-test basis would be appreciated.

I tried making a different client with that setting in the node settings (set at the class level), but there are a lot of additional settings configured for the test client that I'd need to duplicate to make that work.

Copy link
Contributor Author

@nicktindall nicktindall Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like the correct way to do this would be to change the client settings in S3BlobStoreRepositoryTests to be configured for default client, allowing them to be selectively overridden in individual tests. But that's a larger change. Perhaps there's an easier/better way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok in tests, we don't have a plan for removing this apparently-deprecated functionality any time soon.

.put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
.put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
.build(),
false
);
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
final int numberOfDeletes = randomIntBetween(1, 3);
final List<Long> numberOfRetriesPerAttempt = new ArrayList<>();
for (int i = 0; i < numberOfDeletes; i++) {
int numFailures = randomIntBetween(1, maxRetries);
numberOfRetriesPerAttempt.add((long) numFailures);
IntStream.range(0, numFailures).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
blobContainer.deleteBlobsIgnoringIfNotExists(
randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
List.of(randomIdentifier()).iterator()
);
}
List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
assertThat(longHistogramMeasurement.stream().map(Measurement::getLong).toList(), equalTo(numberOfRetriesPerAttempt));
}

public void testRetrySnapshotDeleteMetricsWhenRetriesExhausted() {
    final String repositoryName = randomRepositoryName();
    // Disable retries in the AWS client for this repo so only the repository-level
    // throttled-delete retries are exercised
    final int maxRetries = 3;
    createRepository(
        repositoryName,
        Settings.builder()
            .put(repositorySettings(repositoryName))
            .put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
            .put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
            .put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
            .build(),
        false
    );
    final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
    final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
    final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
    // Keep throttling past the max number of retries so the delete ultimately fails
    IntStream.range(0, maxRetries + 1).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
    assertThrows(
        IOException.class,
        () -> blobContainer.deleteBlobsIgnoringIfNotExists(
            randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
            List.of(randomIdentifier()).iterator()
        )
    );
    // Exactly maxRetries retries should be recorded for the single exhausted attempt
    // (was previously a hard-coded 3L, which would silently diverge if maxRetries changed)
    List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
    assertThat(longHistogramMeasurement.get(0).getLong(), equalTo((long) maxRetries));
}

private void addErrorStatus(RestStatus... statuses) {
errorStatusQueue.addAll(Arrays.asList(statuses));
errorResponseQueue.addAll(Arrays.stream(statuses).map(S3ErrorResponse::new).toList());
}

/** Enqueues the given error responses to be served, in order, by the erroneous HTTP handler. */
private void addErrorStatus(S3ErrorResponse... responses) {
    Collections.addAll(errorResponseQueue, responses);
}

private long getLongCounterValue(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
Expand Down Expand Up @@ -275,25 +355,25 @@ private long getLongHistogramValue(TestTelemetryPlugin plugin, String instrument
private static class S3MetricErroneousHttpHandler implements DelegatingHttpHandler {

private final HttpHandler delegate;
private final Queue<RestStatus> errorStatusQueue;
private final Queue<S3ErrorResponse> errorResponseQueue;

S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<RestStatus> errorStatusQueue) {
S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<S3ErrorResponse> errorResponseQueue) {
this.delegate = delegate;
this.errorStatusQueue = errorStatusQueue;
this.errorResponseQueue = errorResponseQueue;
}

@Override
public void handle(HttpExchange exchange) throws IOException {
final RestStatus status = errorStatusQueue.poll();
if (status == null) {
final S3ErrorResponse errorResponse = errorResponseQueue.poll();
if (errorResponse == null) {
delegate.handle(exchange);
} else if (status == INTERNAL_SERVER_ERROR) {
} else if (errorResponse.status == INTERNAL_SERVER_ERROR) {
// Simulate a retryable exception
throw new IOException("ouch");
} else {
try (exchange) {
drainInputStream(exchange.getRequestBody());
exchange.sendResponseHeaders(status.getStatus(), -1);
errorResponse.writeResponse(exchange);
}
}
}
Expand All @@ -302,4 +382,22 @@ public HttpHandler getDelegate() {
return delegate;
}
}

/**
 * A canned HTTP error response for the test S3 endpoint: a status code plus an
 * optional XML body (e.g. an S3 {@code SlowDown} error document).
 */
record S3ErrorResponse(RestStatus status, String responseBody) {

    /** Convenience constructor for a body-less error response (headers only). */
    S3ErrorResponse(RestStatus status) {
        this(status, null);
    }

    /**
     * Writes this error response to the exchange. When {@code responseBody} is non-null it is
     * sent UTF-8 encoded with the matching content length; otherwise the headers are sent with
     * length -1, indicating no response body. The caller is responsible for closing the exchange.
     */
    @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
    public void writeResponse(HttpExchange exchange) throws IOException {
        if (responseBody != null) {
            byte[] responseBytes = responseBody.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
            exchange.getResponseBody().write(responseBytes);
        } else {
            // -1 means "no response body" for com.sun.net.httpserver
            exchange.sendResponseHeaders(status.getStatus(), -1);
        }
    }
}
}
Loading