[libbeat/output/kafka]: fix data race and panic on Publish during Close #51484
The previous shutdown fix wrapped the producer send in a select on c.done, but Close closes c.done and then calls producer.AsyncClose(), which closes the producer's input channel. Once both channels are closed, Go's select picks a ready case pseudo-randomly, and a send on a closed channel is "ready" but panics. This left a residual data race (chansend vs closechan) and a "send on closed channel" panic on shutdown, which a stress run under the race detector reproduced reliably. Serialize sends against the channel close with a sync.RWMutex: Publish sends under the read lock (via the new send helper) after a non-blocking c.done check, and Close closes c.done and then takes the write lock around AsyncClose. Read and write locks are mutually exclusive, so a send and the close can never overlap; the leading c.done check ensures Publish never enters the blocking select once shutdown has started, when the channel may already be closed. Verified with script/stresstest.sh: 2158 runs under -race and 62828 runs without, both with 0 failures; the original code fails almost immediately under the same stress.
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
TL;DR
Remediation
Investigation details
Root Cause
The PR adds
producerMux sync.RWMutex
producer sarama.AsyncProducer
Because the branch currently has
Evidence
A local
type client struct {
producerMux sync.RWMutex
- producer sarama.AsyncProducer
+ producer sarama.AsyncProducer
}
Verification
From workflow: PR Buildkite Detective
@Mergifyio backport 9.4 9.3 8.19
✅ Backports have been created
…se (#51484) (#51511)

## Proposed commit message

`Close` closes `c.done` and then calls `producer.AsyncClose()`, which closes the producer's input channel. If `Publish` picks the send case on an already-closed channel, it produces a data race (`chansend` vs `closechan`) and panic.

This PR serializes sends against the channel close using a `sync.RWMutex`:

- `Publish` delivers each event through a new `send` helper that holds the read lock and performs a non-blocking `c.done` check before the blocking `select`.
- `Close` closes `c.done` (to release any blocked send) and then takes the write lock around `AsyncClose`.

## How to test this PR locally

```bash
# Reproduces the data race / panic on main, passes with this PR.
./script/stresstest.sh --race ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

# Without the race detector (exercises the actual "send on closed channel" panic).
./script/stresstest.sh ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s
```

With this change both profiles ran with zero failures (2158 runs under `-race`, 62828 runs without). On `main` the `--race` profile reports `WARNING: DATA RACE` within seconds.

## Related issues

- Relates #46109
- Relates #46446

(cherry picked from commit 15771bb)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
…se (#51484) (#51509)
Commit message identical to the (#51511) backport above.
…se (#51484) (#51510)
Commit message identical to the (#51511) backport above.
Proposed commit message
`Close` closes `c.done` and then calls `producer.AsyncClose()`, which closes the producer's input channel. If `Publish` picks the send case on an already-closed channel, it produces a data race (`chansend` vs `closechan`) and panic.

This PR serializes sends against the channel close using a `sync.RWMutex`:

- `Publish` delivers each event through a new `send` helper that holds the read lock and performs a non-blocking `c.done` check before the blocking `select`.
- `Close` closes `c.done` (to release any blocked send) and then takes the write lock around `AsyncClose`.

Checklist
- `stresstest.sh` script to run them under stress conditions and race detector to verify their stability.
- `./changelog/fragments` using the changelog tool.

How to test this PR locally
With this change both profiles ran with zero failures (2158 runs under `-race`, 62828 runs without). On `main` the `--race` profile reports `WARNING: DATA RACE` within seconds.

Related issues