fix: Ensure partition-reader starts up correctly #14845

Merged
benclive merged 2 commits into main from fix-kafka-startup-duplicates
Nov 8, 2024

Conversation

@benclive benclive commented Nov 8, 2024

What this PR does / why we need it:

This fixes an edge case where no new records are available in Kafka during startup, which would cause a deadlock.
It also ensures we don't process the last committed offset again at startup, because we have already processed and committed it.

  • I also added a 30-second timeout and a log line to each Poll call, which is very useful for debugging why Loki sometimes fails to start up!

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR
@benclive benclive requested a review from a team as a code owner November 8, 2024 12:38
@@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error {

	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
	if lastCommittedOffset > 0 {
		lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
Contributor

How does this compare with the comment on Line 379:

lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.
Contributor

If Kafka returns the next empty offset, why do we need to add 1?

Contributor Author

That's the last produced offset, vs last committed offset. Apparently they behave slightly differently!
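The two offset conventions discussed in this thread can be sketched as follows. This is an illustration only, assuming (as the PR description and the quoted comments state) that the committed offset is the last record already processed and that Kafka reports the end offset as the next empty one. The helper names are hypothetical, and `hasBacklog` is just one way a reader might detect that there is nothing to consume at startup; it is not necessarily how the PR avoids the deadlock.

```go
package main

// nextOffsetToRead mirrors the check in the diff above: the committed
// offset is the last record already processed, so reading resumes one
// past it, but only if an offset was previously committed.
// Note that, like the diff, this treats 0 the same as "nothing committed".
func nextOffsetToRead(lastCommitted int64) int64 {
	if lastCommitted > 0 {
		return lastCommitted + 1
	}
	return lastCommitted
}

// lastWrittenOffset converts the end offset (the next empty offset)
// into the offset of the most recently written record.
func lastWrittenOffset(endOffset int64) int64 {
	return endOffset - 1
}

// hasBacklog reports whether at least one record exists at or after next.
// When it returns false, there is nothing to catch up on at startup.
func hasBacklog(next, endOffset int64) bool {
	return lastWrittenOffset(endOffset) >= next
}
```

For example, with a committed offset of 41 and an end offset of 42, the reader resumes at 42 while the last written record is 41, so there is no backlog and waiting for a record at startup would never complete.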

@benclive benclive merged commit b2f3d2e into main Nov 8, 2024
57 checks passed
@benclive benclive deleted the fix-kafka-startup-duplicates branch November 8, 2024 14:18
2 participants