@@ -545,11 +545,9 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
545
545
// PauseFetchTopics sets the client to no longer fetch the given topics and
546
546
// returns all currently paused topics. Paused topics persist until resumed.
547
547
// You can call this function with no topics to simply receive the list of
548
- // currently paused topics.
549
- //
550
- // In contrast to the canonical Java client, this function does not clear
551
- // anything currently buffered. Buffered fetches containing paused topics are
552
- // still returned from polling.
548
+ // currently paused topics. Pausing topics drops everything currently buffered
549
+ // and kills any in flight fetch requests to ensure nothing that is paused
550
+ // can be returned anymore from polling.
553
551
//
554
552
// Pausing topics is independent from pausing individual partitions with the
555
553
// PauseFetchPartitions method. If you pause partitions for a topic with
@@ -564,6 +562,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
564
562
565
563
c .pausedMu .Lock ()
566
564
defer c .pausedMu .Unlock ()
565
+ defer c .assignPartitions (nil , assignBumpSession , nil , fmt .Sprintf ("pausing fetch topics %v" , topics ))
567
566
568
567
paused := c .clonePaused ()
569
568
paused .addTopics (topics ... )
@@ -574,11 +573,9 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
574
573
// PauseFetchPartitions sets the client to no longer fetch the given partitions
575
574
// and returns all currently paused partitions. Paused partitions persist until
576
575
// resumed. You can call this function with no partitions to simply receive the
577
- // list of currently paused partitions.
578
- //
579
- // In contrast to the canonical Java client, this function does not clear
580
- // anything currently buffered. Buffered fetches containing paused partitions
581
- // are still returned from polling.
576
+ // list of currently paused partitions. Pausing partitions drops everything
577
+ // currently buffered and kills any in flight fetch requests to ensure nothing
578
+ // that is paused can be returned anymore from polling.
582
579
//
583
580
// Pausing individual partitions is independent from pausing topics with the
584
581
// PauseFetchTopics method. If you pause partitions for a topic with
@@ -593,6 +590,7 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
593
590
594
591
c .pausedMu .Lock ()
595
592
defer c .pausedMu .Unlock ()
593
+ defer c .assignPartitions (nil , assignBumpSession , nil , fmt .Sprintf ("pausing fetch partitions %v" , topicPartitions ))
596
594
597
595
paused := c .clonePaused ()
598
596
paused .addPartitions (topicPartitions )
@@ -868,6 +866,10 @@ const (
868
866
// The counterpart to assignInvalidateMatching, assignSetMatching
869
867
// resets all matching partitions to the specified offset / epoch.
870
868
assignSetMatching
869
+
870
+ // For pausing, we want to drop anything inflight. We start a new
871
+ // session with the old tps.
872
+ assignBumpSession
871
873
)
872
874
873
875
func (h assignHow ) String () string {
@@ -882,6 +884,8 @@ func (h assignHow) String() string {
882
884
return "unassigning and purging any partition matching the input topics"
883
885
case assignSetMatching :
884
886
return "reassigning any currently assigned matching partition to the input"
887
+ case assignBumpSession :
888
+ return "bumping internal consumer session to drop anything currently in flight"
885
889
}
886
890
return ""
887
891
}
@@ -952,6 +956,8 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
952
956
// if we had no session before, which is why we need to pass in
953
957
// our topicPartitions.
954
958
session = c .guardSessionChange (tps )
959
+ } else if how == assignBumpSession {
960
+ loadOffsets , tps = c .stopSession ()
955
961
} else {
956
962
loadOffsets , _ = c .stopSession ()
957
963
@@ -998,7 +1004,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
998
1004
// assignment went straight to listing / epoch loading, and
999
1005
// that list/epoch never finished.
1000
1006
switch how {
1001
- case assignWithoutInvalidating :
1007
+ case assignWithoutInvalidating , assignBumpSession :
1002
1008
// Nothing to do -- this is handled above.
1003
1009
case assignInvalidateAll :
1004
1010
loadOffsets = listOrEpochLoads {}
@@ -1039,7 +1045,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
1039
1045
1040
1046
// This assignment could contain nothing (for the purposes of
1041
1047
// invalidating active fetches), so we only do this if needed.
1042
- if len (assignments ) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching {
1048
+ if len (assignments ) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching || how == assignBumpSession {
1043
1049
return
1044
1050
}
1045
1051
0 commit comments