KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC #20049

Open
wants to merge 4 commits into
base: trunk

Conversation

AndrewJSchofield
Member

@AndrewJSchofield AndrewJSchofield commented Jun 26, 2025

While testing the code in #19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>

      */
     public KafkaFuture<Void> all() {
         return this.future.thenApply(topicPartitionErrorsMap -> {
             List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
                 .stream()
-                .filter(e -> e.getValue() != Errors.NONE)
+                .filter(e -> e.getValue() != null)
Contributor

Thanks for the patch. Just out of curiosity, would the AlterConsumerGroupOffsets RPC have the same issue?

    return this.future.thenApply(topicPartitionErrorsMap -> {
        List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
            .stream()
            .filter(e -> e.getValue() != Errors.NONE)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        for (Errors error : topicPartitionErrorsMap.values()) {
            if (error != Errors.NONE) {
                throw error.exception(
                    "Failed altering group offsets for the following partitions: " + partitionsFailed);
            }
        }

Member Author

@AndrewJSchofield AndrewJSchofield Jun 30, 2025

I'll check it out before I merge, but the important difference here is in KafkaApis.scala. For DeleteShareGroupOffsets (DeleteSGO), it already handled a non-zero error code. For AlterShareGroupOffsets (AlterSGO), that handling was missing.
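
To make the "non-zero error code" point concrete, here is a minimal, self-contained sketch of a handler that checks a top-level error before looking at per-partition results. Everything in it (`TopLevelErrorSketch`, `Response`, `handle`, the error-code values) is a hypothetical stand-in for illustration; it is not the code in KafkaApis.scala or in the Admin client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Kafka classes.
class TopLevelErrorSketch {

    // A response carrying a top-level error plus per-partition error codes (0 means success).
    static final class Response {
        final short errorCode;
        final String errorMessage;
        final Map<String, Short> partitionErrorCodes;

        Response(short errorCode, String errorMessage, Map<String, Short> partitionErrorCodes) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.partitionErrorCodes = partitionErrorCodes;
        }
    }

    // Returns an error description per requested partition, or null where the partition succeeded.
    static Map<String, String> handle(Response response, List<String> requestedPartitions) {
        Map<String, String> result = new LinkedHashMap<>();
        if (response.errorCode != 0) {
            // A top-level error applies to the whole request, so every
            // requested partition is reported as failed with the same message.
            for (String partition : requestedPartitions) {
                result.put(partition, response.errorMessage);
            }
            return result;
        }
        // No top-level error: fall back to the per-partition codes.
        for (String partition : requestedPartitions) {
            short code = response.partitionErrorCodes.getOrDefault(partition, (short) 0);
            result.put(partition, code == 0 ? null : "error code " + code);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("topic-0", "topic-1");
        // The code 1 and the message are arbitrary illustrative values.
        Response failedRequest = new Response((short) 1, "Request rejected.", Map.of());
        System.out.println(handle(failedRequest, partitions));
    }
}
```

If the top-level check is skipped, a request that failed as a whole can look like a request with no per-partition errors, which matches the kind of gap described above.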

Member Author

Ah, sorry. I didn't read your comment accurately. The difference is that AlterShareGroupOffsets can successfully pass back an error code, which is why this is in terms of ApiException rather than Errors. For AlterConsumerGroupOffsets, the RPC is actually OffsetCommit and this does not have an ErrorMessage field at all. So, it cannot be fixed for consumer groups until we have a version bump on the OffsetCommit RPC.
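
As a rough illustration of why the result type matters, the sketch below contrasts a map of error codes with a map of exceptions. The names `ErrorRepresentationSketch`, `ErrorCode`, `checkCodes` and `checkExceptions` are hypothetical, and plain `RuntimeException` stands in for ApiException; the only point is that an enum value can produce just a generic message, while an exception object can carry whatever message came back with the response.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Kafka classes.
class ErrorRepresentationSketch {

    // An error-code enum knows only which error occurred, not any accompanying message.
    enum ErrorCode {
        NONE, INVALID_OFFSET;

        RuntimeException exception() {
            return new RuntimeException("Generic message for " + name());
        }
    }

    // Results expressed as codes: success is NONE, failure throws a generic exception.
    static void checkCodes(Map<String, ErrorCode> results) {
        for (ErrorCode code : results.values()) {
            if (code != ErrorCode.NONE) {
                throw code.exception();
            }
        }
    }

    // Results expressed as exceptions: success is null, failure rethrows the stored
    // exception, which keeps whatever message it was created with.
    static void checkExceptions(Map<String, RuntimeException> results) {
        for (RuntimeException exception : results.values()) {
            if (exception != null) {
                throw exception;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, ErrorCode> codes = new LinkedHashMap<>();
        codes.put("topic-0", ErrorCode.INVALID_OFFSET);
        try {
            checkCodes(codes);
        } catch (RuntimeException e) {
            System.out.println("From code: " + e.getMessage());
        }

        Map<String, RuntimeException> exceptions = new LinkedHashMap<>();
        exceptions.put("topic-0", new RuntimeException("Detailed message taken from the response"));
        try {
            checkExceptions(exceptions);
        } catch (RuntimeException e) {
            System.out.println("From exception: " + e.getMessage());
        }
    }
}
```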

      */
     public KafkaFuture<Void> all() {
         return this.future.thenApply(topicPartitionErrorsMap -> {
             List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
                 .stream()
-                .filter(e -> e.getValue() != Errors.NONE)
+                .filter(e -> e.getValue() != null)
                 .map(Map.Entry::getKey)
                 .collect(Collectors.toList());
Collaborator

@TaiJuWu TaiJuWu Jun 30, 2025

Should this list be changed to an immutable one?

Member Author

I'm not sure it matters here. The list is internal to this method, and it is just converted to a string and appended to the exception message.
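
For reference, a small sketch of the two options being discussed. `Collectors.toList()` makes no guarantee about the mutability of the list it returns, while `Collectors.toUnmodifiableList()` (Java 10 and later) returns an unmodifiable one. The class name, method name and the `String` partition keys are hypothetical; when the list is only concatenated into a message, both collectors produce the same text.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for illustration; partition names are plain strings here.
class FailedPartitionsSketch {

    // Builds the failure message from the partitions whose result is a non-null exception.
    static String failureMessage(Map<String, RuntimeException> results, boolean unmodifiable) {
        List<String> partitionsFailed;
        if (unmodifiable) {
            partitionsFailed = results.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toUnmodifiableList());
        } else {
            partitionsFailed = results.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        }
        // The list never leaves this method; it is only rendered into the message.
        return "Failed altering group offsets for the following partitions: " + partitionsFailed;
    }

    public static void main(String[] args) {
        Map<String, RuntimeException> results = new TreeMap<>();
        results.put("topic-0", null);
        results.put("topic-1", new RuntimeException("boom"));
        System.out.println(failureMessage(results, false));
        System.out.println(failureMessage(results, true));
    }
}
```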

3 participants