Skip to content

Commit 07ccf7c

Browse files
chore: merge back delete request when we are done processing all its splits (#15968)
1 parent 13d0a1e commit 07ccf7c

9 files changed

+257
-79
lines changed

‎pkg/compactor/deletion/delete_requests_manager.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri
5656

5757
go dm.loop()
5858

59+
if err := dm.mergeShardedRequests(context.Background()); err != nil {
60+
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
61+
}
62+
5963
return dm
6064
}
6165

@@ -83,6 +87,39 @@ func (d *DeleteRequestsManager) Stop() {
8387
d.wg.Wait()
8488
}
8589

90+
// mergeShardedRequests merges the sharded requests back to a single request when we are done with processing all the shards
91+
func (d *DeleteRequestsManager) mergeShardedRequests(ctx context.Context) error {
92+
deleteGroups, err := d.deleteRequestsStore.GetAllDeleteRequests(context.Background())
93+
if err != nil {
94+
return err
95+
}
96+
97+
deletesPerRequest := partitionByRequestID(deleteGroups)
98+
deleteRequests := mergeDeletes(deletesPerRequest)
99+
for _, req := range deleteRequests {
100+
// do not consider requests which do not have an id. Request ID won't be set in some tests or there is a bug in our code for loading requests.
101+
if req.RequestID == "" {
102+
level.Error(util_log.Logger).Log("msg", "skipped considering request without an id for merging its shards",
103+
"user_id", req.UserID,
104+
"start_time", req.StartTime.Unix(),
105+
"end_time", req.EndTime.Unix(),
106+
"query", req.Query,
107+
)
108+
continue
109+
}
110+
// do not do anything if we are not done with processing all the shards or the number of shards is 1
111+
if req.Status != StatusProcessed || len(deletesPerRequest[req.RequestID]) == 1 {
112+
continue
113+
}
114+
115+
if err := d.deleteRequestsStore.MergeShardedRequests(ctx, req, deletesPerRequest[req.RequestID]); err != nil {
116+
return err
117+
}
118+
}
119+
120+
return nil
121+
}
122+
86123
func (d *DeleteRequestsManager) updateMetrics() error {
87124
deleteRequests, err := d.deleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
88125
if err != nil {
@@ -393,6 +430,10 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
393430
)
394431
d.markRequestAsProcessed(req)
395432
}
433+
434+
if err := d.mergeShardedRequests(context.Background()); err != nil {
435+
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
436+
}
396437
}
397438

398439
func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {

‎pkg/compactor/deletion/delete_requests_manager_test.go

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package deletion
22

33
import (
44
"context"
5+
"path/filepath"
56
"strings"
67
"testing"
78
"time"
@@ -13,6 +14,8 @@ import (
1314
"github.com/grafana/loki/v3/pkg/compactor/deletionmode"
1415
"github.com/grafana/loki/v3/pkg/compactor/retention"
1516
"github.com/grafana/loki/v3/pkg/logql/syntax"
17+
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
18+
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
1619
"github.com/grafana/loki/v3/pkg/util/filter"
1720
)
1821

@@ -1047,6 +1050,10 @@ func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, s
10471050
return reqs, nil
10481051
}
10491052

1053+
func (m *mockDeleteRequestsStore) GetAllDeleteRequests(_ context.Context) ([]DeleteRequest, error) {
1054+
return m.deleteRequests, nil
1055+
}
1056+
10501057
func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(_ context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) {
10511058
m.addReqs = reqs
10521059
if m.returnZeroDeleteRequests {
@@ -1085,13 +1092,132 @@ func (m *mockDeleteRequestsStore) UpdateStatus(_ context.Context, req DeleteRequ
10851092
return nil
10861093
}
10871094

1095+
func (m *mockDeleteRequestsStore) MergeShardedRequests(_ context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error {
1096+
n := 0
1097+
for i := range m.deleteRequests {
1098+
for j := range requestsToRemove {
1099+
if requestsAreEqual(m.deleteRequests[i], requestsToRemove[j]) {
1100+
continue
1101+
}
1102+
m.deleteRequests[n] = m.deleteRequests[i]
1103+
n++
1104+
break
1105+
}
1106+
}
1107+
1108+
m.deleteRequests = m.deleteRequests[:n]
1109+
m.deleteRequests = append(m.deleteRequests, requestToAdd)
1110+
1111+
return nil
1112+
}
1113+
10881114
func requestsAreEqual(req1, req2 DeleteRequest) bool {
10891115
if req1.UserID == req2.UserID &&
10901116
req1.Query == req2.Query &&
10911117
req1.StartTime == req2.StartTime &&
1092-
req1.EndTime == req2.EndTime {
1118+
req1.EndTime == req2.EndTime &&
1119+
req1.SequenceNum == req2.SequenceNum &&
1120+
req1.Status == req2.Status {
10931121
return true
10941122
}
10951123

10961124
return false
10971125
}
1126+
1127+
func TestDeleteRequestsManager_mergeShardedRequests(t *testing.T) {
1128+
for _, tc := range []struct {
1129+
name string
1130+
reqsToAdd []DeleteRequest
1131+
shouldMarkProcessed func(DeleteRequest) bool
1132+
requestsShouldBeMerged bool
1133+
}{
1134+
{
1135+
name: "no requests in store",
1136+
},
1137+
{
1138+
name: "none of the requests are processed - should not merge",
1139+
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
1140+
shouldMarkProcessed: func(_ DeleteRequest) bool {
1141+
return false
1142+
},
1143+
},
1144+
{
1145+
name: "not all requests are processed - should not merge",
1146+
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
1147+
shouldMarkProcessed: func(request DeleteRequest) bool {
1148+
return request.SequenceNum%2 == 0
1149+
},
1150+
},
1151+
{
1152+
name: "all the requests are processed - should merge",
1153+
reqsToAdd: buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
1154+
shouldMarkProcessed: func(_ DeleteRequest) bool {
1155+
return true
1156+
},
1157+
requestsShouldBeMerged: true,
1158+
},
1159+
{ // build requests for 2 different users and mark all requests as processed for just one of the two
1160+
name: "merging requests from one user should not touch another users requests",
1161+
reqsToAdd: append(
1162+
buildRequests(time.Hour, `{foo="bar"}`, user1, now.Add(-24*time.Hour), now),
1163+
buildRequests(time.Hour, `{foo="bar"}`, user2, now.Add(-24*time.Hour), now)...,
1164+
),
1165+
shouldMarkProcessed: func(request DeleteRequest) bool {
1166+
return request.UserID == user2
1167+
},
1168+
},
1169+
} {
1170+
t.Run(tc.name, func(t *testing.T) {
1171+
mgr := setupManager(t)
1172+
reqs, err := mgr.deleteRequestsStore.AddDeleteRequestGroup(context.Background(), tc.reqsToAdd)
1173+
require.NoError(t, err)
1174+
require.GreaterOrEqual(t, len(reqs), len(tc.reqsToAdd))
1175+
1176+
for _, req := range reqs {
1177+
if !tc.shouldMarkProcessed(req) {
1178+
continue
1179+
}
1180+
require.NoError(t, mgr.deleteRequestsStore.UpdateStatus(context.Background(), req, StatusProcessed))
1181+
}
1182+
1183+
inStoreReqs, err := mgr.deleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
1184+
require.NoError(t, err)
1185+
1186+
require.NoError(t, mgr.mergeShardedRequests(context.Background()))
1187+
inStoreReqsAfterMerging, err := mgr.deleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
1188+
require.NoError(t, err)
1189+
1190+
if tc.requestsShouldBeMerged {
1191+
require.Len(t, inStoreReqsAfterMerging, 1)
1192+
require.True(t, requestsAreEqual(inStoreReqsAfterMerging[0], DeleteRequest{
1193+
UserID: user1,
1194+
Query: tc.reqsToAdd[0].Query,
1195+
StartTime: tc.reqsToAdd[0].StartTime,
1196+
EndTime: tc.reqsToAdd[len(tc.reqsToAdd)-1].EndTime,
1197+
Status: StatusProcessed,
1198+
}))
1199+
} else {
1200+
require.Len(t, inStoreReqsAfterMerging, len(inStoreReqs))
1201+
require.Equal(t, inStoreReqs, inStoreReqsAfterMerging)
1202+
}
1203+
})
1204+
}
1205+
}
1206+
1207+
func setupManager(t *testing.T) *DeleteRequestsManager {
1208+
t.Helper()
1209+
// build the store
1210+
tempDir := t.TempDir()
1211+
1212+
workingDir := filepath.Join(tempDir, "working-dir")
1213+
objectStorePath := filepath.Join(tempDir, "object-store")
1214+
1215+
objectClient, err := local.NewFSObjectClient(local.FSConfig{
1216+
Directory: objectStorePath,
1217+
})
1218+
require.NoError(t, err)
1219+
ds, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
1220+
require.NoError(t, err)
1221+
1222+
return NewDeleteRequestsManager(ds, time.Hour, 1, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
1223+
}

‎pkg/compactor/deletion/delete_requests_store.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@ var ErrDeleteRequestNotFound = errors.New("could not find matching delete reques
4343
type DeleteRequestsStore interface {
4444
AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error)
4545
GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error)
46+
GetAllDeleteRequests(ctx context.Context) ([]DeleteRequest, error)
4647
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
4748
UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error
4849
GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error)
4950
RemoveDeleteRequests(ctx context.Context, req []DeleteRequest) error
5051
GetCacheGenerationNumber(ctx context.Context, userID string) (string, error)
52+
MergeShardedRequests(ctx context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error
5153
Stop()
5254
Name() string
5355
}
@@ -99,6 +101,7 @@ func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs [
99101
results = append(results, newReq)
100102
ds.writeDeleteRequest(newReq, writeBatch)
101103
}
104+
ds.updateCacheGen(reqs[0].UserID, writeBatch)
102105

103106
if err := ds.indexClient.BatchWrite(ctx, writeBatch); err != nil {
104107
return nil, err
@@ -107,6 +110,18 @@ func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs [
107110
return results, nil
108111
}
109112

113+
func (ds *deleteRequestsStore) MergeShardedRequests(ctx context.Context, requestToAdd DeleteRequest, requestsToRemove []DeleteRequest) error {
114+
writeBatch := ds.indexClient.NewWriteBatch()
115+
116+
ds.writeDeleteRequest(requestToAdd, writeBatch)
117+
118+
for _, req := range requestsToRemove {
119+
ds.removeRequest(req, writeBatch)
120+
}
121+
122+
return ds.indexClient.BatchWrite(ctx, writeBatch)
123+
}
124+
110125
func newRequest(req DeleteRequest, requestID []byte, createdAt model.Time, seqNumber int) (DeleteRequest, error) {
111126
req.RequestID = string(requestID)
112127
req.Status = StatusReceived
@@ -124,14 +139,15 @@ func (ds *deleteRequestsStore) writeDeleteRequest(req DeleteRequest, writeBatch
124139
// Add an entry with userID, requestID, and sequence number as range key and status as value to make it easy
125140
// to manage and lookup status. We don't want to set anything in hash key here since we would want to find
126141
// delete requests by just status
127-
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
142+
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(req.Status))
128143

129144
// Add another entry with additional details like creation time, time range of delete request and the logQL requests in value
130-
rangeValue := fmt.Sprintf("%x:%x:%x", int64(ds.now()), int64(req.StartTime), int64(req.EndTime))
145+
rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime))
131146
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue), []byte(req.Query))
147+
}
132148

133-
// create a gen number for this result
134-
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber())
149+
func (ds *deleteRequestsStore) updateCacheGen(userID string, writeBatch index.WriteBatch) {
150+
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber())
135151
}
136152

137153
// backwardCompatibleDeleteRequestHash generates the hash key for a delete request.
@@ -172,6 +188,14 @@ func (ds *deleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, st
172188
})
173189
}
174190

191+
// GetAllDeleteRequests returns all the delete requests.
192+
func (ds *deleteRequestsStore) GetAllDeleteRequests(ctx context.Context) ([]DeleteRequest, error) {
193+
return ds.queryDeleteRequests(ctx, index.Query{
194+
TableName: DeleteRequestsTableName,
195+
HashValue: string(deleteRequestID),
196+
})
197+
}
198+
175199
// GetAllDeleteRequestsForUser returns all delete requests for a user.
176200
func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
177201
return ds.queryDeleteRequests(ctx, index.Query{
@@ -188,11 +212,6 @@ func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteReque
188212
writeBatch := ds.indexClient.NewWriteBatch()
189213
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))
190214

191-
if newStatus == StatusProcessed {
192-
// remove runtime filtering for deleted data
193-
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber())
194-
}
195-
196215
return ds.indexClient.BatchWrite(ctx, writeBatch)
197216
}
198217

@@ -319,20 +338,22 @@ func unmarshalDeleteRequestDetails(itr index.ReadBatchIterator, req DeleteReques
319338
return DeleteRequest{}, nil
320339
}
321340

322-
if err = requestWithDetails.SetQuery(string(itr.Value())); err != nil {
323-
return DeleteRequest{}, err
324-
}
341+
requestWithDetails.Query = string(itr.Value())
325342

326343
return requestWithDetails, nil
327344
}
328345

329346
// RemoveDeleteRequests the passed delete requests
330347
func (ds *deleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error {
348+
if len(reqs) == 0 {
349+
return nil
350+
}
331351
writeBatch := ds.indexClient.NewWriteBatch()
332352

333353
for _, r := range reqs {
334354
ds.removeRequest(r, writeBatch)
335355
}
356+
ds.updateCacheGen(reqs[0].UserID, writeBatch)
336357

337358
return ds.indexClient.BatchWrite(ctx, writeBatch)
338359
}
@@ -344,9 +365,6 @@ func (ds *deleteRequestsStore) removeRequest(req DeleteRequest, writeBatch index
344365
// Add another entry with additional details like creation time, time range of delete request and selectors in value
345366
rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime))
346367
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue))
347-
348-
// ensure caches are invalidated
349-
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
350368
}
351369

352370
func (ds *deleteRequestsStore) Name() string {

‎pkg/compactor/deletion/delete_requests_store_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,14 @@ func TestDeleteRequestsStore(t *testing.T) {
9393
require.NoError(t, err)
9494
compareRequests(t, tc.user2Requests, user2Requests)
9595

96+
// caches should not be invalidated when we mark delete request as processed
9697
updateGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1)
9798
require.NoError(t, err)
98-
require.NotEqual(t, createGenNumber, updateGenNumber)
99+
require.Equal(t, createGenNumber, updateGenNumber)
99100

100101
updateGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2)
101102
require.NoError(t, err)
102-
require.NotEqual(t, createGenNumber2, updateGenNumber2)
103+
require.Equal(t, createGenNumber2, updateGenNumber2)
103104

104105
// delete the requests from the store updated previously
105106
var remainingRequests []DeleteRequest
@@ -159,7 +160,7 @@ func TestBatchCreateGet(t *testing.T) {
159160
results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID)
160161
require.NoError(t, err)
161162

162-
require.Equal(t, savedRequests, results)
163+
compareRequests(t, savedRequests, results)
163164
})
164165

165166
t.Run("updates a single request with a new status", func(t *testing.T) {
@@ -203,7 +204,7 @@ func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequ
203204
return actual[i].RequestID < actual[j].RequestID
204205
})
205206
for i, deleteRequest := range actual {
206-
require.Equal(t, expected[i], deleteRequest)
207+
require.True(t, requestsAreEqual(expected[i], deleteRequest))
207208
}
208209
}
209210

@@ -238,8 +239,6 @@ func setup(t *testing.T) *testContext {
238239

239240
// build the store
240241
tempDir := t.TempDir()
241-
//tempDir := os.TempDir()
242-
fmt.Println(tempDir)
243242

244243
workingDir := filepath.Join(tempDir, "working-dir")
245244
objectStorePath := filepath.Join(tempDir, "object-store")

0 commit comments

Comments
 (0)