Skip to content

x-pack/filebeat/input/streaming: re-evaluate url_program on reconnect#50383

Merged
efd6 merged 5 commits into
elastic:mainfrom
efd6:i14241-proofpoint_on_demand
Apr 30, 2026
Merged

x-pack/filebeat/input/streaming: re-evaluate url_program on reconnect#50383
efd6 merged 5 commits into
elastic:mainfrom
efd6:i14241-proofpoint_on_demand

Conversation

@efd6

@efd6 efd6 commented Apr 28, 2026

Copy link
Copy Markdown
Contributor

Proposed commit message

x-pack/filebeat/input/streaming: re-evaluate url_program on reconnect

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

@efd6 efd6 self-assigned this Apr 28, 2026
@efd6 efd6 added enhancement Filebeat Filebeat backport-skip Skip notification from the automated backport with mergify Team:Security-Service Integrations Security Service Integrations Team labels Apr 28, 2026
@botelastic botelastic Bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Apr 28, 2026
@github-actions

Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.
Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
@efd6 efd6 force-pushed the i14241-proofpoint_on_demand branch from 18180f7 to 028ccc6 Compare April 29, 2026 03:44
@efd6 efd6 marked this pull request as ready for review April 29, 2026 06:03
@efd6 efd6 requested a review from a team as a code owner April 29, 2026 06:03
@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@coderabbitai

coderabbitai Bot commented Apr 29, 2026

Copy link
Copy Markdown
Contributor

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 949ef098-01f8-4787-8290-3644b7215a00

📥 Commits

Reviewing files that changed from the base of the PR and between c789138 and c778c6c.

📒 Files selected for processing (1)
  • changelog/fragments/1777418848-streaming-url-program-reconnect.yaml
✅ Files skipped from review due to trivial changes (1)
  • changelog/fragments/1777418848-streaming-url-program-reconnect.yaml

📝 Walkthrough

Walkthrough

Recomputes the reconnect URL by re-evaluating url_program before each websocket reconnect (including on OAuth2 token refresh and after websocket read errors) so the session’s evolved cursor can influence the URL. processor.process now returns the last-known-good cursor; callers persist returned cursors into state["cursor"]. Changes touch websocket.go, input.go, crowdstrike.go, tests (CEL snippets), two reconnect tests, and a changelog fragment.

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation: Commit on current branch
  • 🛠️ Update Documentation: Create PR

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
x-pack/filebeat/input/streaming/input_test.go (1)

1045-1046: Add failure messages to assertions.

Per coding guidelines, testify assertions should include messages explaining the failure.

💡 Suggested improvement
-	assert.Equal(t, "/v1/stream", urls[0])
-	assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z")
+	assert.Equal(t, "/v1/stream", urls[0], "first connection should not have sinceTime query param")
+	assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z", "second connection URL should include sinceTime from first message cursor")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/filebeat/input/streaming/input_test.go` around lines 1045 - 1046, The
assertions on urls (assert.Equal(t, "/v1/stream", urls[0]) and
assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z")) lack failure
messages; update both to include descriptive messages (e.g., "expected first URL
path to be /v1/stream" and "expected second URL to contain sinceTime timestamp")
as the final argument to assert.Equal and assert.Contains so test failures
explain what went wrong when urls or their contents differ.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 953-1047: In TestURLProgramReconnect replace the deprecated
logp.TestingSetup call with creating a test logger via logptest.NewTestingLogger
and pass that logger into the v2.Context (replace
logp.NewLogger("websocket_test") with the logger returned by
logptest.NewTestingLogger), add the necessary import for logptest (and add
errors to imports), and change the check after input{}.run from err !=
context.Canceled to using errors.Is(err, context.Canceled); this touches the
TestURLProgramReconnect function, the v2.Context initialization, and the final
error check around input{}.run.

---

Nitpick comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 1045-1046: The assertions on urls (assert.Equal(t, "/v1/stream",
urls[0]) and assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z")) lack
failure messages; update both to include descriptive messages (e.g., "expected
first URL path to be /v1/stream" and "expected second URL to contain sinceTime
timestamp") as the final argument to assert.Equal and assert.Contains so test
failures explain what went wrong when urls or their contents differ.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 9a44342c-f4bb-4e41-b86b-f58edcdb5247

📥 Commits

Reviewing files that changed from the base of the PR and between c37cfcf and 028ccc6.

📒 Files selected for processing (5)
  • changelog/fragments/1777418848-streaming-url-program-reconnect.yaml
  • x-pack/filebeat/input/streaming/crowdstrike.go
  • x-pack/filebeat/input/streaming/input.go
  • x-pack/filebeat/input/streaming/input_test.go
  • x-pack/filebeat/input/streaming/websocket.go
Comment thread x-pack/filebeat/input/streaming/input_test.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
x-pack/filebeat/input/streaming/input_test.go (1)

1032-1034: ⚠️ Potential issue | 🟡 Minor

Use errors.Is for the canceled-error check.

This still compares err directly to context.Canceled, so a wrapped cancellation will be treated as a failure and make the test flaky. Please switch to errors.Is(err, context.Canceled).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/filebeat/input/streaming/input_test.go` around lines 1032 - 1034, The
test currently compares err directly to context.Canceled after calling
input{}.run(v2Ctx, src, nil, &client), which fails if the cancellation error is
wrapped; change the condition to use errors.Is(err, context.Canceled) instead of
err == context.Canceled (import "errors" if necessary) so the check correctly
recognizes wrapped cancellation errors in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 1032-1034: The test currently compares err directly to
context.Canceled after calling input{}.run(v2Ctx, src, nil, &client), which
fails if the cancellation error is wrapped; change the condition to use
errors.Is(err, context.Canceled) instead of err == context.Canceled (import
"errors" if necessary) so the check correctly recognizes wrapped cancellation
errors in the test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: fb13c395-71d4-4007-ae10-f22f33c381a1

📥 Commits

Reviewing files that changed from the base of the PR and between 028ccc6 and b05ead1.

📒 Files selected for processing (1)
  • x-pack/filebeat/input/streaming/input_test.go
Comment thread x-pack/filebeat/input/streaming/websocket.go Outdated
Pass the current cursor from state["cursor"] to process() instead of
the startup snapshot in s.cursor. When process() handles a zero-events
message it returns its fallback cursor unchanged. Using the startup
snapshot as that fallback causes the cursor to regress to the initial
persisted value, so url_program on reconnect can produce a stale
sinceTime that moves backwards.

Fall back to s.cursor only when state["cursor"] has not yet been set
(i.e. before the first message is processed).

Apply the same fix to both the websocket and CrowdStrike paths.

Add TestURLProgramReconnectZeroEvents covering the regression scenario:
start with a persisted cursor, advance it, send a zero-events message,
force a reconnect, and assert the reconnect URL uses the advanced
cursor.
@efd6 efd6 requested a review from andrewkroh April 29, 2026 22:14
@efd6 efd6 added bugfix backport-8.19 Automated backport to the 8.19 branch backport-9.3 Automated backport to the 9.3 branch backport-9.4 and removed enhancement backport-skip Skip notification from the automated backport with mergify labels Apr 29, 2026
@efd6 efd6 merged commit 1f2cf8d into elastic:main Apr 30, 2026
30 of 33 checks passed
efd6 added a commit that referenced this pull request Apr 30, 2026
…#50383) (#50412)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
(cherry picked from commit 1f2cf8d)

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
efd6 added a commit that referenced this pull request Apr 30, 2026
…url_program on reconnect (#50410)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
(cherry picked from commit 1f2cf8d)

# Conflicts:
#	x-pack/filebeat/input/streaming/websocket.go

* resolve conflicts

---------

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
efd6 added a commit that referenced this pull request Apr 30, 2026
…#50383) (#50411)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
(cherry picked from commit 1f2cf8d)

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
brian-mckinney pushed a commit to brian-mckinney/beats that referenced this pull request May 5, 2026
…elastic#50383)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
brian-mckinney pushed a commit to brian-mckinney/beats that referenced this pull request May 6, 2026
…elastic#50383)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
brian-mckinney pushed a commit to brian-mckinney/beats that referenced this pull request May 6, 2026
…elastic#50383)

Previously url_program was evaluated once at the start of FollowStream
and the resulting URL was reused for all reconnections within the
session. This meant cursor changes from processed messages could not
influence the URL used on reconnect.

Move the getURL call into both reconnect paths (error recovery and
OAuth2 token refresh) so that url_program sees the current cursor
state. On evaluation failure, fall back to the previously computed URL
rather than terminating the input.

To make the evolved cursor visible to url_program, change process() to
return the last known good cursor so callers can write it back into
the shared state map. The previous signature only returned an error;
the cursor updates from evalWith were confined to a shadowed local
variable and never propagated to FollowStream's state.

Also remove unnecessary bytes() wrapping from CEL programs in tests;
state.response already supports decode_json() directly.

Assisted-By: Cursor (claude-opus-4-6)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-8.19 Automated backport to the 8.19 branch backport-9.3 Automated backport to the 9.3 branch backport-9.4 bugfix Filebeat Filebeat Team:Security-Service Integrations Security Service Integrations Team

2 participants