fix: Ensure partition-reader starts up correctly #14845
@@ -103,6 +103,9 @@ func (p *Reader) start(ctx context.Context) error {

	// We manage our commits manually, so we must fetch the last offset for our consumer group to find out where to read from.
	lastCommittedOffset := p.fetchLastCommittedOffset(ctx)
	if lastCommittedOffset > 0 {
		lastCommittedOffset++ // We want to begin to read from the next offset, but only if we've previously committed an offset.
I checked Mimir and I can see that they do this too! 👍
How does this compare with the comment on Line 379:
lastProducedOffset = lastProducedOffset - 1 // Kafka returns the next empty offset so we must subtract 1 to get the oldest written offset.
If Kafka returns the next empty offset, why do we need to add 1?
That's the last produced offset, vs last committed offset. Apparently they behave slightly differently!
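To make the two adjustments discussed above concrete, here is a minimal sketch in Go. The function names `nextReadOffset` and `lastWrittenOffset` are hypothetical and not taken from the PR; the sketch only restates the arithmetic from the diff and from the quoted line 379, and it assumes that a value of 0 or less means no offset has been committed yet.

```go
package main

import "fmt"

// nextReadOffset is a hypothetical helper that restates the check in the diff:
// the last committed offset was already processed, so reading resumes one past
// it, but only when an offset has actually been committed.
// Assumption: a value <= 0 means "nothing committed yet". Note that this
// sketch therefore treats a committed offset of exactly 0 the same way.
func nextReadOffset(lastCommittedOffset int64) int64 {
	if lastCommittedOffset > 0 {
		return lastCommittedOffset + 1
	}
	return lastCommittedOffset
}

// lastWrittenOffset is a hypothetical helper that restates the quoted
// line-379 adjustment: the produced-offset lookup reports the next empty
// offset, so the most recently written record sits one below it.
func lastWrittenOffset(nextEmptyOffset int64) int64 {
	return nextEmptyOffset - 1
}

func main() {
	fmt.Println(nextReadOffset(0))     // nothing committed: start where we are
	fmt.Println(nextReadOffset(41))    // offset 41 was committed: resume at 42
	fmt.Println(lastWrittenOffset(42)) // next empty offset is 42: last record is 41
}
```

The two adjustments point in opposite directions because the two values mean different things: the committed offset names a record that has already been handled, while the produced offset names a slot that has not been written yet.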
What this PR does / why we need it:
This fixes an edge case where no new records are available in Kafka during startup, which would cause a deadlock.
It also ensures we don't process the last committed offset again at startup, because we have already processed and committed it.
Checklist
CONTRIBUTING.md guide (required)
feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
docs/sources/setup/upgrade/_index.md
deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR