Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/139228.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 139228
summary: S3 `compareAndExchange` using conditional writes
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected void createRepository(String repoName) {
settings.put("storage_class", storageClass);
}
}
settings.put("unsafely_incompatible_with_s3_conditional_writes", randomBoolean());
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
Expand All @@ -139,7 +140,15 @@ protected void createRepository(String repoName) {
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}

public void testCompareAndExchangeCleanup() throws IOException {
@Override
public void testFailIfAlreadyExists() {
    // The base-class test relies on create-if-absent semantics; skip it when the repository is
    // configured as incompatible with S3 conditional writes, since writes are then unconditional.
    assumeTrue("S3 repository is not configured with conditional writes and existence check", supportsConditionalWrites());
    super.testFailIfAlreadyExists();
}

public void testMPUCompareAndExchangeCleanup() throws IOException {
assumeFalse("S3 repository is configured with conditional writes and does not use MPU for CAS", supportsConditionalWrites());

final var timeOffsetMillis = new AtomicLong();
final var threadpool = new TestThreadPool(getTestName()) {
@Override
Expand Down Expand Up @@ -286,4 +295,9 @@ public void testMultipartCopy() {

assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
}

// Reports whether the test repository may use S3 conditional writes: true unless the repository
// settings explicitly mark it as incompatible via the unsafe escape-hatch setting.
boolean supportsConditionalWrites() {
    final var repositoriesService = node().injector().getInstance(RepositoriesService.class);
    final var repositorySettings = repositoriesService.repository(TEST_REPO_NAME).getMetadata().settings();
    final boolean incompatible = repositorySettings.getAsBoolean("unsafely_incompatible_with_s3_conditional_writes", false);
    return incompatible == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ static void putObject(
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (s3BlobStore.supportsConditionalWrites(purpose)) {
if (s3BlobStore.supportsConditionalWrites()) {
switch (condition) {
case ConditionalOperation.IfMatch ifMatch -> putRequestBuilder.ifMatch(ifMatch.etag);
case ConditionalOperation.IfNoneMatch ignored -> putRequestBuilder.ifNoneMatch("*");
Expand Down Expand Up @@ -700,7 +700,7 @@ private void executeMultipart(
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (s3BlobStore.supportsConditionalWrites(purpose)) {
if (s3BlobStore.supportsConditionalWrites()) {
switch (condition) {
case ConditionalOperation.IfMatch ifMatch -> completeMultipartUploadRequestBuilder.ifMatch(ifMatch.etag);
case ConditionalOperation.IfNoneMatch ignored -> completeMultipartUploadRequestBuilder.ifNoneMatch("*");
Expand Down Expand Up @@ -1172,7 +1172,7 @@ private void completeMultipartUpload(String uploadId, String partETag, String ex
Operation.PUT_MULTIPART_OBJECT,
purpose
);
if (blobStore.supportsConditionalWrites(purpose)) {
if (blobStore.supportsConditionalWrites()) {
if (existingEtag == null) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
} else {
Expand All @@ -1185,6 +1185,58 @@ private void completeMultipartUpload(String uploadId, String partETag, String ex
}
}

/**
 * Compare-and-exchange on the register blob at {@code key}, implemented with S3 conditional writes
 * ({@code If-Match}/{@code If-None-Match}) instead of the multipart-upload-based CAS protocol.
 * Reads the current register contents and ETag, and only writes {@code updated} if the contents
 * equal {@code expected}, guarding the write with the observed ETag (or If-None-Match for a
 * register that does not yet exist).
 *
 * @param purpose  the purpose of this operation, passed through to the underlying S3 requests
 * @param key      register blob key (relative; {@code buildKey} resolves the full path)
 * @param expected the register value the caller believes is current
 * @param updated  the value to store if the register still holds {@code expected}
 * @param listener completed with the witnessed value on success (== {@code expected} when the swap
 *                 happened), or {@link OptionalBytesReference#MISSING} when a concurrent writer won
 */
private void conditionalWriteCompareAndExchangeOperation(
    OperationPurpose purpose,
    String key,
    BytesReference expected,
    BytesReference updated,
    ActionListener<OptionalBytesReference> listener
) {
    // Callers must route here only when the blob store supports conditional writes.
    assert blobStore.supportsConditionalWrites();

    SubscribableListener

        // Read current register contents together with the object's ETag in one step.
        .<RegisterAndEtag>newForked(l -> getRegisterAndEtag(purpose, key, l))

        .andThenApply(regEtag -> {
            assert BytesArray.EMPTY.equals(RegisterAndEtag.ABSENT.registerContents())
                : "absent-register must match empty-expected-register, or register will never be created";
            if (expected.equals(regEtag.registerContents()) == false) {
                // register does not match, return value from S3
                // if register was changed, return current value
                // if register was deleted, return ABSENT(BytesArray.EMPTY)
                return OptionalBytesReference.of(regEtag.registerContents());
            } else {
                // Absent register: create only if still absent; otherwise guard on the ETag we read.
                final var conditionalOperation = regEtag == RegisterAndEtag.ABSENT
                    ? ConditionalOperation.IF_NONE_MATCH
                    : ConditionalOperation.ifMatch(regEtag.eTag());
                try {
                    putObject(
                        purpose,
                        blobStore,
                        buildKey(key),
                        updated.length(),
                        () -> RequestBody.fromBytes(BytesReference.toBytes(updated)),
                        conditionalOperation
                    );
                    // Conditional PUT succeeded: the swap happened, witness the expected value.
                    return OptionalBytesReference.of(expected);
                } catch (SdkServiceException e) {
                    final var statusCode = RestStatus.fromCode(e.statusCode());
                    switch (statusCode) {
                        // conflict happened, there is no known register
                        // https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-error-response
                        case NOT_FOUND, CONFLICT, PRECONDITION_FAILED -> {
                            // A concurrent writer changed or removed the object: report MISSING
                            // rather than guessing at the other writer's value.
                            return OptionalBytesReference.MISSING;
                        }
                        default -> throw e;
                    }
                }
            }
        })

        .addListener(listener);
}

@Override
public void compareAndExchangeRegister(
OperationPurpose purpose,
Expand All @@ -1193,14 +1245,18 @@ public void compareAndExchangeRegister(
BytesReference updated,
ActionListener<OptionalBytesReference> listener
) {
final var clientReference = blobStore.clientReference();
new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
if (blobStore.supportsConditionalWrites()) {
conditionalWriteCompareAndExchangeOperation(purpose, key, expected, updated, listener);
} else {
final var clientReference = blobStore.clientReference();
new MultipartUploadCompareAndExchangeOperation(
purpose,
clientReference.client(),
blobStore.bucket(),
key,
blobStore.getThreadPool()
).run(expected, updated, ActionListener.releaseBefore(clientReference, listener));
}
}

/**
Expand Down Expand Up @@ -1274,7 +1330,7 @@ private String getRequiredEtag(OperationPurpose purpose, GetObjectResponse getOb
final var etag = getObjectResponse.eTag();
if (Strings.hasText(etag)) {
return etag;
} else if (blobStore.supportsConditionalWrites(purpose)) {
} else if (blobStore.supportsConditionalWrites()) {
throw new UnsupportedOperationException("GetObject response contained no ETag header, cannot perform conditional write");
} else {
// blob stores which do not support conditional writes may also not return ETag headers, but we won't use it anyway so return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ class S3BlobStore implements BlobStore {

private final boolean addPurposeCustomQueryParameter;

/**
 * Some storage systems claim S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match}
 * functionality properly. We allow disabling the use of this functionality, making all writes unconditional, via the
 * {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting.
 *
 * @return {@code true} if this blob store may use S3 conditional writes
 */
public boolean supportsConditionalWrites() {
    return supportsConditionalWrites;
}

S3BlobStore(
@Nullable ProjectId projectId,
S3Service service,
Expand Down Expand Up @@ -625,13 +634,4 @@ public void addPurposeQueryParameter(OperationPurpose purpose, AwsRequestOverrid
}
}

/**
 * Some storage systems claim S3-compatibility despite failing to support the {@code If-Match} and {@code If-None-Match}
 * functionality properly. We allow disabling the use of this functionality, making all writes unconditional, via the
 * {@link S3Repository#UNSAFELY_INCOMPATIBLE_WITH_S3_CONDITIONAL_WRITES} setting.
 *
 * @param purpose the purpose of the operation being checked
 * @return {@code true} if conditional writes may be used for this purpose
 */
public boolean supportsConditionalWrites(OperationPurpose purpose) {
    // REPOSITORY_ANALYSIS is a strict check for 100% S3 compatibility, including conditional write support
    return supportsConditionalWrites || purpose == OperationPurpose.REPOSITORY_ANALYSIS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,8 @@ class RejectsUploadPartRequests extends S3HttpHandler {

@Override
public void handle(HttpExchange exchange) throws IOException {
if (parseRequest(exchange).isUploadPartRequest()) {
final var s3request = parseRequest(exchange);
if (s3request.isUploadPartRequest() || s3request.isPutObjectRequest()) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
super.handle(exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testExecuteSingleUpload() throws IOException {
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn((long) bufferSize);
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
when(blobStore.supportsConditionalWrites()).thenReturn(true);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

Expand Down Expand Up @@ -251,7 +251,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
when(blobStore.supportsConditionalWrites(any())).thenReturn(true);
when(blobStore.supportsConditionalWrites()).thenReturn(true);

final S3BlobStore sourceBlobStore = mock(S3BlobStore.class);
when(sourceBlobStore.bucket()).thenReturn(sourceBucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ protected static ElasticsearchCluster buildCluster(S3HttpFixture s3HttpFixture)
.build();
}

abstract S3ConsistencyModel consistencyModel();

@Override
protected Settings repositorySettings() {
final String bucket = System.getProperty("test.s3.bucket");
Expand All @@ -118,8 +120,7 @@ protected Settings repositorySettings() {
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
// verify we always set the x-purpose header even if disabled for other repository operations
.put(randomBooleanSetting("add_purpose_custom_query_parameter"))
// this parameter is ignored for repo analysis
.put(randomBooleanSetting("unsafely_incompatible_with_s3_conditional_writes"))
.put("unsafely_incompatible_with_s3_conditional_writes", consistencyModel().hasConditionalWrites() == false)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public class S3RepositoryAnalysisRestIT extends AbstractS3RepositoryAnalysisRest
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Override
S3ConsistencyModel consistencyModel() {
    // NOTE(review): presumably models genuine AWS S3 default behaviour, which includes
    // conditional-write support — confirm against S3ConsistencyModel's definition.
    return S3ConsistencyModel.AWS_DEFAULT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public void testRepositoryAnalysis() throws Exception {
super.testRepositoryAnalysis();
} // else we're running against a real AWS S3 which has a different consistency model, so this test isn't meaningful
}

@Override
S3ConsistencyModel consistencyModel() {
    // NOTE(review): presumably models a store with strongly-consistent multipart uploads but
    // without conditional-write support — confirm against S3ConsistencyModel's definition.
    return S3ConsistencyModel.STRONG_MPUS;
}
}