Skip to content

Commit 1bd6967

Browse files
ruslan-mikhailov authored and knylander-grafana committed
Issue 4632: fix exemplars calculation (grafana#5115)
* [bugfix] Request at least 1 exemplar per shard

  In case exemplars * 1.2 is less than total, it could lead to dropping all exemplars.

* Test buildBackendRequests

* [bugfix] Correct exemplars calculation

* [bugfix] Correct calculation of required pages per request

  Search the entire block if it's small.

* Changelog
1 parent f55a81c commit 1bd6967

File tree

4 files changed

+202
-3
lines changed

4 files changed

+202
-3
lines changed

‎CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
109109
* [BUGFIX] Fix `TempoBlockListRisingQuickly` alert grouping. [#4876](https://github.com/grafana/tempo/pull/4876) (@mapno)
110110
* [BUGFIX] Reset `SkipMetricsGeneration` before reuse. [#5117](https://github.com/grafana/tempo/pull/5117) (@flxbk)
111111
* [BUGFIX] Fix metrics generator host info processor overrides config. [#5118](https://github.com/grafana/tempo/pull/5118) (@rlankfo)
112+
* [BUGFIX] Fix for queried number of exemplars (TraceQL Metrics) [#5115](https://github.com/grafana/tempo/pull/5115) (@ruslan-mikhailov)
112113

113114
# v2.7.2
114115

‎modules/frontend/metrics_query_range_sharder.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,10 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
166166
}
167167

168168
func (s *queryRangeSharder) exemplarsPerShard(total uint32, exemplars uint32) uint32 {
169-
if exemplars == 0 {
169+
if exemplars == 0 || total == 0 {
170170
return 0
171171
}
172-
return uint32(math.Ceil(float64(exemplars)*1.2)) / total
172+
return max(uint32(math.Ceil(float64(exemplars)*1.2))/total, 1) // require at least 1 exemplar per shard
173173
}
174174

175175
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
@@ -244,7 +244,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
244244
if exemplars > 0 {
245245
// Scale the number of exemplars per block to match the size
246246
// of each sub request on this block. For very small blocks or other edge cases, return at least 1.
247-
exemplars = max(uint32(float64(exemplars)*float64(m.TotalRecords)/float64(pages)), 1)
247+
exemplars = max(uint32(float64(exemplars)*float64(pages)/float64(m.TotalRecords)), 1)
248248
}
249249

250250
dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package frontend
2+
3+
import (
4+
"net/http/httptest"
5+
"strconv"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
"github.com/google/uuid"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/grafana/tempo/modules/frontend/pipeline"
16+
"github.com/grafana/tempo/pkg/tempopb"
17+
"github.com/grafana/tempo/tempodb/backend"
18+
)
19+
20+
func FuzzExemplarsPerShard(f *testing.F) {
21+
f.Add(uint32(1), uint32(10)) // total = 1, exemplars = 10
22+
f.Add(uint32(100), uint32(1)) // total = 100, exemplars = 1
23+
f.Add(uint32(10), uint32(0)) // total = 10, exemplars = 0
24+
25+
s := &queryRangeSharder{}
26+
27+
f.Fuzz(func(t *testing.T, total uint32, exemplars uint32) {
28+
result := s.exemplarsPerShard(total, exemplars)
29+
30+
if exemplars == 0 || total == 0 {
31+
assert.Equal(t, uint32(0), result, "if exemplars is 0 or total is 0, result should be 0")
32+
} else {
33+
assert.Greater(t, result, uint32(0), "result should be greater than 0")
34+
}
35+
})
36+
}
37+
38+
func TestBuildBackendRequestsExemplarsOneBlock(t *testing.T) {
39+
// Create the test sharder with exemplars enabled
40+
sharder := &queryRangeSharder{
41+
logger: log.NewNopLogger(),
42+
cfg: QueryRangeSharderConfig{
43+
MaxExemplars: 100,
44+
},
45+
}
46+
tenantID := "test-tenant"
47+
targetBytesPerRequest := 1000
48+
49+
testCases := []struct {
50+
name string
51+
totalRecords uint32
52+
blockSize uint64
53+
exemplars uint32
54+
expectedBatches int
55+
expectedExemplars int
56+
}{
57+
{
58+
name: "basic",
59+
totalRecords: 100,
60+
blockSize: uint64(targetBytesPerRequest),
61+
exemplars: 5,
62+
expectedExemplars: 6, // 5 * 1.2
63+
expectedBatches: 1,
64+
},
65+
{
66+
name: "two batches",
67+
totalRecords: 100,
68+
blockSize: uint64(2 * targetBytesPerRequest),
69+
exemplars: 5,
70+
expectedExemplars: 6, // 5 * 1.2
71+
expectedBatches: 2,
72+
},
73+
{
74+
name: "high record count",
75+
totalRecords: 10000,
76+
blockSize: 50000,
77+
exemplars: 10,
78+
expectedExemplars: 50, // 1 per each batch
79+
expectedBatches: 50,
80+
},
81+
{
82+
name: "totalRecords == blockSize == targetBytesPerRequest",
83+
totalRecords: uint32(targetBytesPerRequest),
84+
blockSize: uint64(targetBytesPerRequest),
85+
exemplars: 10,
86+
expectedExemplars: 12, // 10 * 1.2
87+
expectedBatches: 1,
88+
},
89+
{
90+
name: "large block size",
91+
totalRecords: 500,
92+
blockSize: 50000,
93+
exemplars: 20,
94+
expectedExemplars: 50, // 1 per each batch
95+
expectedBatches: 50,
96+
},
97+
{
98+
name: "small block",
99+
totalRecords: 10,
100+
blockSize: 100,
101+
exemplars: 1,
102+
expectedExemplars: 2, // 1 * 1.2 -> rounded up to 2
103+
expectedBatches: 1,
104+
},
105+
{
106+
name: "block with single record",
107+
totalRecords: 1,
108+
blockSize: uint64(2 * targetBytesPerRequest),
109+
exemplars: 1,
110+
expectedExemplars: 2, // 1 * 1.2 -> rounded up to 2
111+
expectedBatches: 1,
112+
},
113+
{
114+
name: "block with single record",
115+
totalRecords: 1,
116+
blockSize: uint64(1.5 * float64(targetBytesPerRequest)),
117+
exemplars: 1,
118+
expectedExemplars: 2, // 1 * 1.2 -> rounded up to 2
119+
expectedBatches: 1,
120+
},
121+
{
122+
name: "block with 2 records",
123+
totalRecords: 2,
124+
blockSize: uint64(2 * targetBytesPerRequest),
125+
exemplars: 1,
126+
expectedExemplars: 2, // 1 * 1.2 -> rounded up to 2
127+
expectedBatches: 2,
128+
},
129+
}
130+
131+
for _, tc := range testCases {
132+
t.Run(tc.name, func(t *testing.T) {
133+
// Create a test requests with exemplars enabled
134+
req := httptest.NewRequest("GET", "/test", nil)
135+
parentReq := pipeline.NewHTTPRequest(req)
136+
searchReq := tempopb.QueryRangeRequest{
137+
Query: "test_query",
138+
Start: uint64(time.Now().Add(-1 * time.Hour).UnixNano()),
139+
End: uint64(time.Now().UnixNano()),
140+
Step: uint64(60 * time.Second.Nanoseconds()),
141+
Exemplars: tc.exemplars,
142+
}
143+
144+
// Create mock block metadata
145+
blockMeta := &backend.BlockMeta{
146+
BlockID: backend.MustParse(uuid.NewString()),
147+
TotalRecords: tc.totalRecords,
148+
Size_: tc.blockSize,
149+
StartTime: time.Now().Add(-1 * time.Hour),
150+
EndTime: time.Now(),
151+
}
152+
153+
reqCh := make(chan pipeline.Request, 10)
154+
155+
go func() {
156+
sharder.buildBackendRequests(t.Context(), tenantID, parentReq, searchReq, []*backend.BlockMeta{blockMeta}, targetBytesPerRequest, reqCh)
157+
}()
158+
159+
// Collect requests
160+
var generatedRequests []pipeline.Request
161+
for req := range reqCh {
162+
generatedRequests = append(generatedRequests, req)
163+
}
164+
assert.Equal(t, tc.expectedBatches, len(generatedRequests), "Number of generated requests should match expected value")
165+
166+
var totalExemplars int
167+
for _, req := range generatedRequests {
168+
uri := req.HTTPRequest().URL.String()
169+
exemplarsValue := extractExemplarsValue(t, uri)
170+
assert.Greater(t, exemplarsValue, 0, "Exemplars per batch should be at least 1")
171+
totalExemplars += exemplarsValue
172+
}
173+
assert.Equal(t, tc.expectedExemplars, totalExemplars, "Total exemplars should match expected value")
174+
})
175+
}
176+
}
177+
178+
// extractExemplarsValue extracts the exemplars value from the URL
179+
func extractExemplarsValue(t *testing.T, uri string) int {
180+
require.True(t, strings.Contains(uri, "exemplars="), "Request should contain exemplars parameter")
181+
exemplarsParam := ""
182+
for param := range strings.SplitSeq(uri, "&") {
183+
if strings.HasPrefix(param, "exemplars=") {
184+
exemplarsParam = strings.TrimPrefix(param, "exemplars=")
185+
break
186+
}
187+
}
188+
require.NotEmpty(t, exemplarsParam, "Exemplars parameter should not be empty")
189+
190+
exemplarsValue, err := strconv.Atoi(exemplarsParam)
191+
require.NoError(t, err, "Should be able to parse exemplars value")
192+
193+
return exemplarsValue
194+
}

‎modules/frontend/search_sharder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,10 @@ func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int {
374374
if m.Size_ == 0 || m.TotalRecords == 0 {
375375
return 0
376376
}
377+
// if the block is smaller than the bytesPerRequest, we can search the entire block
378+
if m.Size_ < uint64(bytesPerRequest) {
379+
return int(m.TotalRecords)
380+
}
377381

378382
bytesPerPage := m.Size_ / uint64(m.TotalRecords)
379383
if bytesPerPage == 0 {

0 commit comments

Comments
 (0)