Skip to content

Commit 00e4e76

Browse files
committed
kgo: tolerate buggy v1 group member metadata
Closes #493.
1 parent 34c8b3d commit 00e4e76

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

‎pkg/kgo/group_balancer.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,23 @@ func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGro
204204
for i, member := range members {
205205
meta := &b.metadatas[i]
206206
meta.Default()
207-
if err := meta.ReadFrom(member.ProtocolMetadata); err != nil {
208-
return nil, fmt.Errorf("unable to read member metadata: %v", err)
207+
memberMeta := member.ProtocolMetadata
208+
if err := meta.ReadFrom(memberMeta); err != nil {
209+
// Some buggy clients claimed support for v1 but then
210+
// did not add OwnedPartitions, resulting in a short
211+
// metadata. If we fail at reading and the version is
212+
// v1, we retry again as v0. We do not support other
213+
// versions because hopefully other clients stop
214+
// claiming higher and higher version support and not
215+
// actually supporting them. Sarama has a similarish
216+
// workaround. See #493.
217+
if bytes.HasPrefix(memberMeta, []byte{0, 1}) {
218+
memberMeta[0] = 0
219+
memberMeta[0] = 0
220+
if err = meta.ReadFrom(memberMeta); err != nil {
221+
return nil, fmt.Errorf("unable to read member metadata: %v", err)
222+
}
223+
}
209224
}
210225
for _, topic := range meta.Topics {
211226
b.topics[topic] = struct{}{}

0 commit comments

Comments
 (0)