Support concurrent multipart uploads in Azure#128449
Conversation
|
Hi @tlrx, I've created a changelog YAML for you. |
henningandersen
left a comment
There was a problem hiding this comment.
Looks great, left a few comments.
| .getBlobAsyncClient(blobName) | ||
| .getBlockBlobAsyncClient(); | ||
|
|
||
| Flux.fromIterable(multiparts) |
| .block(); | ||
| } | ||
| } catch (final BlobStorageException e) { | ||
| if (failIfAlreadyExists |
There was a problem hiding this comment.
Can be follow-up for sure, but I guess we may need some cleanup to deal with failures, deleting the staged blocks? I notice that the original version also does not seem to care so we can defer for now.
There was a problem hiding this comment.
Yes, there is more work to do for handling failures. I'll create a task for this if we decide to go this route.
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
Outdated
Show resolved
Hide resolved
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
Outdated
Show resolved
Hide resolved
|
|
||
| private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) { | ||
| assert delegate.markSupported() : "An InputStream with mark support was expected"; | ||
| // We need to introduce a read barrier in order to provide visibility for the underlying |
There was a problem hiding this comment.
Let us leave this for now, but it seems strange to need this synchronized part really, since we expect it to be used serially only anyway - and this must require a happens-before relationship established in reactor code even if this is used across threads.
| stream.mark(Integer.MAX_VALUE); | ||
| final var bytesRead = new AtomicLong(0L); | ||
| return Flux.defer(() -> { | ||
| // Code in this Flux.defer() can be concurrently executed by multiple threads |
There was a problem hiding this comment.
Can it? I think that would break everything. I think it is an existing assumption and code comment copy - but if you agree that it should not perhaps worth adding a question mark/todo around it?
There was a problem hiding this comment.
This was the case when I worked on this last week, I could see different repository_azure threads resetting the same input stream instance concurrently so I assumed the existing comment was also applicable.
I updated the comment so that we can revisit this later, now I fixed the bugs I made.
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
Outdated
Show resolved
Hide resolved
henningandersen
left a comment
There was a problem hiding this comment.
LGTM.
We should tackle tests too, but I am good with merging this since it is not really used and we can start benchmarking the effect more easily then.
|
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination) |
|
Thanks Henning! |
Enhances existing integration test to account for #128449. Relates ES-11815
Relates ES-11815