-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19440: Handle top-level errors in AlterShareGroupOffsets RPC #20049
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
*/ | ||
public KafkaFuture<Void> all() { | ||
return this.future.thenApply(topicPartitionErrorsMap -> { | ||
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet() | ||
.stream() | ||
.filter(e -> e.getValue() != Errors.NONE) | ||
.filter(e -> e.getValue() != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch, just out of curiosity, would the AlterConsumerGroupOffsets RPC have the same issue?
kafka/clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
Lines 68 to 79 in 05b6e81
return this.future.thenApply(topicPartitionErrorsMap -> { | |
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet() | |
.stream() | |
.filter(e -> e.getValue() != Errors.NONE) | |
.map(Map.Entry::getKey) | |
.collect(Collectors.toList()); | |
for (Errors error : topicPartitionErrorsMap.values()) { | |
if (error != Errors.NONE) { | |
throw error.exception( | |
"Failed altering group offsets for the following partitions: " + partitionsFailed); | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll check it out before I merge, but the important difference here is in KafkaApis.scala
. For DeleteSGO, it already handled a non-zero error code. For AlterSGO, that code was missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, sorry. I didn't read your comment accurately. The difference is that AlterShareGroupOffsets can successfully pass back an error code, which is why this is in terms of ApiException
rather than Errors
. For AlterConsumerGroupOffsets
, the RPC is actually OffsetCommit
and this does not have an ErrorMessage
field at all. So, it cannot be fixed for consumer groups until we have a version bump on the OffsetCommit
RPC.
*/ | ||
public KafkaFuture<Void> all() { | ||
return this.future.thenApply(topicPartitionErrorsMap -> { | ||
List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet() | ||
.stream() | ||
.filter(e -> e.getValue() != Errors.NONE) | ||
.filter(e -> e.getValue() != null) | ||
.map(Map.Entry::getKey) | ||
.collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this change to immutable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it matters here. The list in internal to this method, and it is just converted to a string and appended to the exception message.
While testing the code in #19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.
Reviewers: Apoorv Mittal apoorvmittal10@gmail.com