Skip to content

Commit 5d8220c

Browse files
authored
fix: data race / nil channel read in pattern aggregation push (#15410)
1 parent bc3d994 commit 5d8220c

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

‎pkg/pattern/aggregation/push.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ type Push struct {
5959
contentType string
6060
logger log.Logger
6161

62-
// shutdown channels
63-
quit chan struct{}
62+
running sync.WaitGroup
63+
quit chan struct{}
64+
quitOnce sync.Once
6465

6566
// auth
6667
username, password string
@@ -149,6 +150,7 @@ func NewPush(
149150
metrics: metrics,
150151
}
151152

153+
p.running.Add(1)
152154
go p.run(pushPeriod)
153155
return p, nil
154156
}
@@ -160,10 +162,10 @@ func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) {
160162

161163
// Stop will cancel any ongoing requests and stop the goroutine listening for requests
162164
func (p *Push) Stop() {
163-
if p.quit != nil {
165+
p.quitOnce.Do(func() {
164166
close(p.quit)
165-
p.quit = nil
166-
}
167+
})
168+
p.running.Wait()
167169
}
168170

169171
// buildPayload creates the snappy compressed protobuf to send to Loki
@@ -244,6 +246,8 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {
244246

245247
// run pulls lines out of the channel and sends them to Loki
246248
func (p *Push) run(pushPeriod time.Duration) {
249+
defer p.running.Done()
250+
247251
ctx, cancel := context.WithCancel(context.Background())
248252
pushTicker := time.NewTimer(pushPeriod)
249253
defer pushTicker.Stop()

‎pkg/pattern/aggregation/push_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func Test_Push(t *testing.T) {
169169
lbls2,
170170
)
171171

172+
p.running.Add(1)
172173
go p.run(time.Nanosecond)
173174

174175
select {

0 commit comments

Comments
 (0)