Skip to content

[libbeat/output/kafka]: fix data race and panic on Publish during Close#51484

Merged
orestisfl merged 2 commits into
elastic:mainfrom
orestisfl:fix/kafka-output-publish-close-race
Jun 25, 2026
Merged

[libbeat/output/kafka]: fix data race and panic on Publish during Close#51484
orestisfl merged 2 commits into
elastic:mainfrom
orestisfl:fix/kafka-output-publish-close-race

Conversation

@orestisfl

@orestisfl orestisfl commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Proposed commit message

Close closes c.done and then calls producer.AsyncClose(), which closes the producer's input channel. If Publish picks the send case on an already-closed channel, it produces a data race (chansend vs closechan) and panic.

This PR serializes sends against the channel close using a sync.RWMutex:

  • Publish delivers each event through a new send helper that holds the read lock and performs a non-blocking c.done check before the blocking select.
  • Close closes c.done (to release any blocked send) and then takes the write lock around AsyncClose.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • ~~ I have made corresponding changes to the documentation~~
  • ~~ I have made corresponding change to the default configuration files~~
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

How to test this PR locally

# Reproduces the data race / panic on main, passes with this PR.
./script/stresstest.sh --race ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

# Without the race detector (exercises the actual "send on closed channel" panic).
./script/stresstest.sh ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

With this change both profiles ran with zero failures (2158 runs under -race, 62828 runs without). On main the --race profile reports WARNING: DATA RACE within seconds.

Related issues

The previous shutdown fix wrapped the producer send in a select on
c.done, but Close closes c.done and then calls producer.AsyncClose(),
which closes the producer's input channel. Once both channels are
closed, Go's select picks a ready case pseudo-randomly, and a send on a
closed channel is "ready" but panics. This left a residual data race
(chansend vs closechan) and a "send on closed channel" panic on
shutdown, which a stress run under the race detector reproduced reliably.

Serialize sends against the channel close with a sync.RWMutex: Publish
sends under the read lock (via the new send helper) after a non-blocking
c.done check, and Close closes c.done and then takes the write lock
around AsyncClose. Read and write locks are mutually exclusive, so a send
and the close can never overlap; the leading c.done check ensures Publish
never enters the blocking select once shutdown has started, when the
channel may already be closed.

Verified with script/stresstest.sh: 2158 runs under -race and 62828 runs
without, both with 0 failures; the original code fails almost immediately
under the same stress.
@orestisfl orestisfl added bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team backport-active-all Automated backport with mergify to all the active branches labels Jun 24, 2026
@orestisfl orestisfl self-assigned this Jun 24, 2026
@botelastic botelastic Bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Jun 24, 2026
@github-actions

Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.
@orestisfl orestisfl marked this pull request as ready for review June 24, 2026 08:03
@orestisfl orestisfl requested a review from a team as a code owner June 24, 2026 08:03
@orestisfl orestisfl requested review from AndersonQ and belimawr June 24, 2026 08:03
@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@coderabbitai

coderabbitai Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 35d569ee-5cb7-410e-91a9-a1a450a36e9d

📥 Commits

Reviewing files that changed from the base of the PR and between 3b9cc6c and 963136e.

📒 Files selected for processing (1)
  • libbeat/outputs/kafka/client.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • libbeat/outputs/kafka/client.go

📝 Walkthrough

Walkthrough

The Kafka client in libbeat/outputs/kafka/client.go now uses producerMux to coordinate shutdown and publishing. Close closes c.done and then acquires a write lock before calling producer.AsyncClose. Publish now calls a new send helper that read-locks, checks c.done, and sends to the producer input channel or returns false when shutdown is in progress. A changelog fragment documenting the fix is also added.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@github-actions

Copy link
Copy Markdown
Contributor

TL;DR

Libbeat: Run check/update failed because libbeat/outputs/kafka/client.go is not gofmt-clean after this PR’s struct field addition. Run gofmt on that file and push the resulting alignment change.

Remediation

  • Run gofmt -w libbeat/outputs/kafka/client.go and commit the resulting diff.
  • Re-run make -C libbeat check update && make check-no-changes or retrigger the Buildkite pipeline.
Investigation details

Root Cause

The PR adds producerMux sync.RWMutex immediately above producer sarama.AsyncProducer in libbeat/outputs/kafka/client.go:55-56. gofmt aligns consecutive struct fields, so the committed file should become:

producerMux sync.RWMutex
producer    sarama.AsyncProducer

Because the branch currently has producer sarama.AsyncProducer without the extra alignment spacing, the libbeat check modifies libbeat/outputs/kafka/client.go and fails the no-dirty-worktree check.

Evidence

Error: some files are not up-to-date. Run 'make update' then review and commit the changes. Modified: [libbeat/outputs/kafka/client.go]
make: *** [scripts/Makefile:141: check] Error 1

A local gofmt -d check on the same field layout produces:

 type client struct {
 	producerMux sync.RWMutex
-	producer sarama.AsyncProducer
+	producer    sarama.AsyncProducer
 }

Verification

  • Checked the PR diff and Buildkite log.
  • Ran a focused gofmt -d reproduction for the affected struct field layout.

What is this? | From workflow: PR Buildkite Detective

Give us feedback! React with 🚀 if perfect, 👍 if helpful, 👎 if not.

@mergify

mergify Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Tick the box to add this pull request to the merge queue (same as @mergifyio queue).

  • Queue this pull request
@orestisfl orestisfl merged commit 15771bb into elastic:main Jun 25, 2026
201 checks passed
@orestisfl orestisfl deleted the fix/kafka-output-publish-close-race branch June 25, 2026 06:39
@github-actions

Copy link
Copy Markdown
Contributor

@Mergifyio backport 9.4 9.3 8.19

@mergify

mergify Bot commented Jun 25, 2026

Copy link
Copy Markdown
Contributor
orestisfl added a commit that referenced this pull request Jun 25, 2026
…se (#51484) (#51511)

## Proposed commit message

`Close` closes `c.done` and then calls `producer.AsyncClose()`, which closes the producer's input channel. If `Publish` picks the send case on an already-closed channel, it produces a data race (`chansend` vs `closechan`) and panic.

This PR serializes sends against the channel close using a `sync.RWMutex`:

- `Publish` delivers each event through a new `send` helper that holds the read lock and performs a non-blocking `c.done` check before the blocking `select`.
- `Close` closes `c.done` (to release any blocked send) and then takes the write lock around `AsyncClose`.

## How to test this PR locally

```bash
# Reproduces the data race / panic on main, passes with this PR.
./script/stresstest.sh --race ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

# Without the race detector (exercises the actual "send on closed channel" panic).
./script/stresstest.sh ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s
```

With this change both profiles ran with zero failures (2158 runs under `-race`, 62828 runs without). On `main` the `--race` profile reports `WARNING: DATA RACE` within seconds.

## Related issues

- Relates #46109
- Relates #46446

(cherry picked from commit 15771bb)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
orestisfl added a commit that referenced this pull request Jun 25, 2026
…se (#51484) (#51509)

## Proposed commit message

`Close` closes `c.done` and then calls `producer.AsyncClose()`, which closes the producer's input channel. If `Publish` picks the send case on an already-closed channel, it produces a data race (`chansend` vs `closechan`) and panic.

This PR serializes sends against the channel close using a `sync.RWMutex`:

- `Publish` delivers each event through a new `send` helper that holds the read lock and performs a non-blocking `c.done` check before the blocking `select`.
- `Close` closes `c.done` (to release any blocked send) and then takes the write lock around `AsyncClose`.

## How to test this PR locally

```bash
# Reproduces the data race / panic on main, passes with this PR.
./script/stresstest.sh --race ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

# Without the race detector (exercises the actual "send on closed channel" panic).
./script/stresstest.sh ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s
```

With this change both profiles ran with zero failures (2158 runs under `-race`, 62828 runs without). On `main` the `--race` profile reports `WARNING: DATA RACE` within seconds.

## Related issues

- Relates #46109
- Relates #46446

(cherry picked from commit 15771bb)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
orestisfl added a commit that referenced this pull request Jun 25, 2026
…se (#51484) (#51510)

## Proposed commit message

`Close` closes `c.done` and then calls `producer.AsyncClose()`, which closes the producer's input channel. If `Publish` picks the send case on an already-closed channel, it produces a data race (`chansend` vs `closechan`) and panic.

This PR serializes sends against the channel close using a `sync.RWMutex`:

- `Publish` delivers each event through a new `send` helper that holds the read lock and performs a non-blocking `c.done` check before the blocking `select`.
- `Close` closes `c.done` (to release any blocked send) and then takes the write lock around `AsyncClose`.

## How to test this PR locally

```bash
# Reproduces the data race / panic on main, passes with this PR.
./script/stresstest.sh --race ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s

# Without the race detector (exercises the actual "send on closed channel" panic).
./script/stresstest.sh ./libbeat/outputs/kafka '^TestClientShutdownPanic$' -p 28 -timeout 30s
```

With this change both profiles ran with zero failures (2158 runs under `-race`, 62828 runs without). On `main` the `--race` profile reports `WARNING: DATA RACE` within seconds.

## Related issues

- Relates #46109
- Relates #46446

(cherry picked from commit 15771bb)

Co-authored-by: Orestis Floros <orestis.floros@elastic.co>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-active-all Automated backport with mergify to all the active branches bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

2 participants