Use Azure blob batch API to delete blobs in batches #114566

nicktindall merged 46 commits into elastic:main
Conversation
2d9b166 to 8e08e60
api "com.azure:azure-identity:1.13.2"
api "com.azure:azure-json:1.2.0"
api "com.azure:azure-storage-blob:12.27.1"
api "com.azure:azure-storage-blob-batch:12.23.1"
This is the version consistent with the others from the BOM
requires reactor.core;
requires reactor.netty.core;
requires reactor.netty.http;
requires com.azure.storage.blob.batch;
IntelliJ seemed to optimize the requires directives; the ones removed above are all transitively required by com.azure.storage.blob.batch.
PUT_BLOCK("PutBlock"),
PUT_BLOCK_LIST("PutBlockList");
PUT_BLOCK_LIST("PutBlockList"),
BLOB_BATCH("BlobBatch");
We can't be specific about the type of operation we're performing in a batch without inspecting the request body. I think it's better to track BlobBatch than potentially erroneously track BatchDelete (if one day we start using batch to "set access tier")
Pinging @elastic/es-distributed (Team:Distributed)
ywangd left a comment:
Sorry for the delay here. I took a second, closer look at the changes. I think we might want to consider adding controls for resource usage (heap and concurrent requests).
Btw, the PR should now be labelled as >enhancement due to the new setting.
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
this.maxDeletesPerBatch = Repository.DELETION_BATCH_SIZE_SETTING.get(metadata.settings());
Nit: Can we rename this field to deletionBatchSize, which is consistent with the setting name and avoids clashing with the static MAX_ELEMENTS_PER_BATCH?
public static <E extends Exception> void doPrivilegedVoidExceptionExplicit(Class<E> exception, StorageRunnable action) throws E {
    doPrivilegedVoidException(action);
}
Is this necessary? The existing code works OK without the explicit throws? If we want to change this, I'd prefer to update the existing method so that it explicitly throws IOException in its catch block when the cause is an IOException. Since that requires some cascading changes, I think a separate PR would be better.
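For illustration, the alternative suggested here (having the existing wrapper rethrow an IOException cause directly, instead of adding a generic explicit-throws variant) can be sketched in plain Java. This is a minimal sketch: the real SocketAccess/AccessController plumbing is elided, and all names are illustrative rather than the actual Elasticsearch API.

```java
import java.io.IOException;

// Illustrative sketch only: the real SocketAccess wrapper also routes the
// action through AccessController.doPrivileged, which is elided here.
class PrivilegedRunner {

    interface StorageRunnable {
        void run() throws Exception;
    }

    // Runs the action; if the failure is (or wraps) an IOException, rethrow it
    // as the checked type so callers can handle it directly.
    static void doPrivilegedVoidException(StorageRunnable action) throws IOException {
        try {
            action.run();
        } catch (Exception e) {
            if (e instanceof IOException ioe) {
                throw ioe; // surface the checked IOException to the caller
            }
            if (e.getCause() instanceof IOException ioe) {
                throw ioe; // unwrap a wrapped IOException cause
            }
            throw new RuntimeException(e);
        }
    }
}
```

The cascading changes mentioned above come from every caller now having to handle (or declare) the checked IOException.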
for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
    if (blobItem.isPrefix()) {
        continue;
    }
    blobNames.add(blobItem.getName());
    bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
    blobsDeleted.incrementAndGet();
}
if (blobNames.isEmpty() == false) {
    deleteListOfBlobs(client, blobNames.iterator());
I wonder whether there is an issue in materializing all blobItems from the listing before invoking delete. If there are a large number of items, it could be rather inefficient. IIUC, listBlobs returns an Iterable that lazily loads. I think this change means we no longer leverage that?
Good call, I've changed this now to use Flux all the way through. I think that should pipeline all this stuff better.
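The lazy-consumption point above can be sketched with plain JDK types, with no Azure or Reactor dependencies. This is an illustrative shape only (the class and method names are made up): draining a possibly lazy iterator of blob names in fixed-size batches keeps at most one batch's worth of names in memory, instead of materializing the full listing first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch: consume a (possibly lazy) iterator of blob names in
// fixed-size batches, so the whole listing is never held in memory at once.
class BatchedDeleter {

    // Returns the number of batches submitted.
    static int drainInBatches(Iterator<String> blobNames, int maxPerBatch, Consumer<List<String>> submitBatch) {
        int batches = 0;
        while (blobNames.hasNext()) {
            List<String> batch = new ArrayList<>(maxPerBatch);
            while (batch.size() < maxPerBatch && blobNames.hasNext()) {
                batch.add(blobNames.next());
            }
            submitBatch.accept(batch); // e.g. build and submit one blob batch request
            batches++;
        }
        return batches;
    }
}
```

With a Flux-based pipeline, the same effect comes from the subscriber pulling items on demand rather than the producer eagerly collecting them.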
final List<Mono<Void>> batchResponses = new ArrayList<>();
while (blobNames.hasNext()) {
    final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
    int counter = 0;
    while (counter < maxDeletesPerBatch && blobNames.hasNext()) {
        currentBatch.deleteBlob(container, blobNames.next());
        counter++;
    }
    batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
This is more likely a theoretical concern. Technically the number of concurrent requests here is also unbounded, while previously it was hard-coded to 100.
I've limited these. There is an underlying limit imposed at the node level (max open connections for a pool shared between clients, which defaults to 50; Azure is HTTP/1.1, so that's effectively a global maximum on concurrent requests for a node), and the blocks that dispatch the requests are also bounded by a thread pool limit in the reactor runtime (which seems to default to 5 threads).
So I think even with a limit of 100 the actual number of concurrent requests would be much lower. In any case, I've added an explicit limit which is configurable and defaults to 10. Because these are bulk requests, that means by default a maximum of 2560 concurrent individual deletes.
Thanks for explaining. Makes sense from the networking perspective. I should have been clearer: by unbounded number of requests, I mostly mean the number of request objects that are constructed during this process. I guess they are eagerly instantiated even when the underlying network stack is not ready to consume them? If so, they consume memory and in extreme cases may even lead to an OOM.
Ah yep, I understand. I think the limit I added should restrict that, as it limits the number of concurrent subscribers. As I understand it, nothing happens until a subscriber asks for the next value(s), so there should be at most 10 batch requests being processed at any one time.
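The effect of a concurrency cap like the one discussed here (in the real code, Reactor's flatMap with a maxConcurrency argument) can be demonstrated with JDK-only primitives. This is an illustrative sketch, not the actual implementation: a Semaphore bounds how many simulated batch requests are in flight at once, just as the subscriber limit bounds in-flight batches.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative, JDK-only sketch of bounding in-flight work: at most `limit`
// tasks run concurrently; the rest wait for a permit.
class BoundedSubmitter {

    // Runs `tasks` simulated batch requests and reports the peak concurrency.
    static int maxObservedConcurrency(int tasks, int limit) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                permits.acquireUninterruptibly(); // wait for a concurrency slot
                try {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10); // simulate one batch request round-trip
                    inFlight.decrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    permits.release();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

With the defaults discussed above (10 concurrent batches, up to 256 deletes per batch), the worst case is 2560 individual deletes in flight.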
final CountDownLatch allRequestsFinished = new CountDownLatch(deleteTasks.size());
final List<Throwable> errors = new CopyOnWriteArrayList<>();
Similarly, I think we should limit the number of errors. Also, seeing the CountDownLatch makes me wonder whether it is possible to leverage the Flux approach, similar to how the existing deletion code relies on Flux.then().block(). Maybe something like Flux#fromIterable, so that it takes a custom Iterable implementation that internally constructs deletion requests which in turn consume the listing response. I feel it could somewhat address my previous comments about limiting resource usage. It's just a rough idea; there may be issues that I just haven't noticed.
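The bounded-error idea can be sketched as follows. This is a minimal, illustrative sketch (the class name and message wording are made up, not the PR's actual code): keep only the first `maxKept` errors and count the rest, so an unbounded stream of failures cannot exhaust the heap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: retain at most `maxKept` error objects, counting any
// further failures instead of storing them.
class BoundedErrors {
    private final int maxKept;
    private final List<Throwable> kept = new ArrayList<>();
    private final AtomicInteger total = new AtomicInteger();

    BoundedErrors(int maxKept) {
        this.maxKept = maxKept;
    }

    synchronized void record(Throwable t) {
        if (kept.size() < maxKept) {
            kept.add(t); // keep the first few for diagnostics
        }
        total.incrementAndGet(); // but always count every failure
    }

    synchronized String summary() {
        int dropped = total.get() - kept.size();
        return dropped > 0
            ? kept.size() + " errors kept, " + dropped + " further errors dropped"
            : total.get() + " error(s)";
    }
}
```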
Hi @nicktindall, I've created a changelog YAML for you.
ywangd left a comment:
LGTM
Thanks for the iterations!
    logger.info("Using SAS token authentication");
    secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
} else {
    logger.info("Using key authentication");
Nit: can we add --> at the beginning of these logging messages? It's an informal convention to make test logging messages easier to search.
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
// is sent, and we can support all SAS token types
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
    azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
).buildAsyncClient();
I assume this still works with non-container-scoped tokens and the other Azure credential types that we support?
I've just kicked off the tests with the latest changes https://buildkite.com/elastic/elasticsearch-periodic/builds/4557
EDIT: third party tests all still pass :)
    });
}, maxConcurrentBatchDeletes).collectList().block();
if (errors.isEmpty() == false) {
    final IOException ex = new IOException("Error deleting batches");
Nit: I think we can include a brief message about exactly how many errors were encountered when errorsCollected is greater than 10, so that it is clear that some errors were skipped.
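One way to act on this nit can be sketched as follows. This is an illustrative sketch, not the PR's actual code (the helper name, the 10-error cap implied above, and the message wording are assumptions): attach the retained errors as suppressed exceptions and note in the message how many were encountered in total.

```java
import java.io.IOException;
import java.util.List;

// Illustrative sketch: summarise collected batch-delete failures in a single
// IOException, noting when some errors were not retained.
class BatchDeleteErrors {

    static IOException summarise(List<Throwable> collected, int totalErrors) {
        String message = "Error deleting batches";
        if (totalErrors > collected.size()) {
            message += " (" + totalErrors + " errors in total, only the first "
                + collected.size() + " are included)";
        }
        IOException ex = new IOException(message);
        collected.forEach(ex::addSuppressed); // keep the retained causes visible
        return ex;
    }
}
```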
} catch (RuntimeException e) {
    throw new IOException("Error deleting batches", e);
Mostly for my own education: why are we specifically catching RuntimeException here? Is there a concrete concern about anything thrown here, or is it to match the existing code? The existing code seems to catch the broader Exception instead.
It was really because there are no checked exceptions thrown in the try/catch block. Perhaps it's safer just to catch Exception in case there's a SocketAccess.doPrivilegedVoidException-type scenario going on in there somewhere.
/**
 * The maximum number of concurrent batch deletes
 */
static final Setting<Integer> MAX_CONCURRENT_BATCH_DELETES_SETTING = Setting.intSetting("max_concurrent_batch_deletes", 10, 1);
I suggest we give it a sensible max value, e.g. 100.
Co-authored-by: Yang Wang <ywangd@gmail.com>
This PR implements blob deletion as one or more blob batch requests, rather than deleting each blob individually. The reason this wasn't implemented originally was due to concerns around the blob batch API's SAS token auth support. The difference in the approach in this PR is down to the use of a container-scoped client, which sends an additional request parameter (restype=container). Using the API in this way means that SAS tokens are supported.

I ran this branch through the elasticsearch / periodic pipeline (results here) and everything passed. If I'm reading it correctly, that includes running the AzureStorageCleanupThirdPartyTests using a SAS token, and that test includes code paths that use the new bulk delete.

Closes ES-9777