Skip to content

Commit 34c8b3d

Browse files
committed
kgo: add AddConsumePartitions and RemoveConsumePartitions
There are some catches, but this isn't too bad. Closes #475.
1 parent b5cafba commit 34c8b3d

File tree

4 files changed

+172
-5
lines changed

4 files changed

+172
-5
lines changed

‎pkg/kgo/consumer.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,91 @@ func (cl *Client) AddConsumeTopics(topics ...string) {
749749
cl.triggerUpdateMetadataNow("from AddConsumeTopics")
750750
}
751751

752+
// AddConsumePartitions adds new partitions to be consumed at the given
753+
// offsets. This function works only for direct, non-regex consumers.
754+
func (cl *Client) AddConsumePartitions(partitions map[string]map[int32]Offset) {
755+
c := &cl.consumer
756+
if c.d == nil || cl.cfg.regex {
757+
return
758+
}
759+
var topics []string
760+
for t, ps := range partitions {
761+
if len(ps) == 0 {
762+
delete(partitions, t)
763+
continue
764+
}
765+
topics = append(topics, t)
766+
}
767+
if len(partitions) == 0 {
768+
return
769+
}
770+
771+
c.mu.Lock()
772+
defer c.mu.Unlock()
773+
774+
c.d.tps.storeTopics(topics)
775+
for t, ps := range partitions {
776+
if c.d.ps[t] == nil {
777+
c.d.ps[t] = make(map[int32]Offset)
778+
}
779+
for p, o := range ps {
780+
c.d.m.add(t, p)
781+
c.d.ps[t][p] = o
782+
}
783+
}
784+
cl.triggerUpdateMetadataNow("from AddConsumePartitions")
785+
}
786+
787+
// RemoveConsumePartitions removes partitions from being consumed. This
788+
// function works only for direct, non-regex consumers.
789+
//
790+
// This method does not purge the concept of any topics from the client -- if
791+
// you remove all partitions from a topic that was being consumed, metadata
792+
// fetches will still occur for the topic. If you want to remove the topic
793+
// entirely, use PurgeTopicsFromClient.
794+
//
795+
// If you specified ConsumeTopics and this function removes all partitions for
796+
// a topic, the topic will no longer be consumed.
797+
func (cl *Client) RemoveConsumePartitions(partitions map[string][]int32) {
798+
c := &cl.consumer
799+
if c.d == nil || cl.cfg.regex {
800+
return
801+
}
802+
for t, ps := range partitions {
803+
if len(ps) == 0 {
804+
delete(partitions, t)
805+
continue
806+
}
807+
}
808+
if len(partitions) == 0 {
809+
return
810+
}
811+
812+
c.mu.Lock()
813+
defer c.mu.Unlock()
814+
815+
removeOffsets := make(map[string]map[int32]Offset, len(partitions))
816+
for t, ps := range partitions {
817+
removePartitionOffsets := make(map[int32]Offset, len(ps))
818+
for _, p := range ps {
819+
removePartitionOffsets[p] = Offset{}
820+
}
821+
removeOffsets[t] = removePartitionOffsets
822+
}
823+
824+
c.assignPartitions(removeOffsets, assignInvalidateMatching, c.d.tps, fmt.Sprintf("remove of %v requested", partitions))
825+
for t, ps := range partitions {
826+
for _, p := range ps {
827+
c.d.using.remove(t, p)
828+
c.d.m.remove(t, p)
829+
delete(c.d.ps[t], p)
830+
}
831+
if len(c.d.ps[t]) == 0 {
832+
delete(c.d.ps, t)
833+
}
834+
}
835+
}
836+
752837
// assignHow controls how assignPartitions operates.
753838
type assignHow int8
754839

‎pkg/kgo/consumer_direct.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package kgo
22

33
type directConsumer struct {
44
cfg *cfg
5-
tps *topicsPartitions // data for topics that the user assigned
6-
using mtmps // topics we are currently using
7-
m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add
8-
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
5+
tps *topicsPartitions // data for topics that the user assigned
6+
using mtmps // topics we are currently using
7+
m mtmps // mirrors cfg.topics and cfg.partitions, but can change with Purge or Add
8+
ps map[string]map[int32]Offset // mirrors cfg.partitions, changed in Purge or Add
9+
reSeen map[string]bool // topics we evaluated against regex, and whether we want them or not
910
}
1011

1112
func (c *consumer) initDirect() {
@@ -15,6 +16,7 @@ func (c *consumer) initDirect() {
1516
reSeen: make(map[string]bool),
1617
using: make(mtmps),
1718
m: make(mtmps),
19+
ps: make(map[string]map[int32]Offset),
1820
}
1921
c.d = d
2022

@@ -28,6 +30,11 @@ func (c *consumer) initDirect() {
2830
for partition := range partitions {
2931
d.m.add(topic, partition)
3032
}
33+
p := make(map[int32]Offset, len(partitions))
34+
for partition, offset := range partitions {
35+
p[partition] = offset
36+
}
37+
d.ps[topic] = p
3138
}
3239
for topic := range d.cfg.topics {
3340
topics = append(topics, topic)
@@ -106,7 +113,7 @@ func (d *directConsumer) findNewAssignments() map[string]map[int32]Offset {
106113
// we set those. We only use partitions from topics that have
107114
// not been purged.
108115
for topic := range d.m {
109-
for partition, offset := range d.cfg.partitions[topic] {
116+
for partition, offset := range d.ps[topic] {
110117
toUseTopic, exists := toUse[topic]
111118
if !exists {
112119
toUseTopic = make(map[int32]Offset, 10)

‎pkg/kgo/consumer_direct_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kgo
33
import (
44
"context"
55
"fmt"
6+
"sort"
67
"testing"
78
"time"
89
)
@@ -199,3 +200,63 @@ func TestIssue434(t *testing.T) {
199200
}
200201
}
201202
}
203+
204+
func TestAddRemovePartitions(t *testing.T) {
205+
t.Parallel()
206+
207+
t1, cleanup := tmpTopicPartitions(t, 2)
208+
defer cleanup()
209+
210+
cl, _ := NewClient(
211+
getSeedBrokers(),
212+
UnknownTopicRetries(-1),
213+
RecordPartitioner(ManualPartitioner()),
214+
FetchMaxWait(100*time.Millisecond),
215+
)
216+
defer cl.Close()
217+
218+
if err := cl.ProduceSync(context.Background(),
219+
&Record{Topic: t1, Partition: 0, Value: []byte("v1")},
220+
&Record{Topic: t1, Partition: 1, Value: []byte("v2")},
221+
&Record{Topic: t1, Partition: 1, Value: []byte("v3")},
222+
).FirstErr(); err != nil {
223+
t.Fatal(err)
224+
}
225+
226+
cl.AddConsumePartitions(map[string]map[int32]Offset{
227+
t1: {0: NewOffset().At(0)},
228+
})
229+
230+
recs := cl.PollFetches(context.Background()).Records()
231+
if len(recs) != 1 || string(recs[0].Value) != "v1" {
232+
t.Fatalf("expected to see v1, got %v", recs)
233+
}
234+
235+
cl.RemoveConsumePartitions(map[string][]int32{
236+
t1: {0, 1, 2},
237+
"t2": {0, 1, 2},
238+
})
239+
240+
cl.AddConsumePartitions(map[string]map[int32]Offset{
241+
t1: {
242+
0: NewOffset().At(0),
243+
1: NewOffset().At(1),
244+
},
245+
})
246+
247+
recs = recs[:0]
248+
for len(recs) < 2 {
249+
recs = append(recs, cl.PollFetches(context.Background()).Records()...)
250+
}
251+
if len(recs) > 2 {
252+
t.Fatalf("expected to see 2 records, got %v", recs)
253+
}
254+
255+
sort.Slice(recs, func(i, j int) bool {
256+
return recs[i].Partition < recs[j].Partition
257+
})
258+
259+
if string(recs[0].Value) != "v1" || string(recs[1].Value) != "v3" {
260+
t.Fatalf("expected to see v1 and v2, got %v", recs)
261+
}
262+
}

‎pkg/kgo/topics_and_partitions.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,20 @@ func (m mtmps) onlyt(t string) bool {
108108
return exists && len(ps) == 0
109109
}
110110

111+
func (m mtmps) remove(t string, p int32) {
112+
if m == nil {
113+
return
114+
}
115+
mps, exists := m[t]
116+
if !exists {
117+
return
118+
}
119+
delete(mps, p)
120+
if len(mps) == 0 {
121+
delete(m, t)
122+
}
123+
}
124+
111125
////////////
112126
// PAUSED // -- types for pausing topics and partitions
113127
////////////

0 commit comments

Comments
 (0)