Skip to content

Commit 3b8d993

Browse files
authored
feat: Skip writeback for chunks fetched by queriers older than a duration (#15605)
1 parent 226e9f1 commit 3b8d993

File tree

8 files changed

+69
-38
lines changed

8 files changed

+69
-38
lines changed

‎docs/sources/shared/configuration.md

+5
Original file line numberDiff line numberDiff line change
@@ -1732,6 +1732,11 @@ The `chunk_store_config` block configures how chunks will be cached and how long
17321732
# The CLI flags prefix for this block configuration is: store.index-cache-write
17331733
[write_dedupe_cache_config: <cache_config>]
17341734
1735+
# Chunks fetched from queriers before this duration will not be written to the
1736+
# cache. A value of 0 will write all chunks to the cache
1737+
# CLI flag: -store.skip-query-writeback-older-than
1738+
[skip_query_writeback_cache_older_than: <duration> | default = 0s]
1739+
17351740
# Chunks will be handed off to the L2 cache after this duration. 0 to disable L2
17361741
# cache.
17371742
# CLI flag: -store.chunks-cache-l2.handoff

‎pkg/storage/chunk/cache/cache_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) {
132132
},
133133
}
134134

135-
fetcher, err := fetcher.New(c, nil, false, s, nil, 0)
135+
fetcher, err := fetcher.New(c, nil, false, s, nil, 0, 0)
136136
require.NoError(t, err)
137137
defer fetcher.Stop()
138138

‎pkg/storage/chunk/fetcher/fetcher.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ type Fetcher struct {
4949
cachel2 cache.Cache
5050
cacheStubs bool
5151

52-
l2CacheHandoff time.Duration
52+
l2CacheHandoff time.Duration
53+
skipQueryWritebackCacheOlderThan time.Duration
5354

5455
wait sync.WaitGroup
5556
decodeRequests chan decodeRequest
@@ -69,15 +70,16 @@ type decodeResponse struct {
6970
}
7071

7172
// New makes a new ChunkFetcher.
72-
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration) (*Fetcher, error) {
73+
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, l2CacheHandoff time.Duration, skipQueryWritebackOlderThan time.Duration) (*Fetcher, error) {
7374
c := &Fetcher{
74-
schema: schema,
75-
storage: storage,
76-
cache: cache,
77-
cachel2: cachel2,
78-
l2CacheHandoff: l2CacheHandoff,
79-
cacheStubs: cacheStubs,
80-
decodeRequests: make(chan decodeRequest),
75+
schema: schema,
76+
storage: storage,
77+
cache: cache,
78+
cachel2: cachel2,
79+
l2CacheHandoff: l2CacheHandoff,
80+
cacheStubs: cacheStubs,
81+
skipQueryWritebackCacheOlderThan: skipQueryWritebackOlderThan,
82+
decodeRequests: make(chan decodeRequest),
8183
}
8284

8385
c.wait.Add(chunkDecodeParallelism)
@@ -138,6 +140,9 @@ func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chun
138140
l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks))
139141

140142
for _, m := range chunks {
143+
if c.skipQueryWritebackCacheOlderThan > 0 && m.From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
144+
continue
145+
}
141146
// Similar to below, this is an optimization to not bother looking in the l1 cache if there isn't a reasonable
142147
// expectation to find it there.
143148
if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) {
@@ -230,6 +235,10 @@ func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) erro
230235
keysL2 := make([]string, 0, len(chunks))
231236
bufsL2 := make([][]byte, 0, len(chunks))
232237
for i := range chunks {
238+
if c.skipQueryWritebackCacheOlderThan > 0 && chunks[i].From.Time().Before(time.Now().UTC().Add(-c.skipQueryWritebackCacheOlderThan)) {
239+
continue
240+
}
241+
233242
var encoded []byte
234243
var err error
235244
if !c.cacheStubs {

‎pkg/storage/chunk/fetcher/fetcher_test.go

+37-22
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,17 @@ import (
2525
func Test(t *testing.T) {
2626
now := time.Now()
2727
tests := []struct {
28-
name string
29-
handoff time.Duration
30-
storeStart []chunk.Chunk
31-
l1Start []chunk.Chunk
32-
l2Start []chunk.Chunk
33-
fetch []chunk.Chunk
34-
l1KeysRequested int
35-
l1End []chunk.Chunk
36-
l2KeysRequested int
37-
l2End []chunk.Chunk
28+
name string
29+
handoff time.Duration
30+
skipQueryWriteback time.Duration
31+
storeStart []chunk.Chunk
32+
l1Start []chunk.Chunk
33+
l2Start []chunk.Chunk
34+
fetch []chunk.Chunk
35+
l1KeysRequested int
36+
l1End []chunk.Chunk
37+
l2KeysRequested int
38+
l2End []chunk.Chunk
3839
}{
3940
{
4041
name: "all found in L1 cache",
@@ -82,6 +83,19 @@ func Test(t *testing.T) {
8283
l2KeysRequested: 3,
8384
l2End: makeChunks(now, c{7 * time.Hour, 8 * time.Hour}, c{8 * time.Hour, 9 * time.Hour}, c{9 * time.Hour, 10 * time.Hour}),
8485
},
86+
{
87+
name: "skipQueryWriteback",
88+
handoff: 24 * time.Hour,
89+
skipQueryWriteback: 3 * 24 * time.Hour,
90+
storeStart: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}),
91+
l1Start: []chunk.Chunk{},
92+
l2Start: []chunk.Chunk{},
93+
fetch: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}, c{5 * 24 * time.Hour, 6 * 24 * time.Hour}),
94+
l1KeysRequested: 3,
95+
l1End: makeChunks(now, c{time.Hour, 2 * time.Hour}, c{2 * time.Hour, 3 * time.Hour}, c{3 * time.Hour, 4 * time.Hour}),
96+
l2KeysRequested: 0,
97+
l2End: []chunk.Chunk{},
98+
},
8599
{
86100
name: "writeback l1",
87101
handoff: 24 * time.Hour,
@@ -194,7 +208,7 @@ func Test(t *testing.T) {
194208
assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart))
195209

196210
// Build fetcher
197-
f, err := New(c1, c2, false, sc, chunkClient, test.handoff)
211+
f, err := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback)
198212
assert.NoError(t, err)
199213

200214
// Run the test
@@ -235,16 +249,17 @@ func BenchmarkFetch(b *testing.B) {
235249
fetch = append(fetch, storeStart...)
236250

237251
test := struct {
238-
name string
239-
handoff time.Duration
240-
storeStart []chunk.Chunk
241-
l1Start []chunk.Chunk
242-
l2Start []chunk.Chunk
243-
fetch []chunk.Chunk
244-
l1KeysRequested int
245-
l1End []chunk.Chunk
246-
l2KeysRequested int
247-
l2End []chunk.Chunk
252+
name string
253+
handoff time.Duration
254+
skipQueryWriteback time.Duration
255+
storeStart []chunk.Chunk
256+
l1Start []chunk.Chunk
257+
l2Start []chunk.Chunk
258+
fetch []chunk.Chunk
259+
l1KeysRequested int
260+
l1End []chunk.Chunk
261+
l2KeysRequested int
262+
l2End []chunk.Chunk
248263
}{
249264
name: "some in L1, some in L2",
250265
handoff: time.Duration(numchunks/3+100) * time.Hour,
@@ -291,7 +306,7 @@ func BenchmarkFetch(b *testing.B) {
291306
_ = chunkClient.PutChunks(context.Background(), test.storeStart)
292307

293308
// Build fetcher
294-
f, _ := New(c1, c2, false, sc, chunkClient, test.handoff)
309+
f, _ := New(c1, c2, false, sc, chunkClient, test.handoff, test.skipQueryWriteback)
295310

296311
for i := 0; i < b.N; i++ {
297312
_, err := f.FetchChunks(context.Background(), test.fetch)

‎pkg/storage/config/store.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ import (
1010
)
1111

1212
type ChunkStoreConfig struct {
13-
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"`
14-
ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"`
15-
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."`
13+
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"`
14+
ChunkCacheConfigL2 cache.Config `yaml:"chunk_cache_config_l2"`
15+
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config" doc:"description=Write dedupe cache is deprecated along with legacy index types (aws, aws-dynamo, bigtable, bigtable-hashed, cassandra, gcp, gcp-columnkey, grpc-store).\nConsider using TSDB index which does not require a write dedupe cache."`
16+
SkipQueryWritebackOlderThan time.Duration `yaml:"skip_query_writeback_cache_older_than"`
1617

1718
L2ChunkCacheHandoff time.Duration `yaml:"l2_chunk_cache_handoff"`
1819
CacheLookupsOlderThan model.Duration `yaml:"cache_lookups_older_than"`
@@ -38,6 +39,7 @@ func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) {
3839
f.DurationVar(&cfg.L2ChunkCacheHandoff, "store.chunks-cache-l2.handoff", 0, "Chunks will be handed off to the L2 cache after this duration. 0 to disable L2 cache.")
3940
f.BoolVar(&cfg.chunkCacheStubs, "store.chunks-cache.cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
4041
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "", f)
42+
f.DurationVar(&cfg.SkipQueryWritebackOlderThan, "store.skip-query-writeback-older-than", 0, "Chunks fetched from queriers before this duration will not be written to the cache. A value of 0 will write all chunks to the cache")
4143

4244
f.Var(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", "Cache index entries older than this period. 0 to disable.")
4345
}

‎pkg/storage/store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (s *LokiStore) init() error {
198198
if err != nil {
199199
return err
200200
}
201-
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff)
201+
f, err := fetcher.New(s.chunksCache, s.chunksCacheL2, s.storeCfg.ChunkCacheStubs(), s.schemaCfg, chunkClient, s.storeCfg.L2ChunkCacheHandoff, s.storeCfg.SkipQueryWritebackOlderThan)
202202
if err != nil {
203203
return err
204204
}

‎pkg/storage/stores/series_store_write_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ func TestChunkWriter_PutOne(t *testing.T) {
160160
idx := &mockIndexWriter{}
161161
client := &mockChunksClient{}
162162

163-
f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0)
163+
f, err := fetcher.New(cache, nil, false, schemaConfig, client, 0, 0)
164164
require.NoError(t, err)
165165

166166
cw := NewChunkWriter(f, schemaConfig, idx, true)

‎pkg/storage/util_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (m *mockChunkStore) GetChunks(_ context.Context, _ string, _, _ model.Time,
261261
panic(err)
262262
}
263263

264-
f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0)
264+
f, err := fetcher.New(cache, nil, false, m.schemas, m.client, 0, 0)
265265
if err != nil {
266266
panic(err)
267267
}

0 commit comments

Comments
 (0)