Skip to content

Commit 25084ea

Browse files
authored
Randomize sessions after aggregation (#2656)
1 parent 4486d81 commit 25084ea

File tree

2 files changed

+20
-3
lines changed

2 files changed

+20
-3
lines changed

‎pkg/distributor/distributor.go

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -266,7 +266,6 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
266266
req.TotalBytesUncompressed += int64(len(lbs.Value))
267267
}
268268
profName := phlaremodel.Labels(series.Labels).Get(ProfileName)
269-
series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels)
270269
for _, raw := range series.Samples {
271270
usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
272271
d.profileReceivedStats.Inc(1)
@@ -385,6 +384,11 @@ func (d *Distributor) sendAggregatedProfile(ctx context.Context, req *distributo
385384
}
386385

387386
func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.PushRequest, tenantID string) (resp *connect.Response[pushv1.PushResponse], err error) {
387+
// Reduce cardinality of session_id label.
388+
for _, series := range req.Series {
389+
series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels)
390+
}
391+
388392
// Next we split profiles by labels. Newly allocated profiles should be closed after use.
389393
profileSeries, newProfiles := extractSampleSeries(req)
390394
defer func() {
@@ -509,7 +513,9 @@ func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels,
509513
if !ok {
510514
return nil, false, nil
511515
}
512-
r, ok, err := a.Aggregate(labels.Hash(), profile.TimeNanos, mergeProfile(profile))
516+
517+
k, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), phlaremodel.LabelNameSessionID)
518+
r, ok, err := a.Aggregate(k, profile.TimeNanos, mergeProfile(profile))
513519
if err != nil {
514520
return nil, false, err
515521
}

‎pkg/distributor/distributor_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"math/rand"
89
"net/http"
910
"net/http/httptest"
1011
"os"
@@ -789,6 +790,7 @@ func TestPush_ShuffleSharding(t *testing.T) {
789790
}
790791

791792
func TestPush_Aggregation(t *testing.T) {
793+
const maxSessions = 16
792794
ingesterClient := newFakeIngester(t, false)
793795
d, err := New(
794796
Config{DistributorRing: ringConfig, PushTimeout: time.Second * 10},
@@ -798,6 +800,7 @@ func TestPush_Aggregation(t *testing.T) {
798800
l := validation.MockDefaultLimits()
799801
l.DistributorAggregationPeriod = model.Duration(time.Second)
800802
l.DistributorAggregationWindow = model.Duration(time.Second)
803+
l.MaxSessionsPerSeries = maxSessions
801804
tenantLimits["user-1"] = l
802805
}),
803806
nil, log.NewLogfmtLogger(os.Stdout),
@@ -824,6 +827,10 @@ func TestPush_Aggregation(t *testing.T) {
824827
{Name: "cluster", Value: "us-central1"},
825828
{Name: "client", Value: strconv.Itoa(i)},
826829
{Name: "__name__", Value: "cpu"},
830+
{
831+
Name: phlaremodel.LabelNameSessionID,
832+
Value: phlaremodel.SessionID(rand.Uint64()).String(),
833+
},
827834
},
828835
Samples: []*distributormodel.ProfileSample{
829836
{
@@ -844,11 +851,13 @@ func TestPush_Aggregation(t *testing.T) {
844851
d.asyncRequests.Wait()
845852

846853
var sum int64
854+
sessions := make(map[string]struct{})
847855
assert.GreaterOrEqual(t, len(ingesterClient.requests), 20)
848856
assert.Less(t, len(ingesterClient.requests), 100)
849857
for _, r := range ingesterClient.requests {
850858
for _, s := range r.Series {
851-
require.Len(t, s.Samples, 1)
859+
sessionID := phlaremodel.Labels(s.Labels).Get(phlaremodel.LabelNameSessionID)
860+
sessions[sessionID] = struct{}{}
852861
p, err := pprof2.RawFromBytes(s.Samples[0].RawProfile)
853862
require.NoError(t, err)
854863
for _, x := range p.Sample {
@@ -859,6 +868,8 @@ func TestPush_Aggregation(t *testing.T) {
859868

860869
// RF * samples_per_profile * clients * requests
861870
assert.Equal(t, int64(3*2*clients*requests), sum)
871+
assert.GreaterOrEqual(t, len(sessions), clients)
872+
assert.LessOrEqual(t, len(sessions), maxSessions+1)
862873
}
863874

864875
func testProfile(t int64) *profilev1.Profile {

0 commit comments

Comments
 (0)