Skip to content

Commit 3277321

Browse files
feat: when using sqlite for storing delete requests, use the stored completion time to reduce the requests considered for querytime filtering (#16600)
1 parent 92d63be commit 3277321

19 files changed

+296
-190
lines changed

‎pkg/compactor/client/grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (s *compactorGRPCClient) Stop() {
6767

6868
func (s *compactorGRPCClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error) {
6969
ctx = user.InjectOrgID(ctx, userID)
70-
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{})
70+
grpcResp, err := s.grpcClient.GetDeleteRequests(ctx, &deletion_grpc.GetDeleteRequestsRequest{ForQuerytimeFiltering: true})
7171
if err != nil {
7272
return nil, err
7373
}

‎pkg/compactor/client/grpc/grpc.pb.go

Lines changed: 74 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pkg/compactor/client/grpc/grpc.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ service Compactor {
77
rpc GetCacheGenNumbers(GetCacheGenNumbersRequest) returns (GetCacheGenNumbersResponse);
88
}
99

10-
message GetDeleteRequestsRequest {}
10+
message GetDeleteRequestsRequest {
11+
bool forQuerytimeFiltering = 1;
12+
}
1113

1214
message GetDeleteRequestsResponse {
1315
repeated DeleteRequest deleteRequests = 1;

‎pkg/compactor/client/http.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ func NewHTTPClient(addr string, cfg HTTPConfig) (deletion.CompactorClient, error
5151
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
5252
return nil, err
5353
}
54+
5455
u.Path = getDeletePath
56+
q := u.Query()
57+
q.Set(deletion.ForQuerytimeFilteringQueryParam, "true")
58+
u.RawQuery = q.Encode()
5559
deleteRequestsURL := u.String()
5660

5761
u.Path = cacheGenNumPath

‎pkg/compactor/compactor.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,16 @@ type Limits interface {
210210
DefaultLimits() *validation.Limits
211211
}
212212

213-
func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer, metricsNamespace string) (*Compactor, error) {
213+
func NewCompactor(
214+
cfg Config,
215+
objectStoreClients map[config.DayTime]client.ObjectClient,
216+
deleteStoreClient client.ObjectClient,
217+
schemaConfig config.SchemaConfig,
218+
limits Limits,
219+
indexUpdatePropagationMaxDelay time.Duration,
220+
r prometheus.Registerer,
221+
metricsNamespace string,
222+
) (*Compactor, error) {
214223
retentionEnabledStats.Set("false")
215224
if cfg.RetentionEnabled {
216225
retentionEnabledStats.Set("true")
@@ -267,15 +276,22 @@ func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.Objec
267276
compactor.subservicesWatcher = services.NewFailureWatcher()
268277
compactor.subservicesWatcher.WatchManager(compactor.subservices)
269278

270-
if err := compactor.init(objectStoreClients, deleteStoreClient, schemaConfig, limits, r); err != nil {
279+
if err := compactor.init(objectStoreClients, deleteStoreClient, schemaConfig, indexUpdatePropagationMaxDelay, limits, r); err != nil {
271280
return nil, fmt.Errorf("init compactor: %w", err)
272281
}
273282

274283
compactor.Service = services.NewBasicService(compactor.starting, compactor.loop, compactor.stopping)
275284
return compactor, nil
276285
}
277286

278-
func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer) error {
287+
func (c *Compactor) init(
288+
objectStoreClients map[config.DayTime]client.ObjectClient,
289+
deleteStoreClient client.ObjectClient,
290+
schemaConfig config.SchemaConfig,
291+
indexUpdatePropagationMaxDelay time.Duration,
292+
limits Limits,
293+
r prometheus.Registerer,
294+
) error {
279295
err := chunk_util.EnsureDirectory(c.cfg.WorkingDirectory)
280296
if err != nil {
281297
return err
@@ -286,7 +302,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie
286302
return fmt.Errorf("delete store client not initialised when retention is enabled")
287303
}
288304

289-
if err := c.initDeletes(deleteStoreClient, r, limits); err != nil {
305+
if err := c.initDeletes(deleteStoreClient, indexUpdatePropagationMaxDelay, r, limits); err != nil {
290306
return fmt.Errorf("failed to init delete store: %w", err)
291307
}
292308
}
@@ -365,10 +381,16 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie
365381
return nil
366382
}
367383

368-
func (c *Compactor) initDeletes(objectClient client.ObjectClient, r prometheus.Registerer, limits Limits) error {
384+
func (c *Compactor) initDeletes(objectClient client.ObjectClient, indexUpdatePropagationMaxDelay time.Duration, r prometheus.Registerer, limits Limits) error {
369385
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
370386
indexStorageClient := storage.NewIndexStorageClient(objectClient, c.cfg.DeleteRequestStoreKeyPrefix)
371-
store, err := deletion.NewDeleteRequestsStore(deletion.DeleteRequestsStoreDBType(c.cfg.DeleteRequestStoreDBType), deletionWorkDir, indexStorageClient, deletion.DeleteRequestsStoreDBType(c.cfg.BackupDeleteRequestStoreDBType))
387+
store, err := deletion.NewDeleteRequestsStore(
388+
deletion.DeleteRequestsStoreDBType(c.cfg.DeleteRequestStoreDBType),
389+
deletionWorkDir,
390+
indexStorageClient,
391+
deletion.DeleteRequestsStoreDBType(c.cfg.BackupDeleteRequestStoreDBType),
392+
indexUpdatePropagationMaxDelay,
393+
)
372394
if err != nil {
373395
return err
374396
}

‎pkg/compactor/compactor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob
6363

6464
c, err := NewCompactor(cfg, objectClients, objectClients[periodConfigs[len(periodConfigs)-1].From], config.SchemaConfig{
6565
Configs: periodConfigs,
66-
}, overrides, prometheus.NewPedanticRegistry(), constants.Loki)
66+
}, overrides, 0, prometheus.NewPedanticRegistry(), constants.Loki)
6767
require.NoError(t, err)
6868

6969
c.RegisterIndexCompactor("dummy", testIndexCompactor{})

‎pkg/compactor/deletion/delete_requests_client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,15 @@ func (c *deleteRequestsClient) currentUserIDs() []string {
137137

138138
return userIDs
139139
}
140+
141+
func NewNoOpDeleteRequestsClient() DeleteRequestsClient {
142+
return &noOpDeleteRequestsClient{}
143+
}
144+
145+
type noOpDeleteRequestsClient struct{}
146+
147+
func (n noOpDeleteRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, _ string) ([]DeleteRequest, error) {
148+
return nil, nil
149+
}
150+
151+
func (n noOpDeleteRequestsClient) Stop() {}

‎pkg/compactor/deletion/delete_requests_manager_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,9 +1040,10 @@ type mockDeleteRequestsStore struct {
10401040
getResult []DeleteRequest
10411041
getErr error
10421042

1043-
getAllUser string
1044-
getAllResult []DeleteRequest
1045-
getAllErr error
1043+
getAllUser string
1044+
getAllResult []DeleteRequest
1045+
getAllErr error
1046+
getAllRequestedForQuerytimeFiltering bool
10461047

10471048
genNumber string
10481049
}
@@ -1093,8 +1094,9 @@ func (m *mockDeleteRequestsStore) GetDeleteRequest(_ context.Context, userID, re
10931094
return m.getResult[0], m.getErr
10941095
}
10951096

1096-
func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]DeleteRequest, error) {
1097+
func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(_ context.Context, userID string, forQuerytimeFiltering bool) ([]DeleteRequest, error) {
10971098
m.getAllUser = userID
1099+
m.getAllRequestedForQuerytimeFiltering = forQuerytimeFiltering
10981100
return m.getAllResult, m.getAllErr
10991101
}
11001102

0 commit comments

Comments
 (0)