Fix race condition in CancellableRateLimitedFluxIterator #141323
nicktindall merged 3 commits into elastic:main
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
Hi @nicktindall, I've created a changelog YAML for you.
cancelSubscription();
signalConsumer();
if (doneState.done() && doneState.error() != null) {
    throw new RuntimeException(doneState.error());
It's possible that we returned true from hasNext() and an error then occurred before the call to next(). The error handler will have cleared the queue in the meantime, so we end up here with nothing to return. If that is the case, just throw the error that occurred.
Note that onError sets doneState and THEN calls clearQueue, so in the above scenario, if we see an empty queue here, we'll also see the error in doneState. It may mean that we allow consumption of an item after the error occurs, but that's not a big deal because the caller will receive the error on their next call to hasNext().
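To make the ordering argument concrete, here is a minimal sketch of it. It is not the real class: `OrderingSketch`, `NOT_DONE` and the use of a `ConcurrentLinkedQueue` are assumptions made for illustration, and the real iterator also handles cancellation and consumer signalling, which are left out here.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the iterator discussed above.
final class OrderingSketch<T> {
    // 'done' and 'error' travel together in one immutable value.
    private record DoneState(boolean done, Throwable error) {}

    private static final DoneState NOT_DONE = new DoneState(false, null);

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicReference<DoneState> doneState = new AtomicReference<>(NOT_DONE);

    void onNext(T item) {
        queue.add(item);
    }

    void onError(Throwable t) {
        // Step 1: publish the terminal state first...
        doneState.set(new DoneState(true, t));
        // Step 2: ...and only then discard the buffered items.
        queue.clear();
    }

    T next() {
        T item = queue.poll();
        if (item != null) {
            // An item may still be handed out after the error was published;
            // the caller finds out about the error on a later call.
            return item;
        }
        // Empty queue: if onError emptied it, the state was published before the
        // clear, so the intent is that the error is visible here.
        DoneState state = doneState.get();
        if (state.done() && state.error() != null) {
            throw new RuntimeException(state.error());
        }
        throw new NoSuchElementException();
    }
}
```

If the two steps in `onError` were swapped, a consumer could observe the empty queue before the error had been published and would have nothing meaningful to throw.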
        running.set(false);
        safeAwait(endBarrier);
    }
}
This test reproduces the issue fairly quickly if you run it with:
while ./gradlew :modules:repository-azure:test --tests "org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIteratorTests.testConcurrentErrorAndHasNext" -Dtests.iters=1000 --rerun --fail-fast; do echo again; done
/**
 * This is used to set 'done' and 'error' atomically
 */
private record DoneState(boolean done, Throwable error) {
Can we assert something like `assert done == true || error == null`?
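One possible shape for that assertion is a compact constructor on the record, sketched below. This is only an illustration of the suggestion, not necessarily what was merged. The invariant reads as "an error implies done". Note that Java `assert` statements are only evaluated when assertions are enabled (`-ea`).

```java
// Sketch only: the record from the diff with the suggested invariant added.
record DoneState(boolean done, Throwable error) {
    DoneState {
        // An error may only be recorded on a terminal state.
        assert done || error == null : "error set on a state that is not done";
    }
}
```

With this in place, `new DoneState(false, someError)` trips the assertion in any run that has assertions enabled, while the three legal combinations (not done, done cleanly, done with error) construct normally.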
final var endBarrier = new CyclicBarrier(3);
final var running = new AtomicBoolean(true);
final var outstandingRequests = new AtomicInteger(0);
for (int i = 0; i < 20; i++) {
Is this loop necessary? It is basically here to run the test 20 times sequentially so that it is more likely to reproduce the failure?
Yeah, it was just to make it more likely to reproduce. It still only takes ~15ms, so I think it's worth keeping to make the issue more likely to surface.
That's ok. Mostly making sure my understanding is correct.
When an error occurs with the download, the reactive framework calls `org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIterator#onError`, which sets `done=true`, then clears the queue, then sets `error=t`.
It's possible that `hasNext` sees the `done=true` but doesn't see the `error=t`, which means `hasNext` can return `false`. We take that to mean we've reached the end of the sequence of chunks, when in fact the iteration terminated due to an error. `hasNext` should throw the error in this case so the consumer knows they've read an incomplete sequence.
This iterator terminates with an error rather infrequently because we currently rely on retries in the Azure client (i.e. we'd only see it terminate with an error once the configured retries are exhausted). With the shift to using the common retry infrastructure, this code path will be executed for every individual failure, so it becomes more likely that we will hit this race condition.
This change makes the update to `done` and `error` atomic, which removes the above edge case.
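For readers who want to see the idea in isolation, the following is a rough sketch of the atomic-update approach described above, under several assumptions: the class name `AtomicDoneSketch` is hypothetical, the item type is fixed to `String`, and `hasNext` spins where the real iterator presumably waits for a signal from the producer. The point it illustrates is that a single read of the reference yields a consistent `(done, error)` pair, so `hasNext` cannot observe `done` without also observing the error that caused it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of publishing 'done' and 'error' as one value.
final class AtomicDoneSketch {
    private record DoneState(boolean done, Throwable error) {}

    private static final DoneState NOT_DONE = new DoneState(false, null);

    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicReference<DoneState> doneState = new AtomicReference<>(NOT_DONE);

    void onNext(String chunk) {
        queue.add(chunk);
    }

    void onComplete() {
        doneState.set(new DoneState(true, null));
    }

    void onError(Throwable t) {
        // One write publishes both pieces of information together.
        doneState.set(new DoneState(true, t));
        queue.clear();
    }

    boolean hasNext() {
        for (;;) {
            // One read: 'done' and 'error' always come from the same snapshot.
            DoneState state = doneState.get();
            boolean empty = queue.isEmpty();
            if (state.done()) {
                if (state.error() != null) {
                    throw new RuntimeException(state.error());
                }
                if (empty) {
                    return false; // genuine end of the sequence
                }
            }
            if (empty == false) {
                return true;
            }
            // Simplification: spin here instead of blocking until signalled.
            Thread.onSpinWait();
        }
    }
}
```

With two separate fields, the window between writing `done` and writing `error` is where a reader can see a terminated iterator with no error. Folding both into one immutable value removes that window.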