Drain responses on completion for TransportNodesAction#130303
elasticsearchmachine merged 20 commits into elastic:main
This PR ensures the node responses are copied and drained exclusively in onCompletion so that they do not get concurrently modified by cancellation. Resolves: elastic#128852
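The idea of copying and draining in exactly one place can be sketched as follows. This is a minimal illustration, not the code from this PR: `DrainingCollector`, `add`, `discard` and `drain` are hypothetical names, and a plain mutex stands in for whatever synchronization TransportNodesAction actually uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a response collector; not the Elasticsearch class. */
final class DrainingCollector<T> {
    private final Object mutex = new Object();
    // Becomes null once the responses have been drained or discarded.
    private List<T> responses = new ArrayList<>();

    /** Records a response; returns false if it arrived after drain() or discard(). */
    boolean add(T response) {
        synchronized (mutex) {
            if (responses == null) {
                return false;
            }
            responses.add(response);
            return true;
        }
    }

    /** Cancellation path: drop whatever was collected so far. */
    void discard() {
        synchronized (mutex) {
            responses = null;
        }
    }

    /** Completion path: hand out a private copy exactly once, or null if already gone. */
    List<T> drain() {
        synchronized (mutex) {
            if (responses == null) {
                return null;
            }
            final List<T> copy = List.copyOf(responses);
            responses = null;
            return copy;
        }
    }
}
```

Because the caller only ever iterates the copy returned by `drain()`, a later `discard()` cannot modify the list it is reading.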
Hi @ywangd, I've created a changelog YAML for you.
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
) {
    final var waited = new AtomicBoolean();
    for (var response : testNodeResponses) {
        if (waited.compareAndSet(false, true)) {
This is kind of a convoluted way to wait on a nonempty list. There's no concurrency here so the compareAndSet is a bit of a sledgehammer. Can we just check testNodeResponses.isEmpty()?
This is to wait for only the first response. You are right there is no need for AtomicBoolean. I changed it to a primitive boolean variable.
boolean waited = false;
for (var response : testNodeResponses) {
    if (waited == false) {
        waited = true;
        safeAwait(barrier);
        safeAwait(barrier);
    }
}
Can we not just do this?
- boolean waited = false;
- for (var response : testNodeResponses) {
-     if (waited == false) {
-         waited = true;
-         safeAwait(barrier);
-         safeAwait(barrier);
-     }
- }
+ if (testNodeResponses.isEmpty() == false) {
+     safeAwait(barrier);
+     safeAwait(barrier);
+ }
Indeed can we not assert that testNodeResponses is nonempty in this test?
The for-loop is to reproduce the ConcurrentModificationException reported in #128852. The test always passes without it.
I see. Could you add a comment to that effect, or else this'll get "tidied up".
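To illustrate why the loop matters: an enhanced for-loop over an ArrayList creates a fail-fast iterator, while an isEmpty() check does not, so only the loop can surface a ConcurrentModificationException. The sketch below is a single-threaded stand-in, assuming the list is an ArrayList; `CmeDemo` and its method are hypothetical names, and `clear()` plays the role of the cancellation that modified the list in #128852.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

final class CmeDemo {
    /** Returns true if modifying the list during iteration trips the fail-fast iterator. */
    static boolean iterationFailsWhenListIsCleared() {
        final List<String> responses = new ArrayList<>(List.of("node-1", "node-2", "node-3"));
        try {
            for (String response : responses) {
                // Stand-in for cancellation clearing the list while it is being read.
                responses.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** isEmpty() never creates an iterator, so it cannot observe the modification. */
    static boolean isEmptyCheckNeverThrows() {
        final List<String> responses = new ArrayList<>(List.of("node-1"));
        final boolean nonEmpty = responses.isEmpty() == false;
        responses.clear();
        return nonEmpty;
    }
}
```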
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
getReasonCancelled is racy according to its Javadocs: "May also be null if the task was just cancelled since we don't set the reason and the cancellation flag atomically." You need to use notifyIfCancelled to get the right behaviour here.
Thanks. Pushed 3d07261. Please let me know if it has used the right listener.
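The race described in that Javadoc can be modelled roughly as below. This is a simplified, hypothetical model, not the real CancellableTask: `SketchTask` is an invented class, the write order inside `cancel` is an assumption, and a `Consumer<Exception>` stands in for the listener. The point is only that a reader who checks the flag and then reads the reason without the lock may see null, while a notify-style method that reads the reason under the same lock as the writer cannot.

```java
import java.util.function.Consumer;

/** Hypothetical model of a cancellable task; not the Elasticsearch class. */
final class SketchTask {
    private volatile boolean cancelled;
    private volatile String reason;

    void cancel(String reason) {
        synchronized (this) {
            // Assumed order: the flag becomes visible before the reason is written.
            this.cancelled = true;
            this.reason = reason;
        }
    }

    boolean isCancelled() {
        return cancelled;
    }

    /** Racy: may return null right after isCancelled() has started returning true. */
    String getReasonCancelled() {
        return reason;
    }

    /** Safe: takes the writer's lock, so the reason is always set when it is read. */
    boolean notifyIfCancelled(Consumer<Exception> onFailure) {
        if (cancelled == false) {
            return false;
        }
        final String message;
        synchronized (this) {
            message = "task cancelled [" + reason + "]";
        }
        onFailure.accept(new IllegalStateException(message));
        return true;
    }
}
```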
logger.debug("task cancelled after all responses were collected");
assert task instanceof CancellableTask : "expect CancellableTask, but got: " + task;
final var cancellableTask = (CancellableTask) task;
assert cancellableTask.isCancelled();
throw new TaskCancelledException("task cancelled [" + cancellableTask.getReasonCancelled() + "]");
This change is to address the edge case commented here, but I am struggling to write a test for it. Essentially we need the cancellation to come in after all node responses are collected but before the AtomicBoolean responsesHandled is checked. One option is to extract the creation of CancellableFanOut into its own protected method and wrap the returned value in a delegating CancellableFanOut. But this requires making the 4 protected methods in CancellableFanOut package-private. I am a bit suspicious about whether this is the right path to go down. I am open to suggestions.
I'd be content with a test which concurrently completes the action and cancels it, and asserts that we always either get an exception or we get a successful response. I expect such a test would find the bug here pretty reliably.
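The shape of such a test might look like the sketch below. It is only an outline under loud assumptions: a `CompletableFuture` stands in for the action's listener, `future.cancel(false)` stands in for task cancellation, and `CompleteVsCancelRace` is a hypothetical name. A real test would race the actual TransportNodesAction completion against a real task cancellation.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;

final class CompleteVsCancelRace {
    /** Runs one round of the race and returns "response" or "cancelled". */
    static String runOnce() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        final CyclicBarrier barrier = new CyclicBarrier(2);
        // Both threads wait on the barrier so they start racing at the same moment.
        final Thread completer = new Thread(() -> {
            await(barrier);
            future.complete("response");
        });
        final Thread canceller = new Thread(() -> {
            await(barrier);
            future.cancel(false);
        });
        completer.start();
        canceller.start();
        try {
            completer.join();
            canceller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        try {
            // Exactly one of the two outcomes must win; anything else is a bug.
            return future.join();
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    private static void await(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Repeating `runOnce()` many times and asserting that the outcome is always one of the two values is what gives the race a chance to hit the narrow window.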
try {
    final var testNodesResponse = future.actionGet(SAFE_AWAIT_TIMEOUT);
    assertFalse(cancellableTask.isCancelled());
I don't think this'll hold in general. We could cancel the task after the completion has already passed the point of no return, and then the task's cancellation flag will be set even though it completed successfully.
Yeah, good point, thanks. I removed that in b38783d, which also contains a few other tweaks.
@elasticmachine update branch