Skip to content
Merged
Prev Previous commit
Next Next commit
Use sub-slices instead of a channel to pass documents into goroutines
  • Loading branch information
philippgille committed Mar 12, 2024
commit ff7e80fb0f476f0985d0efad35c7ab9ade136b6d
34 changes: 15 additions & 19 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,23 @@ func calcDocSimilarity(ctx context.Context, queryVectors []float32, docs []*Docu
}

wg := sync.WaitGroup{}
docChan := make(chan *Document, concurrency*2)
// Instead of using a channel to pass documents into the goroutines, we just
// split the slice into sub-slices and pass those to the goroutines.
// This turned out to be faster in the query benchmarks.
subSliceSize := len(docs) / concurrency // Can leave remainder, e.g. 10/3 = 3; leaves 1
rem := len(docs) % concurrency
for i := 0; i < concurrency; i++ {
start := i * subSliceSize
end := start + subSliceSize
// Add remainder to last goroutine
if i == concurrency-1 {
end += rem
}

wg.Add(1)
go func() {
go func(subSlice []*Document) {
defer wg.Done()
for doc := range docChan {
for _, doc := range subSlice {
// Stop work if another goroutine encountered an error.
if ctx.Err() != nil {
return
Expand All @@ -145,24 +156,9 @@ func calcDocSimilarity(ctx context.Context, queryVectors []float32, docs []*Docu
similarities = append(similarities, &docSim{docID: doc.ID, similarity: sim})
similaritiesLock.Unlock()
}
}()
}(docs[start:end])
}

OuterLoop:
for _, doc := range docs {
// The doc channel has limited capacity. When a goroutine runs into an error,
// all goroutines stop processing the channel, so it fills up and any further
// write to it blocks.
// To avoid a deadlock we check for ctx.Done() here, which is closed by
// the goroutine that encountered the error.
select {
case docChan <- doc:
case <-ctx.Done():
break OuterLoop
}
}
close(docChan)

wg.Wait()

if sharedErr != nil {
Expand Down