Skip to content

Commit 04994ca

Browse files
authored
perf: Improve approx_topk performance by reducing allocations. (#15450)
**What this PR does / why we need it**: The metrics slice should use a constant amount of memory by removing the smallest item when the maximum number of labels is reached. ``` › benchstat before.log after.log goos: linux goarch: amd64 pkg: github.com/grafana/loki/v3/pkg/logql cpu: AMD Ryzen 7 3700X 8-Core Processor │ before.log │ after.log │ │ sec/op │ sec/op vs base │ _HeapCountMinSketchVectorAdd-16 839.0m ± 3% 418.9m ± 2% -50.07% (p=0.000 n=10) │ before.log │ after.log │ │ B/op │ B/op vs base │ _HeapCountMinSketchVectorAdd-16 72.58Mi ± 0% 12.10Mi ± 0% -83.33% (p=0.000 n=10) │ before.log │ after.log │ │ allocs/op │ allocs/op vs base │ _HeapCountMinSketchVectorAdd-16 4073.9k ± 0% 116.9k ± 0% -97.13% (p=0.000 n=10) ``` **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] Title matches the required conventional commits format, see [here](https://www.conventionalcommits.org/en/v1.0.0/) - **Note** that Promtail is considered to be feature complete, and future development for logs collection will be in [Grafana Alloy](https://github.com/grafana/alloy). As such, `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior. - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](0d4416a)
1 parent 80e8e60 commit 04994ca

File tree

6 files changed

+90
-36
lines changed

6 files changed

+90
-36
lines changed

‎pkg/logql/count_min_sketch.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/axiomhq/hyperloglog"
8+
"github.com/cespare/xxhash/v2"
89

910
"github.com/prometheus/prometheus/model/labels"
1011
"github.com/prometheus/prometheus/promql"
@@ -157,8 +158,13 @@ type HeapCountMinSketchVector struct {
157158
CountMinSketchVector
158159

159160
// internal set of observed events
160-
observed map[string]struct{}
161+
observed map[uint64]struct{}
161162
maxLabels int
163+
164+
// The buffers are used by `labels.Bytes` similar to `series.Hash` in `codec.MergeResponse`. They are allocated
165+
// outside of the method in order to reuse them for the next `Add` call. This saves a lot of allocations.
166+
// 1KB is used for `buffer` after some experimentation. Reusing the buffer is not thread safe.
167+
buffer []byte
162168
}
163169

164170
func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCountMinSketchVector {
@@ -172,31 +178,39 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
172178
CountMinSketchVector: CountMinSketchVector{
173179
T: ts,
174180
F: f,
175-
Metrics: make([]labels.Labels, 0, metricsLength),
181+
Metrics: make([]labels.Labels, 0, metricsLength+1),
176182
},
177-
observed: make(map[string]struct{}),
183+
observed: make(map[uint64]struct{}),
178184
maxLabels: maxLabels,
185+
buffer: make([]byte, 0, 1024),
179186
}
180187
}
181188

182189
func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
183-
// TODO: we save a lot of allocations by reusing the buffer inside metric.String
184-
metricString := metric.String()
185-
v.F.Add(metricString, value)
190+
v.buffer = metric.Bytes(v.buffer)
191+
192+
v.F.Add(v.buffer, value)
186193

187194
// Add our metric if we haven't seen it
188-
if _, ok := v.observed[metricString]; !ok {
195+
196+
// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
197+
// an else block that compares the series labels. However, that's not trivial. Besides, instance.Series has the
198+
// same issue in its deduping logic.
199+
id := xxhash.Sum64(v.buffer)
200+
if _, ok := v.observed[id]; !ok {
189201
heap.Push(v, metric)
190-
v.observed[metricString] = struct{}{}
191-
} else if v.Metrics[0].String() == metricString {
192-
// The smalles element has been updated to fix the heap.
202+
v.observed[id] = struct{}{}
203+
} else if labels.Equal(v.Metrics[0], metric) {
204+
// The smallest element has been updated to fix the heap.
193205
heap.Fix(v, 0)
194206
}
195207

196208
// The maximum number of labels has been reached, so drop the smallest element.
197209
if len(v.Metrics) > v.maxLabels {
198210
metric := heap.Pop(v).(labels.Labels)
199-
delete(v.observed, metric.String())
211+
v.buffer = metric.Bytes(v.buffer)
212+
id := xxhash.Sum64(v.buffer)
213+
delete(v.observed, id)
200214
}
201215
}
202216

@@ -205,8 +219,11 @@ func (v HeapCountMinSketchVector) Len() int {
205219
}
206220

207221
func (v HeapCountMinSketchVector) Less(i, j int) bool {
208-
left := v.F.Count(v.Metrics[i].String())
209-
right := v.F.Count(v.Metrics[j].String())
222+
v.buffer = v.Metrics[i].Bytes(v.buffer)
223+
left := v.F.Count(v.buffer)
224+
225+
v.buffer = v.Metrics[j].Bytes(v.buffer)
226+
right := v.F.Count(v.buffer)
210227
return left < right
211228
}
212229

@@ -295,6 +312,11 @@ func (e *countMinSketchVectorAggEvaluator) Error() error {
295312
type CountMinSketchVectorStepEvaluator struct {
296313
exhausted bool
297314
vec *CountMinSketchVector
315+
316+
// The buffers are used by `labels.Bytes` similar to `series.Hash` in `codec.MergeResponse`. They are allocated
317+
// outside of the method in order to reuse them for the next `Next` call. This saves a lot of allocations.
318+
// 1KB is used for `buffer` after some experimentation. Reusing the buffer is not thread safe.
319+
buffer []byte
298320
}
299321

300322
var _ StepEvaluator = NewQuantileSketchVectorStepEvaluator(nil, 0)
@@ -303,6 +325,7 @@ func NewCountMinSketchVectorStepEvaluator(vec *CountMinSketchVector) *CountMinSk
303325
return &CountMinSketchVectorStepEvaluator{
304326
exhausted: false,
305327
vec: vec,
328+
buffer: make([]byte, 0, 1024),
306329
}
307330
}
308331

@@ -315,7 +338,8 @@ func (e *CountMinSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
315338

316339
for i, labels := range e.vec.Metrics {
317340

318-
f := e.vec.F.Count(labels.String())
341+
e.buffer = labels.Bytes(e.buffer)
342+
f := e.vec.F.Count(e.buffer)
319343

320344
vec[i] = promql.Sample{
321345
T: e.vec.T,

‎pkg/logql/count_min_sketch_test.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package logql
22

33
import (
4+
"fmt"
5+
"math/rand"
46
"testing"
57

68
"github.com/grafana/loki/v3/pkg/logproto"
@@ -57,8 +59,9 @@ func TestCountMinSketchSerialization(t *testing.T) {
5759
T: 42,
5860
F: cms,
5961
},
60-
observed: make(map[string]struct{}, 0),
62+
observed: make(map[uint64]struct{}, 0),
6163
maxLabels: 10_000,
64+
buffer: make([]byte, 0, 1024),
6265
}
6366
vec.Add(metric, 42.0)
6467

@@ -68,7 +71,7 @@ func TestCountMinSketchSerialization(t *testing.T) {
6871
Sketch: &logproto.CountMinSketch{
6972
Depth: 2,
7073
Width: 4,
71-
Counters: []float64{0, 0, 0, 42, 0, 42, 0, 0},
74+
Counters: []float64{0, 42, 0, 0, 0, 42, 0, 0},
7275
Hyperloglog: hllBytes,
7376
},
7477
Metrics: []*logproto.Labels{
@@ -86,3 +89,30 @@ func TestCountMinSketchSerialization(t *testing.T) {
8689
// The HeapCountMinSketchVector is serialized to a CountMinSketchVector.
8790
require.Equal(t, round, vec.CountMinSketchVector)
8891
}
92+
93+
func BenchmarkHeapCountMinSketchVectorAdd(b *testing.B) {
94+
maxLabels := 10_000
95+
v := NewHeapCountMinSketchVector(0, maxLabels, maxLabels)
96+
if len(v.Metrics) > maxLabels || cap(v.Metrics) > maxLabels+1 {
97+
b.Errorf("Length or capcity of metrics is too high: len=%d cap=%d", len(v.Metrics), cap(v.Metrics))
98+
}
99+
100+
eventsCount := 100_000
101+
uniqueEventsCount := 20_000
102+
events := make([]labels.Labels, eventsCount)
103+
for i := range events {
104+
events[i] = labels.Labels{{Name: "event", Value: fmt.Sprintf("%d", i%uniqueEventsCount)}}
105+
}
106+
107+
b.ResetTimer()
108+
b.ReportAllocs()
109+
110+
for n := 0; n < b.N; n++ {
111+
for _, event := range events {
112+
v.Add(event, rand.Float64())
113+
if len(v.Metrics) > maxLabels || cap(v.Metrics) > maxLabels+1 {
114+
b.Errorf("Length or capcity of metrics is too high: len=%d cap=%d", len(v.Metrics), cap(v.Metrics))
115+
}
116+
}
117+
}
118+
}

‎pkg/logql/sketch/cms.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
type CountMinSketch struct {
1111
Depth, Width uint32
1212
Counters [][]float64
13-
HyperLogLog *hyperloglog.Sketch //hyperloglog.New16(),
13+
HyperLogLog *hyperloglog.Sketch // hyperloglog.New16(),
1414
}
1515

1616
// NewCountMinSketch creates a new CMS for a given width and depth.
@@ -46,8 +46,8 @@ func (s *CountMinSketch) getPos(h1, h2, row uint32) uint32 {
4646
}
4747

4848
// Add 'count' occurrences of the given input.
49-
func (s *CountMinSketch) Add(event string, count float64) {
50-
s.HyperLogLog.Insert(unsafeGetBytes(event))
49+
func (s *CountMinSketch) Add(event []byte, count float64) {
50+
s.HyperLogLog.Insert(event)
5151
// see the comments in the hashn function for how using only 2
5252
// hash functions rather than a function per row still fulfils
5353
// the pairwise independent hash functions requirement for CMS
@@ -58,7 +58,7 @@ func (s *CountMinSketch) Add(event string, count float64) {
5858
}
5959
}
6060

61-
func (s *CountMinSketch) Increment(event string) {
61+
func (s *CountMinSketch) Increment(event []byte) {
6262
s.Add(event, 1)
6363
}
6464

@@ -69,8 +69,8 @@ func (s *CountMinSketch) Increment(event string) {
6969
// value that's less than Count(h) + count rather than all counters that h hashed to.
7070
// Returns the new estimate for the event as well as the both hashes which can be used
7171
// to identify the event for other things that need a hash.
72-
func (s *CountMinSketch) ConservativeAdd(event string, count float64) (float64, uint32, uint32) {
73-
s.HyperLogLog.Insert(unsafeGetBytes(event))
72+
func (s *CountMinSketch) ConservativeAdd(event []byte, count float64) (float64, uint32, uint32) {
73+
s.HyperLogLog.Insert(event)
7474

7575
min := float64(math.MaxUint64)
7676

@@ -94,12 +94,12 @@ func (s *CountMinSketch) ConservativeAdd(event string, count float64) (float64,
9494
return min, h1, h2
9595
}
9696

97-
func (s *CountMinSketch) ConservativeIncrement(event string) (float64, uint32, uint32) {
97+
func (s *CountMinSketch) ConservativeIncrement(event []byte) (float64, uint32, uint32) {
9898
return s.ConservativeAdd(event, float64(1))
9999
}
100100

101101
// Count returns the approximate min count for the given input.
102-
func (s *CountMinSketch) Count(event string) float64 {
102+
func (s *CountMinSketch) Count(event []byte) float64 {
103103
min := float64(math.MaxUint64)
104104
h1, h2 := hashn(event)
105105

‎pkg/logql/sketch/cms_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestCMS(_ *testing.T) {
4343

4444
for _, e := range events {
4545
for i := 0; i < e.count; i++ {
46-
cms.ConservativeIncrement(e.name)
46+
cms.ConservativeIncrement(unsafeGetBytes(e.name))
4747
}
4848
}
4949
}

‎pkg/logql/sketch/hash.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import "hash/fnv"
2424
// SOFTWARE.
2525
//
2626
// <http://www.opensource.org/licenses/mit-license.php>
27-
func hashn(s string) (h1, h2 uint32) {
27+
func hashn(s []byte) (h1, h2 uint32) {
2828
// This construction comes from
2929
// http://www.eecs.harvard.edu/~michaelm/postscripts/tr-02-05.pdf
3030
// "Building a Better Bloom Filter", by Kirsch and Mitzenmacher. Their
@@ -34,7 +34,7 @@ func hashn(s string) (h1, h2 uint32) {
3434
// Empirically, though, this seems to work "just fine".
3535

3636
fnv1a := fnv.New32a()
37-
fnv1a.Write([]byte(s))
37+
fnv1a.Write(s)
3838
h1 = fnv1a.Sum32()
3939

4040
// inlined jenkins one-at-a-time hash

‎pkg/logql/sketch/topk.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ func (t *Topk) heapMinReplace(event string, estimate float64, removed string) {
196196
// updates the BF to ensure that the removed event won't be mistakenly thought
197197
// to be in the heap, and updates the BF to ensure that we would get a truthy result for the added event
198198
func (t *Topk) updateBF(removed, added string) {
199-
r1, r2 := hashn(removed)
200-
a1, a2 := hashn(added)
199+
r1, r2 := hashn(unsafeGetBytes(removed))
200+
a1, a2 := hashn(unsafeGetBytes(added))
201201
var pos uint32
202202
for i := range t.bf {
203203
// removed event
@@ -230,7 +230,7 @@ func unsafeGetBytes(s string) []byte {
230230
// for each node in the heap and rebalance the heap, and then if the event we're observing has an estimate that is still
231231
// greater than the minimum heap element count, we should put this event into the heap and remove the other one.
232232
func (t *Topk) Observe(event string) {
233-
estimate, h1, h2 := t.sketch.ConservativeIncrement(event)
233+
estimate, h1, h2 := t.sketch.ConservativeIncrement(unsafeGetBytes(event))
234234
t.hll.Insert(unsafeGetBytes(event))
235235

236236
if t.InTopk(h1, h2) {
@@ -246,12 +246,12 @@ func (t *Topk) Observe(event string) {
246246
var h1, h2 uint32
247247
var pos uint32
248248
for i := range *t.heap {
249-
(*t.heap)[i].count = t.sketch.Count((*t.heap)[i].event)
249+
(*t.heap)[i].count = t.sketch.Count(unsafeGetBytes((*t.heap)[i].event))
250250
if i <= len(*t.heap)/2 {
251251
heap.Fix(t.heap, i)
252252
}
253253
// ensure all the bf buckets are truthy for the event
254-
h1, h2 = hashn((*t.heap)[i].event)
254+
h1, h2 = hashn(unsafeGetBytes((*t.heap)[i].event))
255255
for j := range t.bf {
256256
pos = t.sketch.getPos(h1, h2, uint32(j))
257257
t.bf[j][pos] = true
@@ -304,11 +304,11 @@ func (t *Topk) Merge(from *Topk) error {
304304

305305
var all TopKResult
306306
for _, e := range *t.heap {
307-
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
307+
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
308308
}
309309

310310
for _, e := range *from.heap {
311-
all = append(all, element{Event: e.event, Count: t.sketch.Count(e.event)})
311+
all = append(all, element{Event: e.event, Count: t.sketch.Count(unsafeGetBytes(e.event))})
312312
}
313313

314314
all = removeDuplicates(all)
@@ -317,7 +317,7 @@ func (t *Topk) Merge(from *Topk) error {
317317
var h1, h2 uint32
318318
// TODO: merging should also potentially replace its bloom filter? or zero everything in the bloom filter
319319
for _, e := range all[:t.max] {
320-
h1, h2 = hashn(e.Event)
320+
h1, h2 = hashn(unsafeGetBytes(e.Event))
321321
t.heapPush(temp, e.Event, float64(e.Count), h1, h2)
322322
}
323323
t.heap = temp
@@ -347,7 +347,7 @@ func (t *Topk) Topk() TopKResult {
347347
for _, e := range *t.heap {
348348
res = append(res, element{
349349
Event: e.event,
350-
Count: t.sketch.Count(e.event),
350+
Count: t.sketch.Count(unsafeGetBytes(e.event)),
351351
})
352352
}
353353
sort.Sort(res)

0 commit comments

Comments
 (0)