KAFKA-19435: Optimize kafka-consumer-groups.sh to return the offset info when some partitions without leaders #20064

Open
wants to merge 3 commits into
base: trunk

Conversation

Collaborator

@xijiu xijiu commented Jun 29, 2025

  1. Optimize the logic in ConsumerGroupCommand: check whether a leader
    exists for each partition before invoking admin.listOffsets, then
    merge the results and return them.
  2. Add an integration test that creates a cluster with 3 brokers, shuts
    down one broker, and checks whether the output meets expectations.
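
For readers skimming the description, the first item can be sketched roughly as follows. This is a minimal plain-Java illustration, not the code in this PR: `PartitionState`, `OffsetLookup`, and `describe` are hypothetical names, `OffsetLookup` only stands in for the call to admin.listOffsets, and rendering a leaderless partition's offset as `-` is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LeaderAwareOffsets {
    // Hypothetical stand-in for a partition and its current leader (empty when there is none).
    public record PartitionState(String topic, int partition, Optional<Integer> leader) {}

    // Hypothetical stand-in for the offset lookup; it only ever receives partitions with a leader.
    public interface OffsetLookup {
        Map<PartitionState, Long> endOffsets(List<PartitionState> partitions);
    }

    public static List<String> describe(List<PartitionState> partitions, OffsetLookup lookup) {
        // Step 1: keep only the partitions that currently have a leader.
        List<PartitionState> withLeader = partitions.stream()
            .filter(p -> p.leader().isPresent())
            .toList();
        // Step 2: query offsets for those partitions only.
        Map<PartitionState, Long> endOffsets = lookup.endOffsets(withLeader);
        // Step 3: merge, emitting one row per partition; leaderless ones get "-" (assumed rendering).
        List<String> rows = new ArrayList<>();
        for (PartitionState p : partitions) {
            Long end = endOffsets.get(p);
            rows.add(p.topic() + " " + p.partition() + " " + (end == null ? "-" : end));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<PartitionState> partitions = List.of(
            new PartitionState("demo", 0, Optional.of(1)),
            new PartitionState("demo", 1, Optional.of(2)),
            new PartitionState("demo", 2, Optional.empty()));
        // Fake lookup that reports an end offset of 100 for every partition it is asked about.
        OffsetLookup fakeLookup = ps -> {
            Map<PartitionState, Long> result = new HashMap<>();
            for (PartitionState p : ps) {
                result.put(p, 100L);
            }
            return result;
        };
        describe(partitions, fakeLookup).forEach(System.out::println);
    }
}
```

The point of the sketch is that the lookup never sees a leaderless partition, so one unavailable partition no longer prevents the rows for the others from being returned.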
@github-actions github-actions bot added the triage PRs from the community label Jun 29, 2025
@xijiu xijiu requested a review from FrankYang0529 June 29, 2025 04:55
@github-actions github-actions bot added tools small Small PRs labels Jun 29, 2025
@xijiu xijiu added ci-approved and removed small Small PRs labels Jun 29, 2025
@xijiu xijiu changed the title KAFKA-10357: Optimize kafka-consumer-groups.sh to return the offset info of other partitions even when the leader of some partitions are missing Jun 29, 2025
@xijiu xijiu changed the title KAFKA-10357: Optimize kafka-consumer-groups.sh to return the offset info when some partitions without leaders Jun 29, 2025
// shutdown the target broker
int noneLeaderPartition = 2;
int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new TopicPartition(topic, noneLeaderPartition));
clusterInstance.shutdownBroker(shutdownBrokerId);
Member

Shutting down the broker could cause some admin APIs to hang, so could you please add a timeout to this test to avoid impacting CI?

Collaborator

@m1a2st m1a2st left a comment

Thanks @xijiu for this patch. I left some comments, and some tests are failing, PTAL

}

// append the command
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics"));
Collaborator

Suggested change
- List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics"));
+ List<String> cgcArgs = List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--all-topics");
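
One thing worth checking before applying a suggestion like this: `List.of` returns an unmodifiable list, so it is only a drop-in replacement if nothing appends to `cgcArgs` afterwards. Whether the test does so is not visible in this excerpt. A small standalone sketch of the difference, where `ArgsListDemo`, `canAppend`, and the group name `demo-group` are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class ArgsListDemo {
    // Returns true if an extra argument can be appended to the given list.
    public static boolean canAppend(List<String> args) {
        try {
            args.add("--all-topics");
            return true;
        } catch (UnsupportedOperationException e) {
            // List.of(...) produces an unmodifiable list, so add() is rejected.
            return false;
        }
    }

    public static void main(String[] unused) {
        List<String> immutable = List.of("--describe", "--group", "demo-group");
        List<String> mutable = new ArrayList<>(immutable);
        System.out.println(canAppend(immutable)); // false
        System.out.println(canAppend(mutable));   // true
    }
}
```

If the argument list is never modified after construction, the shorter `List.of` form is equivalent and avoids the extra copy.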

private Set<TopicPartition> filterNoneLeaderPartitions(List<TopicPartition> topicPartitions) {
// collect all topics
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Collaborator

I don't think we need to convert it to a set here, since AdminClient.describeTopics doesn’t handle duplicate topic names anyway

Collaborator

Since we still need to map to TopicPartition::topic, the current approach is acceptable.

Collaborator Author

xijiu commented Jun 29, 2025

@chia7712 @m1a2st Thanks for the review. I have addressed the comments and fixed the failing tests, PTAL

&& Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content)
&& Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content)
&& Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content);
}, "Expected a data row and no error in describe groups when a broker shutdown.");
Member

IIUC, the topic has 3 partitions, so there are 3 data rows, not one.

Collaborator Author

@FrankYang0529 Yeah, I have fixed it, PTAL

@github-actions github-actions bot removed the triage PRs from the community label Jun 30, 2025
4 participants