Fix cancellation race in CancellableRateLimitedFluxIterator#141974
Conversation
|
Pinging @elastic/es-distributed (Team:Distributed) |
|
Hi @nicktindall, I've created a changelog YAML for you. |
There was a problem hiding this comment.
Pull request overview
This PR fixes a race condition in CancellableRateLimitedFluxIterator where concurrent cancellation and error events could lead to incorrect error propagation. The fix ensures that producer errors take precedence over consumer cancellation errors.
Changes:
- Modified
CancellableRateLimitedFluxIteratorto track error source (producer vs consumer) and ensure producer errors are preserved even after cancellation - Added test coverage for cancellation after error and completion after cancellation scenarios
- Updated existing test to verify CancellationException is properly wrapped
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/CancellableRateLimitedFluxIterator.java |
Changed doneState from volatile to AtomicReference, added fromProducer tracking to DoneState record, implemented updateDoneState method to atomically manage state transitions with priority for producer errors |
modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/CancellableRateLimitedFluxIteratorTests.java |
Enhanced existing cancellation test to verify CancellationException wrapping, added testCancellationAfterError and testCompleteAfterCancellation to verify race condition fixes |
docs/changelog/141974.yaml |
Added changelog entry documenting the bug fix |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…itories/azure/CancellableRateLimitedFluxIteratorTests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
| // If the existing error is not from the producer, allow it to be overwritten by one from the producer | ||
| if (existing.error() != null && existing.fromProducer() == false && newState.isErrorFromProducer()) { | ||
| return newState; | ||
| } |
There was a problem hiding this comment.
Why do we want to override the error if it is from the producer? Do we depend on this behaviour anywhere? I wonder whether it is necessary. Or maybe we can add the new error as suppressed?
There was a problem hiding this comment.
No, we only override the error if it's not from the producer (i.e. it's the cancellation error or the IllegalStateException). There are tests that ensure a producer error is apparent if it occurs after a cancellation, so this should be consistent with the existing behaviour.
(see org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIteratorTests#testErrorAfterCancellation)
There was a problem hiding this comment.
I think the producer error would have more information in it, so we should prefer it if both occur. The cancellation and illegal state exception are merely to prevent the consumer from thinking it reached the end of the stream without error.
There was a problem hiding this comment.
My original comment was unclear. I did mean why a producer error should override a consumer one. I am OK with override. I wonder whether there is value in adding the one being overriden as suppressed. I might be overthinking if it has no use case.
There was a problem hiding this comment.
Yeah I think the client initiated errors aren't that interesting, they're kind-of synthetic and are really just to mark the iterator as failed.
|
|
||
| if (doneState.done() && doneState.error() != null) { | ||
| final var currentDoneState = doneState.get(); | ||
| if (currentDoneState.done() && currentDoneState.error() != null) { |
There was a problem hiding this comment.
Can we encapsulate these two conditions with a method on DoneState?
| } | ||
|
|
||
| // If the existing error is not from the producer, allow it to be overwritten by one from the producer | ||
| if (existing.error() != null && existing.fromProducer() == false && newState.isErrorFromProducer()) { |
There was a problem hiding this comment.
Similarly can we extract a method on DoneState for existing.error() != null && existing.fromProducer() == false?
💚 All backports created successfully
Questions ?Please refer to the Backport tool documentation |
This PR ensures that if the consumer and the producer update the done state independently: