Skip to content

Wait for per-pipeline acknowledgments on shutdown and have beaters own shutdown sequencing#51136

Merged
blakerouse merged 11 commits into
elastic:mainfrom
blakerouse:pipeline-disconnect
Jun 18, 2026
Merged

Wait for per-pipeline acknowledgments on shutdown and have beaters own shutdown sequencing#51136
blakerouse merged 11 commits into
elastic:mainfrom
blakerouse:pipeline-disconnect

Conversation

@blakerouse

@blakerouse blakerouse commented Jun 6, 2026

Copy link
Copy Markdown
Contributor

Proposed commit message

Wait for per-pipeline acknowledgments on shutdown and have beaters own shutdown sequencing.

When Beats run as reloadable OTel receivers that share a single in-memory queue, shutting one pipeline down was not clean: the pipeline could be torn down before its Beater had stopped its inputs, and it did not wait for its own events to be acknowledged before releasing resources. The result was that in-flight events could be dropped on shutdown or reload, and one pipeline's shutdown could affect others sharing the queue.

  • Queue producers now expose ACKWaitChan(), a channel that closes once the producer is closed and all of its events have been acknowledged (or immediately for producers that don't track acks). Implemented for the slabqueue, memqueue and diskqueue.
  • The OTel output controller tracks the producers it hands out and, on disconnect, waits only for that pipeline's acknowledgments (bounded by the shutdown context). The shared queue is fully drained and torn down only when the last pipeline leaves.
  • Client shutdown is now two-stage: Close() stops accepting new events and closes the queue producer but returns immediately; the pipeline finalizes acknowledgments afterwards. A small reaper finalizes a client as soon as its events drain, so closed clients don't pile up while a pipeline keeps running.
  • Beaters own shutdown sequencing: the framework disconnects the publisher pipeline only after Run returns (so inputs are closed first), with a watchdog that releases the pipeline if a Beater fails to return promptly.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

None

Related issues

@blakerouse blakerouse self-assigned this Jun 6, 2026
@blakerouse blakerouse added backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team labels Jun 6, 2026
@blakerouse blakerouse requested review from a team as code owners June 6, 2026 01:32
@blakerouse blakerouse requested review from AndersonQ and rdner June 6, 2026 01:32
@botelastic botelastic Bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jun 6, 2026
@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic Bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jun 6, 2026
@github-actions

github-actions Bot commented Jun 6, 2026

Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.
@coderabbitai

coderabbitai Bot commented Jun 6, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR implements a graceful shutdown mechanism for Beats that waits for in-flight events to be acknowledged before stopping. It extends the queue.Producer interface with an ACKWaitChan() method and implements it across diskqueue, memqueue, and slabqueue backends with atomic publish/resolve counters and force-close fan-out. Pipeline client shutdown is refactored from a single blocking stage into two stages: Close() stops new events and closes the producer immediately, while disconnect() finalizes acknowledgment handling after drain. A reaper goroutine in Pipeline monitors ACKWaitChan and finalizes clients as their events drain, with Disconnect handling any remaining clients. otelOutputController gains a tracked-producer registry with context-bounded graceful/forced shutdown. A shutdown watchdog in libbeat/cmd/instance/beat.go forces pipeline disconnect if beater.Run does not return within a configurable grace period. Filebeat's shutdown path is updated to call Publisher.Disconnect with a timeout context. BeatReceiver.Shutdown is updated to accept a context.Context and propagated through all x-pack receivers.

Possibly related PRs

  • elastic/beats#50655: Introduced Pipeline.Disconnect(ctx) which this PR builds on for bounded ACK-drain during shutdown.
  • elastic/beats#50625: Fixed the Publish/Close race in client.Close, which this PR's two-stage shutdown model relies on.
  • elastic/beats#51047: Modifies slabqueue producer lifecycle and drain semantics that this PR extends with ACKWaitChan and resolveN accounting.

Suggested labels

Team:Elastic-Agent-Control-Plane

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed All five linked issues are addressed: beaters own shutdown sequencing (#49794), otelOutputController tracks producers (#50102), queue producers expose ACKWaitChan (#50103), two-stage client shutdown (#50104), and per-pipeline ack waiting (#50105).
Out of Scope Changes check ✅ Passed All changes are aligned with the PR objectives. No unrelated modifications detected outside the shutdown sequencing and ack-wait tracking scope.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions

This comment has been minimized.

@github-actions

This comment has been minimized.

@github-actions

This comment has been minimized.

@github-actions

This comment has been minimized.

@github-actions

This comment has been minimized.

@github-actions

This comment has been minimized.

@blakerouse blakerouse marked this pull request as draft June 11, 2026 15:05
@mergify

mergify Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b pipeline-disconnect upstream/pipeline-disconnect
git merge upstream/main
git push upstream pipeline-disconnect
@blakerouse blakerouse marked this pull request as ready for review June 15, 2026 19:48

@cmacknz cmacknz left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done a first pass on this and I don't see any major issues besides how much code is in here. Is there any way you could split this up?

I asked Claude and it had some reasonable suggestions to split it in half, at least the removal of intake queue ID could be a follow up now that it's also in here.

@blakerouse

Copy link
Copy Markdown
Contributor Author

@cmacknz I can split it up, was just trying to get it all to a state so I can get elastic/elastic-agent#13000 merged.

I removed the change that took out the intake_queue_id in this PR. I will do it in a follow up.

@cmacknz cmacknz left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for splitting this up, it made it easier to go through in detail. I found one area around the use of c.snapshotProducers that looks problematic, and Claude found another spot that looks like a possible deadlock.

Generally looking good but there is a lot of callback interaction to trace through here, not sure there's anything to be done about that though.

Comment thread libbeat/publisher/pipeline/client_test.go Outdated
Comment thread libbeat/publisher/pipeline/output_otel.go Outdated
Comment thread libbeat/publisher/queue/slabqueue/producer.go Outdated
Comment thread libbeat/cmd/instance/beat.go
Comment thread libbeat/publisher/queue/slabqueue/queue.go
Comment thread filebeat/beater/filebeat.go
Comment thread libbeat/publisher/pipeline/client.go Outdated
Comment thread libbeat/publisher/queue/memqueue/broker.go
@blakerouse

Copy link
Copy Markdown
Contributor Author

@cmacknz @belimawr Thanks for the reviews. I have updated the code from your comments and resolved them. Let me know if you see anything else or have any other questions.

Comment thread libbeat/publisher/queue/memqueue/produce.go Outdated
Comment thread libbeat/publisher/queue/diskqueue/producer.go
Comment thread libbeat/publisher/pipeline/output_process.go
@mergify

mergify Bot commented Jun 17, 2026

Copy link
Copy Markdown
Contributor

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check out it locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b pipeline-disconnect upstream/pipeline-disconnect
git merge upstream/main
git push upstream pipeline-disconnect

@belimawr belimawr left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

@github-actions

Copy link
Copy Markdown
Contributor

TL;DR

The Buildkite failure is a configuration/mergeability failure, not a test or code-runtime failure: the upload step never reached .buildkite/pipeline.yml upload because .buildkite/hooks/post-checkout could not merge PR commit dd2bfe9ffb3609d6994d9ba204b796fbec659dcb into main. Resolve the PR/main conflicts and rerun Buildkite.

Remediation

  • Rebase or merge the latest main into blakerouse:pipeline-disconnect and resolve conflicts in libbeat/publisher/pipeline/output_otel.go, libbeat/publisher/queue/slabqueue/batch.go, and libbeat/publisher/queue/slabqueue/queue.go.
  • Push the resolved branch and rerun the failed Buildkite build; the pipeline upload should proceed once the post-checkout merge succeeds.
Investigation details

Root Cause

Buildkite runs .buildkite/hooks/post-checkout for PRs. That hook fetches the target branch, creates pr_merge_${BUILDKITE_PULL_REQUEST}, then runs git merge --no-edit "${pr_commit}" and exits non-zero if the merge fails (.buildkite/hooks/post-checkout:15-31).

For this build, that merge failed with content conflicts before pipeline upload could continue. The PR metadata also reported mergeable_state as dirty during this investigation.

Evidence

  • Build: https://buildkite.com/elastic/beats/builds/47829
  • Job/step: :pipeline::arrow_up: Upload Pipeline: .buildkite/pipeline.yml
  • Key log excerpt from /tmp/gh-aw/buildkite-logs/beats-pipelinearrow_up-upload-pipeline-buildkitepipelineyml.txt:127-147:
Auto-merging libbeat/publisher/pipeline/output_otel.go
CONFLICT (content): Merge conflict in libbeat/publisher/pipeline/output_otel.go
Auto-merging libbeat/publisher/queue/slabqueue/batch.go
CONFLICT (content): Merge conflict in libbeat/publisher/queue/slabqueue/batch.go
Auto-merging libbeat/publisher/queue/slabqueue/queue.go
CONFLICT (content): Merge conflict in libbeat/publisher/queue/slabqueue/queue.go
Automatic merge failed; fix conflicts and then commit the result.
--- Merge failed: 1
🚨 Error: running "repository post-checkout" shell hook: The repository post-checkout hook exited with status 1

Verification

  • Not run locally: this failure is reproduced directly by the Buildkite checkout hook log, and resolving it requires updating the PR branch against current main.

What is this? | From workflow: PR Buildkite Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

@cmacknz cmacknz left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I've read this several times now and have had Claude go through it multiple times as well and can't find any more significant issues.

@blakerouse

Copy link
Copy Markdown
Contributor Author

@cmacknz Looks like we need review from some of the other teams.

@blakerouse blakerouse merged commit 42c6f9d into elastic:main Jun 18, 2026
197 of 200 checks passed
@blakerouse blakerouse deleted the pipeline-disconnect branch June 18, 2026 13:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

3 participants