[ML] Ensure queued AbstractRunnables are notified when executor stops#135966
[ML] Ensure queued AbstractRunnables are notified when executor stops#135966DonalEvans merged 5 commits intoelastic:mainfrom
Conversation
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Refactor notifyQueueRunnables() to allow PriorityProcessWorkerExecutorService to notify the AbstractRunnable contained within queued OrderedRunnables - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests Closes elastic#134651
|
Pinging @elastic/ml-core (Team:ML) |
|
Hi @DonalEvans, I've created a changelog YAML for you. |
jonathan-buttner
left a comment
There was a problem hiding this comment.
Great work, left one suggestion
| } | ||
| } | ||
|
|
||
| protected abstract void notifyIfAbstractRunnable(T runnable, Exception ex, String msg); |
There was a problem hiding this comment.
What do you think about having AbstractProcessWorkerExecutorService contain the logic for notifyIfAbstractRunnable and relying on an abstract method like getAsAbstractRunnable which either returns the AbstractRunnable or null. That way child classes don't need to call back into the parent to do the notification, they would only return the abstract runnable if it was one.
Something like:
protected abstract AbstractRunnable getAsAbstractRunnable(T runnable);
private void notifyIfAbstractRunnable(T runnable, Exception ex, String msg) {
var abstractRunnable = getAsAbstractRunnable(runnable);
if (abstractRunnable != null) {
notifyAbstractRunnable(ex, msg, abstractRunnable);
}
}
Then PriorityProcessWorkerExecutorService would have something like:
@Override
protected AbstractRunnable getAsAbstractRunnable(OrderedRunnable orderedRunnable, Exception ex, String msg) {
// The runnable contained within OrderedRunnable is always an AbstractRunnable, so no need to check the type
return orderedRunnable.runnable();
}
There was a problem hiding this comment.
Good idea, this is better
| * most cases will be the insertion order | ||
| */ | ||
| public record OrderedRunnable(RequestPriority priority, long tieBreaker, Runnable runnable) | ||
| protected record OrderedRunnable(RequestPriority priority, long tieBreaker, AbstractRunnable runnable) |
There was a problem hiding this comment.
Another option is make OrderedRunnable extend AbstractRunnable and implement the required methods as calling the runnable parameter
There was a problem hiding this comment.
This solution adds more code than the original one, but I think it might be better, since it simplifies the logic for determining what we need to notify.
…elastic#135966) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes elastic#134651
💔 Backport failed
You can use sqren/backport to manually backport by running |
…elastic#135966) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes elastic#134651 (cherry picked from commit d3d013e)
💚 All backports created successfully
Questions ?Please refer to the Backport tool documentation |
…elastic#135966) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes elastic#134651 (cherry picked from commit d3d013e)
…elastic#135966) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes elastic#134651 (cherry picked from commit d3d013e)
…#135966) (#136122) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651
…#135966) (#136126) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136128) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136125) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
…#135966) (#136127) AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs. - Make OrderedRunnable extend AbstractRunnable and pass onFailure() and onRejection() calls to the AbstractRunnable it wraps - Ensure that notifyQueueRunnables() is called and the executor marked as shut down if an exception is thrown from start() - Add unit tests - Update docs/changelog/135966.yaml Closes #134651 (cherry picked from commit d3d013e)
AbstractProcessWorkerExecutorService.notifyQueueRunnables() was making an incorrect assumption that all AbstractRunnables that were submitted for execution would be queued as AbstractRunnables. However, PriorityProcessWorkerExecutorService wraps AbstractRunnables in OrderedRunnable before queueing them, and since OrderedRunnable is not an AbstractRunnable, these were skipped when notifyQueueRunnables() drained the queue, leading to potential hangs.
Closes #134651