Skip to content

chore: Simplify API of topk heap #18126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions pkg/dataobj/querier/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"slices"
"sync"

"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
Expand Down Expand Up @@ -147,19 +146,7 @@ func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool {
// heapIterator creates a new EntryIterator for the given topk heap. After
// calling heapIterator, h is emptied.
func heapIterator(h *topk.Heap[entryWithLabels]) iter.EntryIterator {
elems := h.PopAll()

// We need to reverse the order of the entries in the slice to maintain the order of logs we
// want to return:
//
// For FORWARD direction, we want smallest timestamps first (but the heap is
// ordered by largest timestamps first due to lessFn).
//
// For BACKWARD direction, we want largest timestamps first (but the heap is
// ordered by smallest timestamps first due to lessFn).
slices.Reverse(elems)

return &sliceIterator{entries: elems}
return &sliceIterator{entries: h.PopAll()}
}

type sliceIterator struct {
Expand Down
5 changes: 1 addition & 4 deletions pkg/engine/executor/dataobjscan.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ func (s *dataobjScan) read() (arrow.Record, error) {
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
defer rb.Release()

records := heap.PopAll()
slices.Reverse(records)

for _, record := range records {
for _, record := range heap.PopAll() {
for i := 0; i < schema.NumFields(); i++ {
field, builder := rb.Schema().Field(i), rb.Field(i)
s.appendToBuilder(builder, &field, &record)
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/topk/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,22 @@ func (h *Heap[T]) Pop() (T, bool) {
return zero, false
}

return heap.Pop(heapImpl[T]{h}).(T), true
return heap.Pop(h.impl()).(T), true
}

// Len returns the current number of elements in the heap.
func (h *Heap[T]) Len() int { return len(h.values) }

// PopAll removes and returns all elements from the heap in sorted order.
// PopAll removes and returns all elements from the heap in sorted reversed order.
func (h *Heap[T]) PopAll() []T {
res := h.values

// Since the Less function is reversed, we need to "undo" it while sorting the elements.
slices.SortFunc(res, func(a, b T) int {
if h.Less(a, b) {
return -1
return 1
}
return 1
return -1
})

// Reset h.values to nil to avoid changes to the heap modifying the returned
Expand Down
35 changes: 30 additions & 5 deletions pkg/util/topk/topk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package topk_test

import (
"fmt"
"slices"
"sort"
"testing"

Expand All @@ -24,8 +23,6 @@ func ExampleHeap_greatest() {
}

actual := heap.PopAll()
slices.Reverse(actual) // Reverse to get in greatest-descending order.

fmt.Println(actual)
// Output: [9 8 7]
}
Expand All @@ -43,12 +40,40 @@ func ExampleHeap_least() {
}

actual := heap.PopAll()
slices.Reverse(actual) // Reverse to get in least-ascending order.

fmt.Println(actual)
// Output: [0 1 2]
}

func TestHeap_PopAll_smallest_ascending(t *testing.T) {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a > b },
}

for i := range 10 {
heap.Push(i)
}

actual := heap.PopAll()
expected := []int{0, 1, 2}
require.Equal(t, expected, actual)
}

func TestHeap_PopAll_greatest_descending(t *testing.T) {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a < b },
}

for i := range 10 {
heap.Push(i)
}

actual := heap.PopAll()
expected := []int{9, 8, 7}
require.Equal(t, expected, actual)
}

func TestHeap_Range(t *testing.T) {
heap := &topk.Heap[int]{
Limit: 3,
Expand Down