Fix cancellation race in CancellableRateLimitedFluxIterator #141974

Merged
nicktindall merged 10 commits into elastic:main from nicktindall:allow_consumer_errors_to_be_overwritten_by_producer_errors
Feb 9, 2026


@nicktindall
Contributor

@nicktindall nicktindall commented Feb 6, 2026

This PR ensures that if the consumer and the producer update the done state independently:

  • Errors from the producer take priority over errors from the consumer
  • Errors always take priority over completions
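
The two rules above can be sketched as a pure merge function applied atomically to the done state. This is a minimal illustration rather than the PR's code: the class name, the factory methods, `isError()` and `merge` are hypothetical, and letting an error replace an earlier completion follows the second bullet literally, which may differ from the real implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;

final class DoneStatePriorityDemo {

    // Hypothetical stand-in for the iterator's terminal state; not the PR's actual record.
    record DoneState(boolean done, Throwable error, boolean fromProducer) {
        static final DoneState NOT_DONE = new DoneState(false, null, false);

        static DoneState completed() {
            return new DoneState(true, null, true);
        }

        static DoneState producerError(Throwable t) {
            return new DoneState(true, t, true);
        }

        static DoneState consumerError(Throwable t) {
            return new DoneState(true, t, false);
        }

        boolean isError() {
            return error != null;
        }

        boolean isErrorFromProducer() {
            return error != null && fromProducer;
        }
    }

    private final AtomicReference<DoneState> doneState = new AtomicReference<>(DoneState.NOT_DONE);

    // Side-effect free, so updateAndGet may safely re-run it under contention.
    static DoneState merge(DoneState existing, DoneState proposed) {
        if (existing.done() == false) {
            return proposed; // first terminal event is recorded as-is
        }
        if (existing.isError() == false) {
            return proposed.isError() ? proposed : existing; // an error beats a completion
        }
        if (existing.fromProducer() == false && proposed.isErrorFromProducer()) {
            return proposed; // a producer error beats a consumer error
        }
        return existing; // otherwise the recorded error stays
    }

    DoneState update(DoneState proposed) {
        return doneState.updateAndGet(existing -> merge(existing, proposed));
    }

    public static void main(String[] args) {
        DoneStatePriorityDemo iterator = new DoneStatePriorityDemo();
        iterator.update(DoneState.consumerError(new CancellationException("cancelled")));
        iterator.update(DoneState.completed()); // ignored: an error is already recorded
        DoneState last = iterator.update(DoneState.producerError(new RuntimeException("producer failed")));
        System.out.println(last.error().getMessage()); // prints "producer failed"
    }
}
```

Because the outcome depends only on which events occurred and not on their arrival order, the consumer observes the same error however the two sides interleave.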

@nicktindall nicktindall added the >bug and :Distributed/Snapshot/Restore (Anything directly related to the `_snapshot/*` APIs) labels Feb 6, 2026
@elasticsearchmachine elasticsearchmachine added the Team:Distributed (Meta label for distributed team.) and v9.4.0 labels Feb 6, 2026
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@elasticsearchmachine
Collaborator

Hi @nicktindall, I've created a changelog YAML for you.

Contributor

Copilot AI left a comment

Pull request overview

This PR fixes a race condition in CancellableRateLimitedFluxIterator where concurrent cancellation and error events could lead to incorrect error propagation. The fix ensures that producer errors take precedence over consumer cancellation errors.

Changes:

  • Modified CancellableRateLimitedFluxIterator to track error source (producer vs consumer) and ensure producer errors are preserved even after cancellation
  • Added test coverage for cancellation after error and completion after cancellation scenarios
  • Updated existing test to verify CancellationException is properly wrapped

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/CancellableRateLimitedFluxIterator.java | Changed doneState from volatile to AtomicReference, added fromProducer tracking to DoneState record, implemented updateDoneState method to atomically manage state transitions with priority for producer errors |
| modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/CancellableRateLimitedFluxIteratorTests.java | Enhanced existing cancellation test to verify CancellationException wrapping, added testCancellationAfterError and testCompleteAfterCancellation to verify race condition fixes |
| docs/changelog/141974.yaml | Added changelog entry documenting the bug fix |
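
To illustrate why moving from a volatile field to an AtomicReference matters here, the sketch below races one producer error against one consumer cancellation on two threads. With a plain volatile write, whichever thread wrote last would win; with an atomic read-modify-write, the merge rule decides. The `State` record, `merge` and `raceOnce` are hypothetical names for this illustration and are not taken from the PR.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class DoneStateRaceDemo {

    // Hypothetical, reduced terminal state: only errors are modelled here.
    record State(Throwable error, boolean fromProducer) {}

    // Order-independent merge: a producer error replaces a consumer error, never the reverse.
    static State merge(State existing, State proposed) {
        if (existing == null) {
            return proposed;
        }
        if (existing.fromProducer() == false && proposed.fromProducer()) {
            return proposed;
        }
        return existing;
    }

    // Races one producer error against one consumer cancellation and returns the surviving error.
    static Throwable raceOnce(Throwable producerError) throws InterruptedException {
        AtomicReference<State> state = new AtomicReference<>(); // null means "not done yet"
        CountDownLatch start = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            awaitQuietly(start);
            state.updateAndGet(s -> merge(s, new State(producerError, true)));
        });
        Thread consumer = new Thread(() -> {
            awaitQuietly(start);
            state.updateAndGet(s -> merge(s, new State(new CancellationException("cancelled"), false)));
        });
        producer.start();
        consumer.start();
        start.countDown(); // release both threads at roughly the same time
        producer.join();
        consumer.join();
        return state.get().error();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable boom = new RuntimeException("producer failed");
        for (int i = 0; i < 1_000; i++) {
            if (raceOnce(boom) != boom) {
                throw new AssertionError("consumer error won on iteration " + i);
            }
        }
        System.out.println("producer error survived every race");
    }
}
```

The merge function must stay free of side effects, because `updateAndGet` may invoke it more than once when the two threads collide.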

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.



…itories/azure/CancellableRateLimitedFluxIteratorTests.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@nicktindall nicktindall requested a review from ywangd February 6, 2026 06:57
Comment on lines +272 to +275
    // If the existing error is not from the producer, allow it to be overwritten by one from the producer
    if (existing.error() != null && existing.fromProducer() == false && newState.isErrorFromProducer()) {
        return newState;
    }
Member

Why do we want to override the error if it is from the producer? Do we depend on this behaviour anywhere? I wonder whether it is necessary. Or maybe we can add the new error as suppressed?

Contributor Author

@nicktindall nicktindall Feb 9, 2026

No, we only override the error if it's not from the producer (i.e. it's the cancellation error or the IllegalStateException). There are tests that ensure a producer error is apparent if it occurs after a cancellation, so this should be consistent with the existing behaviour.

(see org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIteratorTests#testErrorAfterCancellation)

Contributor Author

I think the producer error would have more information in it, so we should prefer it if both occur. The cancellation and illegal state exception are merely to prevent the consumer from thinking it reached the end of the stream without error.

Member

My original comment was unclear. I did mean why a producer error should override a consumer one. I am OK with the override. I wonder whether there is value in adding the one being overridden as suppressed. I might be overthinking if it has no use case.

Contributor Author

Yeah, I think the client-initiated errors aren't that interesting; they're kind of synthetic and are really just there to mark the iterator as failed.
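
For reference, the suppressed-exception alternative raised in this thread, which the discussion did not adopt, could look roughly like the sketch below. Everything here is hypothetical (class, record and method names); it only shows how the overridden consumer error might be attached with `Throwable#addSuppressed` once the producer error wins.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;

final class SuppressedOverrideDemo {

    // Hypothetical, reduced terminal state: only errors are modelled here.
    record State(Throwable error, boolean fromProducer) {}

    private final AtomicReference<State> state = new AtomicReference<>(); // null means "not done yet"

    // Records a producer error; any consumer error it replaces is attached as suppressed.
    void onProducerError(Throwable producerError) {
        State proposed = new State(producerError, true);
        while (true) {
            State existing = state.get();
            if (existing != null && existing.fromProducer()) {
                return; // an earlier producer error is kept
            }
            if (state.compareAndSet(existing, proposed)) {
                // The side effect runs only after the CAS succeeds, so a retry cannot attach it twice.
                if (existing != null) {
                    producerError.addSuppressed(existing.error());
                }
                return;
            }
        }
    }

    // Records a consumer-side error (for example a cancellation) only if nothing is recorded yet.
    void onConsumerError(Throwable consumerError) {
        state.compareAndSet(null, new State(consumerError, false));
    }

    Throwable error() {
        State current = state.get();
        return current == null ? null : current.error();
    }

    public static void main(String[] args) {
        SuppressedOverrideDemo demo = new SuppressedOverrideDemo();
        demo.onConsumerError(new CancellationException("cancelled"));
        demo.onProducerError(new RuntimeException("producer failed"));
        System.out.println(demo.error().getMessage());              // prints "producer failed"
        System.out.println(demo.error().getSuppressed().length);    // prints 1
    }
}
```

An explicit compare-and-set loop is used instead of `updateAndGet` because `addSuppressed` mutates the exception, and an update function may be re-applied under contention. One caveat of this sketch is that a reader could observe the producer error briefly before the suppressed exception is attached.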

Member

@ywangd ywangd left a comment

LGTM


    - if (doneState.done() && doneState.error() != null) {
    + final var currentDoneState = doneState.get();
    + if (currentDoneState.done() && currentDoneState.error() != null) {
Member

Can we encapsulate these two conditions with a method on DoneState?

Contributor Author

Done in 73d8181

    }

    // If the existing error is not from the producer, allow it to be overwritten by one from the producer
    if (existing.error() != null && existing.fromProducer() == false && newState.isErrorFromProducer()) {
Member

Similarly can we extract a method on DoneState for existing.error() != null && existing.fromProducer() == false?

Contributor Author

Also in 73d8181
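
The two extractions requested in these review comments might look something like the sketch below. The names `isDoneWithError` and `isErrorFromConsumer` are hypothetical, since the thread does not show what commit 73d8181 actually called them, and the body of `isErrorFromProducer` is an assumption based on how it is used in the reviewed snippet.

```java
final class DoneStateHelpersDemo {

    // Hypothetical version of the record, with the repeated conditions moved into named predicates.
    record DoneState(boolean done, Throwable error, boolean fromProducer) {

        // Hypothetical name; stands in for `done() && error() != null` at call sites.
        boolean isDoneWithError() {
            return done && error != null;
        }

        // Hypothetical name; stands in for `error() != null && fromProducer() == false`.
        boolean isErrorFromConsumer() {
            return error != null && fromProducer == false;
        }

        // Name appears in the reviewed snippet; this body is an assumption.
        boolean isErrorFromProducer() {
            return error != null && fromProducer;
        }
    }

    // The override check from the reviewed snippet, rewritten with the helpers.
    static DoneState resolve(DoneState existing, DoneState newState) {
        if (existing.isErrorFromConsumer() && newState.isErrorFromProducer()) {
            return newState;
        }
        return existing;
    }

    public static void main(String[] args) {
        DoneState cancelled = new DoneState(true, new IllegalStateException("cancelled"), false);
        DoneState failed = new DoneState(true, new RuntimeException("producer failed"), true);
        System.out.println(resolve(cancelled, failed).error().getMessage()); // prints "producer failed"
        System.out.println(resolve(failed, cancelled).error().getMessage()); // prints "producer failed"
    }
}
```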

@nicktindall nicktindall merged commit 1948d32 into elastic:main Feb 9, 2026
34 of 35 checks passed
@nicktindall nicktindall deleted the allow_consumer_errors_to_be_overwritten_by_producer_errors branch February 9, 2026 09:40
@nicktindall
Contributor Author

💚 All backports created successfully

Backport branches: 9.3, 9.2


nicktindall added a commit to nicktindall/elasticsearch that referenced this pull request Mar 10, 2026

Labels

>bug, :Distributed/Snapshot/Restore (Anything directly related to the `_snapshot/*` APIs), Team:Distributed (Meta label for distributed team.), v9.4.0

4 participants