Skip to content

Commit ff111dc

Browse files
authored
fix: Correctly propagate index stats to metrics.go log line (#14941)
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent fb6789d commit ff111dc

File tree

3 files changed

+49
-19
lines changed

3 files changed

+49
-19
lines changed

‎pkg/logqlmodel/stats/context.go

+7
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ func (c *Context) Index() Index {
108108
return c.index
109109
}
110110

111+
// Merge index stats from multiple respones in a concurrency-safe manner
112+
func (c *Context) MergeIndex(i Index) {
113+
c.mtx.Lock()
114+
defer c.mtx.Unlock()
115+
c.index.Merge(i)
116+
}
117+
111118
// Caches returns the cache statistics accumulated so far.
112119
func (c *Context) Caches() Caches {
113120
return Caches{

‎pkg/storage/stores/series/series_index_gateway_store.go

+3-14
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, _ string, fr
6060
}
6161

6262
statsCtx := statscontext.FromContext(ctx)
63-
statsCtx.AddIndexTotalChunkRefs(response.Stats.TotalChunks)
64-
statsCtx.AddIndexPostFilterChunkRefs(response.Stats.PostFilterChunks)
63+
statsCtx.MergeIndex(response.Stats)
6564

6665
return result, nil
6766
}
@@ -131,23 +130,13 @@ func (c *IndexGatewayClientStore) Volume(ctx context.Context, _ string, from, th
131130
})
132131
}
133132

134-
func (c *IndexGatewayClientStore) GetShards(
135-
ctx context.Context,
136-
_ string,
137-
from, through model.Time,
138-
targetBytesPerShard uint64,
139-
predicate chunk.Predicate,
140-
) (*logproto.ShardsResponse, error) {
141-
resp, err := c.client.GetShards(ctx, &logproto.ShardsRequest{
133+
func (c *IndexGatewayClientStore) GetShards(ctx context.Context, _ string, from, through model.Time, targetBytesPerShard uint64, predicate chunk.Predicate) (*logproto.ShardsResponse, error) {
134+
return c.client.GetShards(ctx, &logproto.ShardsRequest{
142135
From: from,
143136
Through: through,
144137
Query: predicate.Plan().AST.String(),
145138
TargetBytesPerShard: targetBytesPerShard,
146139
})
147-
if err != nil {
148-
return nil, err
149-
}
150-
return resp, nil
151140
}
152141

153142
func (c *IndexGatewayClientStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {

‎pkg/storage/stores/series/series_index_gateway_store_test.go

+39-5
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,52 @@ import (
99
"github.com/stretchr/testify/require"
1010

1111
"github.com/grafana/loki/v3/pkg/logproto"
12+
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
13+
"github.com/grafana/loki/v3/pkg/storage/chunk"
1214
)
1315

14-
type fakeClient struct {
16+
type mockClient struct {
1517
GatewayClient
1618
}
1719

18-
func (fakeClient) GetSeries(_ context.Context, _ *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error) {
20+
func (mockClient) GetSeries(_ context.Context, _ *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error) {
1921
return &logproto.GetSeriesResponse{}, nil
2022
}
2123

22-
func Test_IndexGatewayClient(t *testing.T) {
23-
idx := NewIndexGatewayClientStore(fakeClient{}, log.NewNopLogger())
24-
_, err := idx.GetSeries(context.Background(), "foo", model.Earliest, model.Latest)
24+
func (mockClient) GetChunkRef(_ context.Context, _ *logproto.GetChunkRefRequest) (*logproto.GetChunkRefResponse, error) {
25+
return &logproto.GetChunkRefResponse{
26+
Refs: []*logproto.ChunkRef{},
27+
Stats: stats.Index{
28+
TotalChunks: 1000,
29+
PostFilterChunks: 10,
30+
ShardsDuration: 0,
31+
UsedBloomFilters: true,
32+
},
33+
}, nil
34+
}
35+
36+
func Test_IndexGatewayClientStore_GetSeries(t *testing.T) {
37+
idx := NewIndexGatewayClientStore(&mockClient{}, log.NewNopLogger())
38+
_, err := idx.GetSeries(context.Background(), "tenant", model.Earliest, model.Latest)
2539
require.NoError(t, err)
2640
}
41+
42+
func Test_IndexGatewayClientStore_GetChunkRefs(t *testing.T) {
43+
idx := NewIndexGatewayClientStore(&mockClient{}, log.NewNopLogger())
44+
45+
t.Run("stats context is merged correctly", func(t *testing.T) {
46+
ctx := context.Background()
47+
_, ctx = stats.NewContext(ctx)
48+
49+
_, err := idx.GetChunkRefs(ctx, "tenant", model.Earliest, model.Latest, chunk.NewPredicate(nil, nil))
50+
require.NoError(t, err)
51+
52+
_, err = idx.GetChunkRefs(ctx, "tenant", model.Earliest, model.Latest, chunk.NewPredicate(nil, nil))
53+
require.NoError(t, err)
54+
55+
statsCtx := stats.FromContext(ctx)
56+
require.True(t, statsCtx.Index().UsedBloomFilters)
57+
require.Equal(t, int64(2000), statsCtx.Index().TotalChunks)
58+
require.Equal(t, int64(20), statsCtx.Index().PostFilterChunks)
59+
})
60+
}

0 commit comments

Comments
 (0)