Wait for per-pipeline acknowledgments on shutdown and have beaters own shutdown sequencing #51136
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
Walkthrough
This PR implements a graceful shutdown mechanism for Beats that waits for in-flight events to be acknowledged before stopping. It extends the
This comment has been minimized.
This pull request is now in conflicts. Could you fix it? 🙏 |
cmacknz left a comment
I've done a first pass on this and I don't see any major issues besides how much code is in here. Is there any way you could split this up?
I asked Claude and it had some reasonable suggestions to split it in half, at least the removal of intake queue ID could be a follow up now that it's also in here.
This reverts commit 159c5fb.
@cmacknz I can split it up, was just trying to get it all to a state so I can get elastic/elastic-agent#13000 merged. I removed the change that took out the |
cmacknz left a comment
Thanks for splitting this up, it made it easier to go through in detail. I found one area around the use of c.snapshotProducers that looks problematic, and Claude found another spot that looks like a possible deadlock.
Generally looking good but there is a lot of callback interaction to trace through here, not sure there's anything to be done about that though.
This pull request is now in conflicts. Could you fix it? 🙏 |
TL;DR
The Buildkite failure is a configuration/mergeability failure, not a test or code-runtime failure: the upload step never reached
Remediation
Investigation details
Root Cause
Buildkite runs For this build, that merge failed with content conflicts before pipeline upload could continue. The PR metadata also reported
Evidence
Verification
cmacknz left a comment
LGTM, I've read this several times now and have had Claude go through it multiple times as well and can't find any more significant issues.
@cmacknz Looks like we need review from some of the other teams. |
Proposed commit message
Wait for per-pipeline acknowledgments on shutdown and have beaters own shutdown sequencing.
When Beats run as reloadable OTel receivers that share a single in-memory queue, shutting one pipeline down was not clean: the pipeline could be torn down before its Beater had stopped its inputs, and it did not wait for its own events to be acknowledged before releasing resources. The result was that in-flight events could be dropped on shutdown or reload, and one pipeline's shutdown could affect others sharing the queue.
- `ACKWaitChan()`, a channel that closes once the producer is closed and all of its events have been acknowledged (or immediately for producers that don't track acks). Implemented for the slabqueue, memqueue and diskqueue.
- `Close()` stops accepting new events and closes the queue producer but returns immediately; the pipeline finalizes acknowledgments afterwards. A small reaper finalizes a client as soon as its events drain, so closed clients don't pile up while a pipeline keeps running.
- `Run` returns (so inputs are closed first), with a watchdog that releases the pipeline if a Beater fails to return promptly.

Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files
- `stresstest.sh` script to run them under stress conditions and race detector to verify their stability.
- `./changelog/fragments` using the changelog tool.

Disruptive User Impact
None
Related issues