Skip to content

Commit 06272c6

Browse files
committed
kgo: bugfix batched FindCoordinator requests against older brokers
All credit to @douglasbouttell for the excellent find here. Batched FindCoordinator requests against brokers that do not support batched requests no longer worked at of 1.19.3. The problem was the removal of pinReq, and well, a bug in the full removal. This fixes the problem and adds a test test that fails without the fix. I checked all linters I know of (opting into all revive, staticcheck, gocritic, golangci-lint linters) and none of them catch this, for valid reasons. It's hard to check. I then went back and forth with Claude to generate some code that would check it, and surprisingly came up with something. However, since it generates a _few_ false positives and I don't want to learn enough of the ast code to fully maintain the linter, I'm going to comment the code on #1034 and this PR and if anybody wants to take it and run with it, feel free. The one other variable the lint flagged was a false positive. Closes #1034.
1 parent ef502bc commit 06272c6

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

‎pkg/kgo/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3227,7 +3227,7 @@ func (*findCoordinatorSharder) shard(_ context.Context, kreq kmsg.Request, lastE
32273227
sreq.CoordinatorType = req.CoordinatorType
32283228
sreq.CoordinatorKey = key
32293229
issues = append(issues, issueShard{
3230-
req: req,
3230+
req: sreq,
32313231
pin: &pinReq{pinMax: true, max: 3},
32323232
any: true,
32333233
})

‎pkg/kgo/client_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,32 @@ func TestPing(t *testing.T) {
228228
t.Errorf("unable to ping: %v", err)
229229
}
230230
}
231+
232+
func TestIssue1034(t *testing.T) {
233+
t.Parallel()
234+
235+
cl, _ := newTestClient()
236+
defer cl.Close()
237+
238+
req := kmsg.NewPtrFindCoordinatorRequest()
239+
req.CoordinatorType = 0
240+
req.CoordinatorKeys = []string{"foo", "bar", "biz"}
241+
resp, err := req.RequestWith(context.Background(), cl)
242+
if err != nil {
243+
t.Fatalf("unable to request coordinators: %v", err)
244+
}
245+
need := make(map[string]struct{})
246+
for _, k := range req.CoordinatorKeys {
247+
need[k] = struct{}{}
248+
}
249+
for _, c := range resp.Coordinators {
250+
if c.Key == "" {
251+
t.Errorf("response coordinator erroneously has an empty key")
252+
continue
253+
}
254+
delete(need, c.Key)
255+
}
256+
if len(need) > 0 {
257+
t.Errorf("coordinator key responses missing: %v", need)
258+
}
259+
}

0 commit comments

Comments
 (0)