-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19435: Optimize kafka-consumer-groups.sh
to return the offset info when some partitions without leaders
#20064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
kafka-consumer-groups.sh
to return the offset info of other partitions even when the leader of some partitions are missingkafka-consumer-groups.sh
to return the offset info when some partitions without leaders// shutdown the target broker | ||
int noneLeaderPartition = 2; | ||
int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new TopicPartition(topic, noneLeaderPartition)); | ||
clusterInstance.shutdownBroker(shutdownBrokerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shutting down the broker could cause some admin APIs to hang, so could you please add timeout to this test in order to avoid impacting CI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @xijiu for this patch, left some comments, and some tests are fail, PTAL
} | ||
|
||
// append the command | ||
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics")); | |
List<String> cgcArgs = List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics"); |
|
||
private Set<TopicPartition> filterNoneLeaderPartitions(List<TopicPartition> topicPartitions) { | ||
// collect all topics | ||
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to convert it to a set here, since AdminClient.describeTopics doesn’t handle duplicate topic names anyway
kafka/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Line 2320 in a5a54dc
for (String topicName : topicNames) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we still need to map to TopicPartition::topic, the current approach is acceptable.
&& Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content) | ||
&& Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content) | ||
&& Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content); | ||
}, "Expected a data row and no error in describe groups when a broker shutdown."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, the topic has 3 partitions, so there're 3 data rows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 Yeah, I have fixed it, PTAL
ConsumerGroupCommand
byfirst checking if a leader exists for the partition before invoking the
admin.listOffsets
. Finally, concatenate the data and returna broker and observe whether the output meets the expectations