x-pack/filebeat/input/streaming: re-evaluate url_program on reconnect#50383
Conversation
🤖 GitHub commentsJust comment with:
|
Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6)
18180f7 to
028ccc6
Compare
|
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations) |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Enterprise Run ID: 📒 Files selected for processing (1)
✅ Files skipped from review due to trivial changes (1)
📝 WalkthroughWalkthroughRecomputes the reconnect URL by re-evaluating 🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
x-pack/filebeat/input/streaming/input_test.go (1)
1045-1046: Add failure messages to assertions.Per coding guidelines, testify assertions should include messages explaining the failure.
💡 Suggested improvement
- assert.Equal(t, "/v1/stream", urls[0]) - assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z") + assert.Equal(t, "/v1/stream", urls[0], "first connection should not have sinceTime query param") + assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z", "second connection URL should include sinceTime from first message cursor")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/filebeat/input/streaming/input_test.go` around lines 1045 - 1046, The assertions on urls (assert.Equal(t, "/v1/stream", urls[0]) and assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z")) lack failure messages; update both to include descriptive messages (e.g., "expected first URL path to be /v1/stream" and "expected second URL to contain sinceTime timestamp") as the final argument to assert.Equal and assert.Contains so test failures explain what went wrong when urls or their contents differ.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 953-1047: In TestURLProgramReconnect replace the deprecated
logp.TestingSetup call with creating a test logger via logptest.NewTestingLogger
and pass that logger into the v2.Context (replace
logp.NewLogger("websocket_test") with the logger returned by
logptest.NewTestingLogger), add the necessary import for logptest (and add
errors to imports), and change the check after input{}.run from err !=
context.Canceled to using errors.Is(err, context.Canceled); this touches the
TestURLProgramReconnect function, the v2.Context initialization, and the final
error check around input{}.run.
---
Nitpick comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 1045-1046: The assertions on urls (assert.Equal(t, "/v1/stream",
urls[0]) and assert.Contains(t, urls[1], "sinceTime=2024-01-01T01:00:00Z")) lack
failure messages; update both to include descriptive messages (e.g., "expected
first URL path to be /v1/stream" and "expected second URL to contain sinceTime
timestamp") as the final argument to assert.Equal and assert.Contains so test
failures explain what went wrong when urls or their contents differ.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Enterprise
Run ID: 9a44342c-f4bb-4e41-b86b-f58edcdb5247
📒 Files selected for processing (5)
changelog/fragments/1777418848-streaming-url-program-reconnect.yamlx-pack/filebeat/input/streaming/crowdstrike.gox-pack/filebeat/input/streaming/input.gox-pack/filebeat/input/streaming/input_test.gox-pack/filebeat/input/streaming/websocket.go
There was a problem hiding this comment.
♻️ Duplicate comments (1)
x-pack/filebeat/input/streaming/input_test.go (1)
1032-1034:⚠️ Potential issue | 🟡 MinorUse
errors.Isfor the canceled-error check.This still compares
errdirectly tocontext.Canceled, so a wrapped cancellation will be treated as a failure and make the test flaky. Please switch toerrors.Is(err, context.Canceled).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/filebeat/input/streaming/input_test.go` around lines 1032 - 1034, The test currently compares err directly to context.Canceled after calling input{}.run(v2Ctx, src, nil, &client), which fails if the cancellation error is wrapped; change the condition to use errors.Is(err, context.Canceled) instead of err == context.Canceled (import "errors" if necessary) so the check correctly recognizes wrapped cancellation errors in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@x-pack/filebeat/input/streaming/input_test.go`:
- Around line 1032-1034: The test currently compares err directly to
context.Canceled after calling input{}.run(v2Ctx, src, nil, &client), which
fails if the cancellation error is wrapped; change the condition to use
errors.Is(err, context.Canceled) instead of err == context.Canceled (import
"errors" if necessary) so the check correctly recognizes wrapped cancellation
errors in the test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Enterprise
Run ID: fb13c395-71d4-4007-ae10-f22f33c381a1
📒 Files selected for processing (1)
x-pack/filebeat/input/streaming/input_test.go
Pass the current cursor from state["cursor"] to process() instead of the startup snapshot in s.cursor. When process() handles a zero-events message it returns its fallback cursor unchanged. Using the startup snapshot as that fallback causes the cursor to regress to the initial persisted value, so url_program on reconnect can produce a stale sinceTime that moves backwards. Fall back to s.cursor only when state["cursor"] has not yet been set (i.e. before the first message is processed). Apply the same fix to both the websocket and CrowdStrike paths. Add TestURLProgramReconnectZeroEvents covering the regression scenario: start with a persisted cursor, advance it, send a zero-events message, force a reconnect, and assert the reconnect URL uses the advanced cursor.
…#50383) (#50412) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6) (cherry picked from commit 1f2cf8d) Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
…url_program on reconnect (#50410) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6) (cherry picked from commit 1f2cf8d) # Conflicts: # x-pack/filebeat/input/streaming/websocket.go * resolve conflicts --------- Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
…#50383) (#50411) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6) (cherry picked from commit 1f2cf8d) Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
…elastic#50383) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6)
…elastic#50383) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6)
…elastic#50383) Previously url_program was evaluated once at the start of FollowStream and the resulting URL was reused for all reconnections within the session. This meant cursor changes from processed messages could not influence the URL used on reconnect. Move the getURL call into both reconnect paths (error recovery and OAuth2 token refresh) so that url_program sees the current cursor state. On evaluation failure, fall back to the previously computed URL rather than terminating the input. To make the evolved cursor visible to url_program, change process() to return the last known good cursor so callers can write it back into the shared state map. The previous signature only returned an error; the cursor updates from evalWith were confined to a shadowed local variable and never propagated to FollowStream's state. Also remove unnecessary bytes() wrapping from CEL programs in tests; state.response already supports decode_json() directly. Assisted-By: Cursor (claude-opus-4-6)
Proposed commit message
Checklist
stresstest.shscript to run them under stress conditions and race detector to verify their stability../changelog/fragmentsusing the changelog tool.Disruptive User Impact
How to test this PR locally
Related issues
Use cases
Screenshots
Logs