Skip to content

Fix race condition in CancellableRateLimitedFluxIterator#141323

Merged
nicktindall merged 3 commits intoelastic:mainfrom
nicktindall:fix_race_condition_flux_iterator
Jan 27, 2026
Merged

Fix race condition in CancellableRateLimitedFluxIterator#141323
nicktindall merged 3 commits intoelastic:mainfrom
nicktindall:fix_race_condition_flux_iterator

Conversation

@nicktindall
Copy link
Contributor

@nicktindall nicktindall commented Jan 27, 2026

When an error occurs with the download, the reactive framework calls org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIterator#onError which sets done=true, then clears the queue, then sets error=t.

It's possible that hasNext sees the done=true but doesn't see the error=t which will mean hasNext can return false, which we take to mean we've reached the end of the sequence of chunks, when in fact the iteration terminated due to an error. hasNext should throw the error in this case so the consumer knows they've read an incomplete sequence.

This iterator terminates with an error rather infrequently because currently we rely on retries in the Azure client (i.e. we'd only see it terminate with an error once the configured retries are exhausted). With the shift to using the common retry infrastructure, this code path will be executed for every individual failure, so it becomes more likely that we will hit this race condition.

This change modifies the update to done and error to make it atomic. This will remove the above edge case.

@nicktindall nicktindall added >bug :Distributed/Snapshot/Restore Anything directly related to the `_snapshot/*` APIs labels Jan 27, 2026
@elasticsearchmachine elasticsearchmachine added Team:Distributed Coordination (obsolete) Meta label for Distributed Coordination team. Obsolete. Please do not use. v9.4.0 labels Jan 27, 2026
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@elasticsearchmachine
Copy link
Collaborator

Hi @nicktindall, I've created a changelog YAML for you.

cancelSubscription();
signalConsumer();
if (doneState.done() && doneState.error() != null) {
throw new RuntimeException(doneState.error());
Copy link
Contributor Author

@nicktindall nicktindall Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible we responded true to hasNext() then an error occurs between then and the call to next(). We'd have cleared the queue in the meantime, meaning we end up here. If that is the case, just throw the error that occurred.

Note that in onError we set the doneState THEN call clearQueue, so in the above scenario, if we see an empty queue here, we'll see the error in doneState. It may mean that we allow consumption of an item after the error occurs, but that's not a big deal because the caller will receive the error on their next call to hasNext.

running.set(false);
safeAwait(endBarrier);
}
}
Copy link
Contributor Author

@nicktindall nicktindall Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test reproduces the issue fairly quickly if you run with

while ./gradlew :modules:repository-azure:test --tests "org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIteratorTests.testConcurrentErrorAndHasNext" -Dtests.iters=1000 --rerun --fail-fast; do echo again; done
@nicktindall nicktindall requested a review from ywangd January 27, 2026 02:22
Copy link
Member

@ywangd ywangd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

/**
* This is used to set 'done' and 'error' atomically
*/
private record DoneState(boolean done, Throwable error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert something like assert done == true || error == null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 3744b53

final var endBarrier = new CyclicBarrier(3);
final var running = new AtomicBoolean(true);
final var outstandingRequests = new AtomicInteger(0);
for (int i = 0; i < 20; i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this loop necessary? It is basically here to run the test 20 times sequentially so that it is more likely to reproduce the failure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it was just to make it more likely to reproduce. It still only takes ~15ms so I think it's good just to make it more likely to surface the issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok. Mostly making sure my understanding is correct.

@nicktindall nicktindall added auto-backport Automatically create backport pull requests when merged v9.3.1 v9.2.5 labels Jan 27, 2026
@nicktindall nicktindall merged commit 7c244ee into elastic:main Jan 27, 2026
35 of 36 checks passed
@elasticsearchmachine
Copy link
Collaborator

💚 Backport successful

Status Branch Result
9.3
9.2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

auto-backport Automatically create backport pull requests when merged >bug :Distributed/Snapshot/Restore Anything directly related to the `_snapshot/*` APIs Team:Distributed Coordination (obsolete) Meta label for Distributed Coordination team. Obsolete. Please do not use. v9.2.5 v9.3.1 v9.4.0

3 participants