Skip to content

Commit a09622b

Browse files
committed
chore: Add bool to metrics.go log line whether query used bloom filters (#14645)
Better observability of which queries use bloom filters for chunk filtering. Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 parent 7ff99f4 commit a09622b

File tree

11 files changed

+165
-106
lines changed

11 files changed

+165
-106
lines changed

‎pkg/bloomgateway/querier.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {
101101
return &logproto.ShortRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum}
102102
}
103103

104-
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
104+
func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, bool, error) {
105105
// Shortcut that does not require any filtering
106106
if !bq.limits.BloomGatewayEnabled(tenant) || len(chunkRefs) == 0 || len(v1.ExtractTestableLabelMatchers(queryPlan.AST)) == 0 {
107-
return chunkRefs, nil
107+
return chunkRefs, false, nil
108108
}
109109

110110
logger, ctx := spanlogger.NewWithLogger(ctx, bq.logger, "bloomquerier.FilterChunkRefs")
@@ -140,7 +140,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
140140
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
141141
bq.metrics.seriesFiltered.Add(0)
142142

143-
return chunkRefs, nil
143+
return chunkRefs, false, nil
144144
}
145145
}
146146

@@ -152,12 +152,12 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
152152
day := bloomshipper.NewInterval(s.day.Time, s.day.Add(Day))
153153
blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
154154
if err != nil {
155-
return nil, err
155+
return nil, false, err
156156
}
157157

158158
refs, err := bq.c.FilterChunks(ctx, tenant, s.interval, blocks, queryPlan)
159159
if err != nil {
160-
return nil, err
160+
return nil, false, err
161161
}
162162

163163
skippedGrps = append(skippedGrps, skipped)
@@ -167,7 +167,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
167167
// add chunk refs from series that were not mapped to any blocks
168168
skippedDeduped, err := mergeSeries(skippedGrps, nil)
169169
if err != nil {
170-
return nil, errors.Wrap(err, "failed to dedupe skipped series")
170+
return nil, false, errors.Wrap(err, "failed to dedupe skipped series")
171171
}
172172

173173
var chunksSkipped int
@@ -177,7 +177,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
177177

178178
deduped, err := mergeSeries(responses, nil)
179179
if err != nil {
180-
return nil, errors.Wrap(err, "failed to dedupe results")
180+
return nil, false, errors.Wrap(err, "failed to dedupe results")
181181
}
182182

183183
result := make([]*logproto.ChunkRef, 0, len(chunkRefs))
@@ -218,7 +218,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
218218
bq.metrics.seriesSkipped.Add(float64(len(skippedDeduped)))
219219
bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries))
220220

221-
return result, nil
221+
return result, true, nil
222222
}
223223

224224
// groupChunkRefs takes a slice of chunk refs sorted by their fingerprint and

‎pkg/bloomgateway/querier_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestBloomQuerier(t *testing.T) {
7979
}
8080
expr, err := syntax.ParseExpr(`{foo="bar"}`)
8181
require.NoError(t, err)
82-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
82+
res, _, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
8383
require.NoError(t, err)
8484
require.Equal(t, chunkRefs, res)
8585
require.Equal(t, 0, c.callCount)
@@ -95,7 +95,7 @@ func TestBloomQuerier(t *testing.T) {
9595
chunkRefs := []*logproto.ChunkRef{}
9696
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
9797
require.NoError(t, err)
98-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
98+
res, _, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
9999
require.NoError(t, err)
100100
require.Equal(t, chunkRefs, res)
101101
require.Equal(t, 0, c.callCount)
@@ -115,7 +115,7 @@ func TestBloomQuerier(t *testing.T) {
115115
}
116116
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
117117
require.NoError(t, err)
118-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
118+
res, _, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
119119
require.Error(t, err)
120120
require.Nil(t, res)
121121
})
@@ -134,7 +134,7 @@ func TestBloomQuerier(t *testing.T) {
134134
}
135135
expr, err := syntax.ParseExpr(`{foo="bar"} | trace_id="exists"`)
136136
require.NoError(t, err)
137-
res, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
137+
res, _, err := bq.FilterChunkRefs(ctx, tenant, from, through, chunkRefs, plan.QueryPlan{AST: expr})
138138
require.NoError(t, err)
139139
require.Equal(t, chunkRefs, res)
140140
require.Equal(t, 2, c.callCount)

‎pkg/indexgateway/gateway.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type IndexClientWithRange struct {
5858
}
5959

6060
type BloomQuerier interface {
61-
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, error)
61+
FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunks []*logproto.ChunkRef, plan plan.QueryPlan) ([]*logproto.ChunkRef, bool, error)
6262
}
6363

6464
type Gateway struct {
@@ -257,14 +257,15 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
257257
return result, nil
258258
}
259259

260-
chunkRefs, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
260+
chunkRefs, used, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, result.Refs, req.Plan)
261261
if err != nil {
262262
return nil, err
263263
}
264264

265265
result.Refs = chunkRefs
266-
level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs))
266+
level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs), "used_blooms", used)
267267
result.Stats.PostFilterChunks = int64(len(result.Refs))
268+
result.Stats.UsedBloomFilters = used
268269
return result, nil
269270
}
270271

‎pkg/logql/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ func RecordRangeAndInstantQueryMetrics(
208208
"index_total_chunks", stats.Index.TotalChunks,
209209
"index_post_bloom_filter_chunks", stats.Index.PostFilterChunks,
210210
"index_bloom_filter_ratio", fmt.Sprintf("%.2f", bloomRatio),
211+
"index_used_bloom_filters", stats.Index.UsedBloomFilters,
211212
"index_shard_resolver_duration", time.Duration(stats.Index.ShardsDuration),
212213
}...)
213214

‎pkg/logqlmodel/stats/context.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ func (i *Index) Merge(m Index) {
240240
i.TotalChunks += m.TotalChunks
241241
i.PostFilterChunks += m.PostFilterChunks
242242
i.ShardsDuration += m.ShardsDuration
243+
if m.UsedBloomFilters {
244+
i.UsedBloomFilters = m.UsedBloomFilters
245+
}
243246
}
244247

245248
func (c *Caches) Merge(m Caches) {

0 commit comments

Comments
 (0)