Skip to content

Commit dab5bb7

Browse files
ingester: use a 64-bit hash to avoid collisions (#5276) (#5302)
* ingester: use a 64-bit hash to avoid collisions We had this issue in the distributor, and it can happen in the ingester too with live traces. * Update changelog * Add test for live traces collision
1 parent 50e25b7 commit dab5bb7

File tree

3 files changed

+78
-21
lines changed

3 files changed

+78
-21
lines changed

‎CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
## main / unreleased
22

3+
* [BUGFIX] Fix ingester issue where a hash collision could lead to spans being stored incorrectly [#5276](https://github.com/grafana/tempo/pull/5276) (@carles-grafana)
4+
35
# v2.8.0
46

57
* [CHANGE] **BREAKING CHANGE** Change default http-listen-port from 80 to 3200 [#4960](https://github.com/grafana/tempo/pull/4960) (@martialblog)

‎modules/ingester/instance.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import (
66
"encoding/hex"
77
"errors"
88
"fmt"
9-
"hash"
10-
"hash/fnv"
119
"sort"
1210
"sync"
1311
"time"
@@ -25,6 +23,7 @@ import (
2523
"github.com/grafana/tempo/pkg/model/trace"
2624
"github.com/grafana/tempo/pkg/tempopb"
2725
"github.com/grafana/tempo/pkg/tracesizes"
26+
"github.com/grafana/tempo/pkg/util"
2827
"github.com/grafana/tempo/pkg/util/log"
2928
"github.com/grafana/tempo/pkg/validation"
3029
"github.com/grafana/tempo/tempodb"
@@ -79,7 +78,7 @@ var (
7978

8079
type instance struct {
8180
tracesMtx sync.Mutex
82-
traces map[uint32]*liveTrace
81+
traces map[uint64]*liveTrace
8382
traceSizes *tracesizes.Tracker
8483
traceSizeBytes uint64
8584

@@ -105,16 +104,14 @@ type instance struct {
105104
localReader backend.Reader
106105
localWriter backend.Writer
107106

108-
hash hash.Hash32
109-
110107
logger kitlog.Logger
111108
maxTraceLogger *log.RateLimitedLogger
112109
}
113110

114111
func newInstance(instanceID string, limiter Limiter, overrides ingesterOverrides, writer tempodb.Writer, l *local.Backend, dedicatedColumns backend.DedicatedColumns) (*instance, error) {
115112
logger := kitlog.With(log.Logger, "tenant", instanceID)
116113
i := &instance{
117-
traces: map[uint32]*liveTrace{},
114+
traces: map[uint64]*liveTrace{},
118115
traceSizes: tracesizes.New(),
119116

120117
instanceID: instanceID,
@@ -130,8 +127,6 @@ func newInstance(instanceID string, limiter Limiter, overrides ingesterOverrides
130127
localReader: backend.NewReader(l),
131128
localWriter: backend.NewWriter(l),
132129

133-
hash: fnv.New32(),
134-
135130
logger: logger,
136131
maxTraceLogger: log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
137132
}
@@ -213,7 +208,7 @@ func (i *instance) push(ctx context.Context, id, traceBytes []byte) error {
213208
return errTraceTooLarge
214209
}
215210

216-
tkn := i.tokenForTraceID(id)
211+
tkn := util.HashForTraceID(id)
217212
trace := i.getOrCreateTrace(id, tkn)
218213

219214
err = trace.Push(ctx, i.instanceID, traceBytes)
@@ -418,7 +413,7 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
418413

419414
// live traces
420415
i.tracesMtx.Lock()
421-
if liveTrace, ok := i.traces[i.tokenForTraceID(id)]; ok {
416+
if liveTrace, ok := i.traces[util.HashForTraceID(id)]; ok {
422417
completeTrace, err = model.MustNewSegmentDecoder(model.CurrentEncoding).PrepareForRead(liveTrace.batches)
423418
for _, b := range liveTrace.batches {
424419
metrics.InspectedBytes += uint64(len(b))
@@ -514,7 +509,7 @@ func (i *instance) AddCompletingBlock(b common.WALBlock) {
514509
// getOrCreateTrace will return a new trace object for the given request
515510
//
516511
// It must be called under the i.tracesMtx lock
517-
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
512+
func (i *instance) getOrCreateTrace(traceID []byte, fp uint64) *liveTrace {
518513
trace, ok := i.traces[fp]
519514
if ok {
520515
return trace
@@ -526,13 +521,6 @@ func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
526521
return trace
527522
}
528523

529-
// tokenForTraceID hash trace ID, should be called under lock
530-
func (i *instance) tokenForTraceID(id []byte) uint32 {
531-
i.hash.Reset()
532-
_, _ = i.hash.Write(id)
533-
return i.hash.Sum32()
534-
}
535-
536524
// resetHeadBlock() should be called under lock
537525
func (i *instance) resetHeadBlock() error {
538526
dedicatedColumns := i.getDedicatedColumns()

‎modules/ingester/instance_test.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/grafana/tempo/pkg/model/trace"
2121
"github.com/grafana/tempo/pkg/tempopb"
2222
v1_trace "github.com/grafana/tempo/pkg/tempopb/trace/v1"
23+
"github.com/grafana/tempo/pkg/util"
2324
"github.com/grafana/tempo/pkg/util/test"
2425
)
2526

@@ -409,7 +410,7 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
409410
instance, _ := defaultInstance(t)
410411

411412
for _, trace := range tc.input {
412-
fp := instance.tokenForTraceID(trace.traceID)
413+
fp := util.HashForTraceID(trace.traceID)
413414
instance.traces[fp] = trace
414415
}
415416

@@ -418,12 +419,12 @@ func TestInstanceCutCompleteTraces(t *testing.T) {
418419

419420
require.Equal(t, len(tc.expectedExist), len(instance.traces))
420421
for _, expectedExist := range tc.expectedExist {
421-
_, ok := instance.traces[instance.tokenForTraceID(expectedExist.traceID)]
422+
_, ok := instance.traces[util.HashForTraceID(expectedExist.traceID)]
422423
require.True(t, ok)
423424
}
424425

425426
for _, expectedNotExist := range tc.expectedNotExist {
426-
_, ok := instance.traces[instance.tokenForTraceID(expectedNotExist.traceID)]
427+
_, ok := instance.traces[util.HashForTraceID(expectedNotExist.traceID)]
427428
require.False(t, ok)
428429
}
429430
})
@@ -684,6 +685,72 @@ func TestInstancePartialSuccess(t *testing.T) {
684685
require.Equal(t, expected, result)
685686
}
686687

688+
func TestFindTraceByIDNoCollisions(t *testing.T) {
689+
ctx := context.Background()
690+
ingester, _, _ := defaultIngester(t, t.TempDir())
691+
instance, err := ingester.getOrCreateInstance(testTenantID)
692+
require.NoError(t, err, "unexpected error creating new instance")
693+
694+
// Verify that Trace IDs that collide with fnv-32 hash do not collide with fnv-64 hash
695+
hexIDs := []string{
696+
"fd5980503add11f09f80f77608c1b2da",
697+
"091ea7803ade11f0998a055186ee1243",
698+
"9e0d446036dc11f09ac04988d2097052",
699+
"a61ed97036dc11f0883771db3b51b1ec",
700+
"6b27f5501eda11f09e99db1b2c23c542",
701+
"6b4149b01eda11f0b0e2a966cf7ebbc8",
702+
"3e9582202f9a11f0afb01b7c06024bd6",
703+
"370db6802f9a11f0a9a212dff3125239",
704+
"978d70802a7311f0991f350653ef0ab4",
705+
"9b66da202a7311f09d292db17ccfd31a",
706+
"de567f703bb711f0b8c377682d1667e6",
707+
"dc2d0fc03bb711f091de732fcf93048c",
708+
}
709+
710+
ids := make([][]byte, len(hexIDs))
711+
for i, hexID := range hexIDs {
712+
traceID, err := util.HexStringToTraceID(hexID)
713+
require.NoError(t, err)
714+
ids[i] = traceID
715+
}
716+
717+
multiMaxBytes := make([]int, len(ids))
718+
for j := range multiMaxBytes {
719+
multiMaxBytes[j] = 1000
720+
}
721+
req := makePushBytesRequestMultiTraces(ids, multiMaxBytes)
722+
require.Equal(t, len(ids), len(req.Traces))
723+
response := instance.PushBytesRequest(ctx, req)
724+
errored, maxLiveCount, traceTooLargeCount := CheckPushBytesError(response)
725+
726+
require.False(t, errored)
727+
require.Equal(t, 0, maxLiveCount)
728+
require.Equal(t, 0, traceTooLargeCount)
729+
730+
traceResults := make([]*tempopb.Trace, len(ids))
731+
for i := range ids {
732+
result, err := instance.FindTraceByID(ctx, ids[i], false)
733+
require.NoError(t, err, "error finding trace by id")
734+
require.NotNil(t, result)
735+
require.NotNil(t, result.Trace)
736+
traceResults[i] = result.Trace
737+
}
738+
739+
// Verify that spans do not appear in multiple traces
740+
spanIDs := map[string]struct{}{}
741+
for _, trace := range traceResults {
742+
for _, resourceSpan := range trace.ResourceSpans {
743+
for _, scopeSpan := range resourceSpan.ScopeSpans {
744+
for _, span := range scopeSpan.Spans {
745+
_, found := spanIDs[string(span.SpanId)]
746+
require.False(t, found, "span %s appears in multiple traces", span.SpanId)
747+
spanIDs[string(span.SpanId)] = struct{}{}
748+
}
749+
}
750+
}
751+
}
752+
}
753+
687754
func TestSortByteSlices(t *testing.T) {
688755
numTraces := 100
689756

0 commit comments

Comments
 (0)