Skip to content

Commit 66c4839

Browse files
authored
chore(blooms): Reduce memory footprint of download queue (#18242)
The download queue has a fixed size of 100k elements and is implemented using a channel. Since the channel has a fixed size, this will allocate `capacity * object size`, where the object is `downloadRequest[BlockRef, BlockDirectory]`. This PR changes the channel from a channel of objects (`chan downloadRequest[T, R]`) to a channel of pointers (`chan *downloadRequest[T, R]`) which should significantly reduce the permanently allocated memory for the queue. Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent ab41c00 commit 66c4839

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

‎pkg/storage/stores/shipper/bloomshipper/fetcher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ type downloadResponse[R any] struct {
484484
}
485485

486486
type downloadQueue[T any, R any] struct {
487-
queue chan downloadRequest[T, R]
487+
queue chan *downloadRequest[T, R]
488488
enqueued *swiss.Map[string, struct{}]
489489
enqueuedMutex sync.Mutex
490490
mu keymutex.KeyMutex
@@ -502,7 +502,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R]
502502
return nil, errors.New("queue requires at least 1 worker")
503503
}
504504
q := &downloadQueue[T, R]{
505-
queue: make(chan downloadRequest[T, R], size),
505+
queue: make(chan *downloadRequest[T, R], size),
506506
enqueued: swiss.NewMap[string, struct{}](uint32(size)),
507507
mu: keymutex.NewHashed(workers),
508508
done: make(chan struct{}),
@@ -518,7 +518,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R]
518518

519519
func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) {
520520
if !t.async {
521-
q.queue <- t
521+
q.queue <- &t
522522
return
523523
}
524524
// for async task we attempt to dedupe task already in progress.
@@ -528,7 +528,7 @@ func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) {
528528
return
529529
}
530530
select {
531-
case q.queue <- t:
531+
case q.queue <- &t:
532532
q.enqueued.Put(t.key, struct{}{})
533533
default:
534534
// todo we probably want a metric on dropped items
@@ -544,7 +544,7 @@ func (q *downloadQueue[T, R]) runWorker() {
544544
case <-q.done:
545545
return
546546
case task := <-q.queue:
547-
q.do(task.ctx, task)
547+
q.do(task.ctx, *task)
548548
}
549549
}
550550
}

0 commit comments

Comments
 (0)