Skip to content

Commit c3b083b

Browse files
committed
kgo: do not return paused topics/partitions after pausing
This causes slightly more work (for simplicity, we drop everything buffered and kill all in flight fetch requests), but this is much easier to reason about. Closes #489.
1 parent e224e90 commit c3b083b

File tree

2 files changed

+76
-12
lines changed

2 files changed

+76
-12
lines changed

‎pkg/kgo/consumer.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -545,11 +545,9 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
545545
// PauseFetchTopics sets the client to no longer fetch the given topics and
546546
// returns all currently paused topics. Paused topics persist until resumed.
547547
// You can call this function with no topics to simply receive the list of
548-
// currently paused topics.
549-
//
550-
// In contrast to the canonical Java client, this function does not clear
551-
// anything currently buffered. Buffered fetches containing paused topics are
552-
// still returned from polling.
548+
// currently paused topics. Pausing topics drops everything currently buffered
549+
// and kills any in flight fetch requests to ensure nothing that is paused
550+
// can be returned anymore from polling.
553551
//
554552
// Pausing topics is independent from pausing individual partitions with the
555553
// PauseFetchPartitions method. If you pause partitions for a topic with
@@ -564,6 +562,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
564562

565563
c.pausedMu.Lock()
566564
defer c.pausedMu.Unlock()
565+
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch topics %v", topics))
567566

568567
paused := c.clonePaused()
569568
paused.addTopics(topics...)
@@ -574,11 +573,9 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
574573
// PauseFetchPartitions sets the client to no longer fetch the given partitions
575574
// and returns all currently paused partitions. Paused partitions persist until
576575
// resumed. You can call this function with no partitions to simply receive the
577-
// list of currently paused partitions.
578-
//
579-
// In contrast to the canonical Java client, this function does not clear
580-
// anything currently buffered. Buffered fetches containing paused partitions
581-
// are still returned from polling.
576+
// list of currently paused partitions. Pausing partitions drops everything
577+
// currently buffered and kills any in flight fetch requests to ensure nothing
578+
// that is paused can be returned anymore from polling.
582579
//
583580
// Pausing individual partitions is independent from pausing topics with the
584581
// PauseFetchTopics method. If you pause partitions for a topic with
@@ -593,6 +590,7 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
593590

594591
c.pausedMu.Lock()
595592
defer c.pausedMu.Unlock()
593+
defer c.assignPartitions(nil, assignBumpSession, nil, fmt.Sprintf("pausing fetch partitions %v", topicPartitions))
596594

597595
paused := c.clonePaused()
598596
paused.addPartitions(topicPartitions)
@@ -868,6 +866,10 @@ const (
868866
// The counterpart to assignInvalidateMatching, assignSetMatching
869867
// resets all matching partitions to the specified offset / epoch.
870868
assignSetMatching
869+
870+
// For pausing, we want to drop anything inflight. We start a new
871+
// session with the old tps.
872+
assignBumpSession
871873
)
872874

873875
func (h assignHow) String() string {
@@ -882,6 +884,8 @@ func (h assignHow) String() string {
882884
return "unassigning and purging any partition matching the input topics"
883885
case assignSetMatching:
884886
return "reassigning any currently assigned matching partition to the input"
887+
case assignBumpSession:
888+
return "bumping internal consumer session to drop anything currently in flight"
885889
}
886890
return ""
887891
}
@@ -952,6 +956,8 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
952956
// if we had no session before, which is why we need to pass in
953957
// our topicPartitions.
954958
session = c.guardSessionChange(tps)
959+
} else if how == assignBumpSession {
960+
loadOffsets, tps = c.stopSession()
955961
} else {
956962
loadOffsets, _ = c.stopSession()
957963

@@ -998,7 +1004,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
9981004
// assignment went straight to listing / epoch loading, and
9991005
// that list/epoch never finished.
10001006
switch how {
1001-
case assignWithoutInvalidating:
1007+
case assignWithoutInvalidating, assignBumpSession:
10021008
// Nothing to do -- this is handled above.
10031009
case assignInvalidateAll:
10041010
loadOffsets = listOrEpochLoads{}
@@ -1039,7 +1045,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
10391045

10401046
// This assignment could contain nothing (for the purposes of
10411047
// invalidating active fetches), so we only do this if needed.
1042-
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching {
1048+
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
10431049
return
10441050
}
10451051

‎pkg/kgo/consumer_direct_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,3 +260,61 @@ func TestAddRemovePartitions(t *testing.T) {
260260
t.Fatalf("expected to see v1 and v2, got %v", recs)
261261
}
262262
}
263+
264+
func TestPauseIssue489(t *testing.T) {
265+
t.Parallel()
266+
267+
t1, cleanup := tmpTopicPartitions(t, 2)
268+
defer cleanup()
269+
270+
cl, _ := NewClient(
271+
getSeedBrokers(),
272+
UnknownTopicRetries(-1),
273+
DefaultProduceTopic(t1),
274+
RecordPartitioner(ManualPartitioner()),
275+
ConsumeTopics(t1),
276+
FetchMaxWait(100*time.Millisecond),
277+
)
278+
defer cl.Close()
279+
280+
ctx, cancel := context.WithCancel(context.Background())
281+
go func() {
282+
var exit bool
283+
var zeroOne uint8
284+
for !exit {
285+
r := StringRecord("v")
286+
r.Partition = int32(zeroOne % 2)
287+
zeroOne++
288+
cl.Produce(ctx, r, func(r *Record, err error) {
289+
if err == context.Canceled {
290+
exit = true
291+
}
292+
})
293+
}
294+
}()
295+
defer cancel()
296+
297+
for i := 0; i < 10; i++ {
298+
var sawZero, sawOne bool
299+
for !sawZero || !sawOne {
300+
fs := cl.PollFetches(ctx)
301+
fs.EachRecord(func(r *Record) {
302+
sawZero = sawZero || r.Partition == 0
303+
sawOne = sawOne || r.Partition == 1
304+
})
305+
}
306+
cl.PauseFetchPartitions(map[string][]int32{t1: {0}})
307+
sawZero, sawOne = false, false
308+
for i := 0; i < 5; i++ {
309+
fs := cl.PollFetches(ctx)
310+
fs.EachRecord(func(r *Record) {
311+
sawZero = sawZero || r.Partition == 0
312+
sawOne = sawOne || r.Partition == 1
313+
})
314+
}
315+
if sawZero {
316+
t.Error("saw partition zero even though it was paused")
317+
}
318+
cl.ResumeFetchPartitions(map[string][]int32{t1: {0}})
319+
}
320+
}

0 commit comments

Comments
 (0)