Skip to content
Prev Previous commit
Next Next commit
Convert labels to bytes slice with a buffer to avoid allocations.
  • Loading branch information
jeschkies committed Dec 18, 2024
commit 19cfc527529be535b35995f1f6cb0d3a06190493
37 changes: 25 additions & 12 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -157,8 +158,10 @@ type HeapCountMinSketchVector struct {
CountMinSketchVector

// internal set of observed events
observed map[string]struct{}
observed map[uint64]struct{}
maxLabels int

buffer []byte
}

func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCountMinSketchVector {
Expand All @@ -174,21 +177,23 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
F: f,
Metrics: make([]labels.Labels, 0, metricsLength+1),
},
observed: make(map[string]struct{}),
observed: make(map[uint64]struct{}),
maxLabels: maxLabels,
buffer: make([]byte, 0, 1024),
}
}

func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
// TODO: we save a lot of allocations by reusing the buffer inside metric.String
metricString := metric.String()
v.F.Add(metricString, value)
v.buffer = metric.Bytes(v.buffer)

v.F.Add(v.buffer, value)

// Add our metric if we haven't seen it
if _, ok := v.observed[metricString]; !ok {
id := xxhash.Sum64(v.buffer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's already a hash function for labels.Labels we could use? I suppose what you're doing is actually better, since if we called metric.Bytes and metric.Hash we're iterating over the whole slice of labels twice

the only difference is prometheus' labels hash function limits the total size of the bytes used for the hash https://github.com/prometheus/prometheus/blob/main/model/labels/labels.go#L76-L87

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prometheus' labels hash function limits the total size of the bytes used for the hash

Not quite. It grows the bytes slice if it's not big enough. Unfortunately the function won't let one reuse the byte slice to avoid allocations. That's what I've found doing efac690

if _, ok := v.observed[id]; !ok {
heap.Push(v, metric)
v.observed[metricString] = struct{}{}
} else if v.Metrics[0].String() == metricString {
v.observed[id] = struct{}{}
} else if labels.Equal(v.Metrics[0], metric) {
// TODO: This check seems wrong.
// The smalles element has been updated to fix the heap.
heap.Fix(v, 0)
Expand All @@ -197,7 +202,9 @@ func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
// The maximum number of labels has been reached, so drop the smallest element.
if len(v.Metrics) > v.maxLabels {
metric := heap.Pop(v).(labels.Labels)
delete(v.observed, metric.String())
v.buffer = metric.Bytes(v.buffer)
id := xxhash.Sum64(v.buffer)
delete(v.observed, id)
}
}

Expand All @@ -206,8 +213,11 @@ func (v HeapCountMinSketchVector) Len() int {
}

func (v HeapCountMinSketchVector) Less(i, j int) bool {
left := v.F.Count(v.Metrics[i].String())
right := v.F.Count(v.Metrics[j].String())
v.buffer = v.Metrics[i].Bytes(v.buffer)
left := v.F.Count(v.buffer)

v.buffer = v.Metrics[j].Bytes(v.buffer)
right := v.F.Count(v.buffer)
return left < right
}

Expand Down Expand Up @@ -296,6 +306,7 @@ func (e *countMinSketchVectorAggEvaluator) Error() error {
type CountMinSketchVectorStepEvaluator struct {
exhausted bool
vec *CountMinSketchVector
buffer []byte
}

var _ StepEvaluator = NewQuantileSketchVectorStepEvaluator(nil, 0)
Expand All @@ -304,6 +315,7 @@ func NewCountMinSketchVectorStepEvaluator(vec *CountMinSketchVector) *CountMinSk
return &CountMinSketchVectorStepEvaluator{
exhausted: false,
vec: vec,
buffer: make([]byte, 0, 1024),
}
}

Expand All @@ -316,7 +328,8 @@ func (e *CountMinSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {

for i, labels := range e.vec.Metrics {

f := e.vec.F.Count(labels.String())
e.buffer = labels.Bytes(e.buffer)
f := e.vec.F.Count(e.buffer)

vec[i] = promql.Sample{
T: e.vec.T,
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/count_min_sketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func TestCountMinSketchSerialization(t *testing.T) {
T: 42,
F: cms,
},
observed: make(map[string]struct{}, 0),
observed: make(map[uint64]struct{}, 0),
maxLabels: 10_000,
buffer: make([]byte, 0, 1024),
}
vec.Add(metric, 42.0)

Expand Down
16 changes: 8 additions & 8 deletions pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sketch

Check failure on line 1 in pkg/logql/sketch/cms.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

: # github.com/grafana/loki/v3/pkg/logql/sketch [github.com/grafana/loki/v3/pkg/logql/sketch.test]

import (
"fmt"
Expand All @@ -10,7 +10,7 @@
type CountMinSketch struct {
Depth, Width uint32
Counters [][]float64
HyperLogLog *hyperloglog.Sketch //hyperloglog.New16(),
HyperLogLog *hyperloglog.Sketch // hyperloglog.New16(),
}

// NewCountMinSketch creates a new CMS for a given width and depth.
Expand Down Expand Up @@ -46,8 +46,8 @@
}

// Add 'count' occurrences of the given input.
func (s *CountMinSketch) Add(event string, count float64) {
s.HyperLogLog.Insert(unsafeGetBytes(event))
func (s *CountMinSketch) Add(event []byte, count float64) {
s.HyperLogLog.Insert(event)
// see the comments in the hashn function for how using only 2
// hash functions rather than a function per row still fullfils
// the pairwise indendent hash functions requirement for CMS
Expand All @@ -58,7 +58,7 @@
}
}

func (s *CountMinSketch) Increment(event string) {
func (s *CountMinSketch) Increment(event []byte) {
s.Add(event, 1)
}

Expand All @@ -69,8 +69,8 @@
// value that's less than Count(h) + count rather than all counters that h hashed to.
// Returns the new estimate for the event as well as the both hashes which can be used
// to identify the event for other things that need a hash.
func (s *CountMinSketch) ConservativeAdd(event string, count float64) (float64, uint32, uint32) {
s.HyperLogLog.Insert(unsafeGetBytes(event))
func (s *CountMinSketch) ConservativeAdd(event []byte, count float64) (float64, uint32, uint32) {
s.HyperLogLog.Insert(event)

min := float64(math.MaxUint64)

Expand All @@ -94,12 +94,12 @@
return min, h1, h2
}

func (s *CountMinSketch) ConservativeIncrement(event string) (float64, uint32, uint32) {
func (s *CountMinSketch) ConservativeIncrement(event []byte) (float64, uint32, uint32) {
return s.ConservativeAdd(event, float64(1))
}

// Count returns the approximate min count for the given input.
func (s *CountMinSketch) Count(event string) float64 {
func (s *CountMinSketch) Count(event []byte) float64 {
min := float64(math.MaxUint64)
h1, h2 := hashn(event)

Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/sketch/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "hash/fnv"
// SOFTWARE.
//
// <http://www.opensource.org/licenses/mit-license.php>
func hashn(s string) (h1, h2 uint32) {
func hashn(s []byte) (h1, h2 uint32) {
// This construction comes from
// http://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
// "Building a Better Bloom Filter", by Kirsch and Mitzenmacher. Their
Expand All @@ -34,7 +34,7 @@ func hashn(s string) (h1, h2 uint32) {
// Empirically, though, this seems to work "just fine".

fnv1a := fnv.New32a()
fnv1a.Write([]byte(s))
fnv1a.Write(s)
h1 = fnv1a.Sum32()

// inlined jenkins one-at-a-time hash
Expand Down
18 changes: 9 additions & 9 deletions pkg/logql/sketch/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func (t *Topk) heapMinReplace(event string, estimate float64, removed string) {
// updates the BF to ensure that the removed event won't be mistakenly thought
// to be in the heap, and updates the BF to ensure that we would get a truthy result for the added event
func (t *Topk) updateBF(removed, added string) {
r1, r2 := hashn(removed)
a1, a2 := hashn(added)
r1, r2 := hashn(unsafeGetBytes(removed))
a1, a2 := hashn(unsafeGetBytes(added))
var pos uint32
for i := range t.bf {
// removed event
Expand Down Expand Up @@ -230,7 +230,7 @@ func unsafeGetBytes(s string) []byte {
// for each node in the heap and rebalance the heap, and then if the event we're observing has an estimate that is still
// greater than the minimum heap element count, we should put this event into the heap and remove the other one.
func (t *Topk) Observe(event string) {
estimate, h1, h2 := t.sketch.ConservativeIncrement(event)
estimate, h1, h2 := t.sketch.ConservativeIncrement(unsafeGetBytes(event))
t.hll.Insert(unsafeGetBytes(event))

if t.InTopk(h1, h2) {
Expand All @@ -246,12 +246,12 @@ func (t *Topk) Observe(event string) {
var h1, h2 uint32
var pos uint32
for i := range *t.heap {
(*t.heap)[i].count = t.sketch.Count((*t.heap)[i].event)
(*t.heap)[i].count = t.sketch.Count(unsafeGetBytes((*t.heap)[i].event))
if i <= len(*t.heap)/2 {
heap.Fix(t.heap, i)
}
// ensure all the bf buckets are truthy for the event
h1, h2 = hashn((*t.heap)[i].event)
h1, h2 = hashn(unsafeGetBytes((*t.heap)[i].event))
for j := range t.bf {
pos = t.sketch.getPos(h1, h2, uint32(j))
t.bf[j][pos] = true
Expand Down Expand Up @@ -304,11 +304,11 @@ func (t *Topk) Merge(from *Topk) error {

var all TopKResult
for _, e := range *t.heap {
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
}

for _, e := range *from.heap {
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
}

all = removeDuplicates(all)
Expand All @@ -317,7 +317,7 @@ func (t *Topk) Merge(from *Topk) error {
var h1, h2 uint32
// TODO: merging should also potentially replace it's bloomfilter? or 0 everything in the bloomfilter
for _, e := range all[:t.max] {
h1, h2 = hashn(e.Event)
h1, h2 = hashn(unsafeGetBytes(e.Event))
t.heapPush(temp, e.Event, float64(e.Count), h1, h2)
}
t.heap = temp
Expand Down Expand Up @@ -347,7 +347,7 @@ func (t *Topk) Topk() TopKResult {
for _, e := range *t.heap {
res = append(res, element{
Event: e.event,
Count: t.sketch.Count(e.event),
Count: t.sketch.Count(unsafeGetBytes(e.event)),
})
}
sort.Sort(res)
Expand Down
4 changes: 2 additions & 2 deletions vendor/github.com/docker/go-metrics/namespace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading