From 47961f802d1d52851135e5077887aeae682381a4 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 9 May 2025 23:36:51 +0530 Subject: [PATCH] feat: Compactor deletion manifest builder (#17474) --- pkg/compactor/deletion/delete_request.go | 22 +- .../deletion/delete_request_batch.go | 156 ++++ .../deletion/delete_request_batch_test.go | 719 ++++++++++++++++++ pkg/compactor/deletion/delete_request_test.go | 4 +- .../deletion/delete_requests_manager.go | 232 ++---- .../deletion/delete_requests_manager_test.go | 27 +- .../deletion/deletion_manifest_builder.go | 226 ++++++ .../deletion_manifest_builder_test.go | 637 ++++++++++++++++ pkg/compactor/retention/retention.go | 29 +- pkg/compactor/retention/util_test.go | 4 +- .../indexshipper/boltdb/compactor/iterator.go | 2 +- .../shipper/indexshipper/tsdb/compactor.go | 2 +- pkg/tool/audit/audit_test.go | 2 +- 13 files changed, 1871 insertions(+), 191 deletions(-) create mode 100644 pkg/compactor/deletion/delete_request_batch.go create mode 100644 pkg/compactor/deletion/delete_request_batch_test.go create mode 100644 pkg/compactor/deletion/deletion_manifest_builder.go create mode 100644 pkg/compactor/deletion/deletion_manifest_builder_test.go diff --git a/pkg/compactor/deletion/delete_request.go b/pkg/compactor/deletion/delete_request.go index 61831820f8..2ebd1ed674 100644 --- a/pkg/compactor/deletion/delete_request.go +++ b/pkg/compactor/deletion/delete_request.go @@ -106,12 +106,10 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { return true } -// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. -// It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters. -// If the filter.Func is nil, the whole chunk is supposed to be deleted. -func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) { +// IsDeleted checks if the given chunk entry would have data requested for deletion. +func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) bool { if d.UserID != unsafeGetString(userID) { - return false, nil + return false } if !intervalsOverlap(model.Interval{ @@ -121,7 +119,7 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten Start: d.StartTime, End: d.EndTime, }) { - return false, nil + return false } if d.logSelectorExpr == nil { @@ -133,11 +131,21 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten "user", d.UserID, "err", err, ) - return false, nil + return false } } if !labels.Selector(d.matchers).Matches(lbls) { + return false + } + + return true +} + +// GetChunkFilter tells whether the chunk is covered by the DeleteRequest and +// optionally returns a filter.Func if the chunk is supposed to be deleted partially or the delete request has line filters. +func (d *DeleteRequest) GetChunkFilter(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) { + if !d.IsDeleted(userID, lbls, chunk) { return false, nil } diff --git a/pkg/compactor/deletion/delete_request_batch.go b/pkg/compactor/deletion/delete_request_batch.go new file mode 100644 index 0000000000..b691dab0a3 --- /dev/null +++ b/pkg/compactor/deletion/delete_request_batch.go @@ -0,0 +1,156 @@ +package deletion + +import ( + "time" + + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/compactor/retention" + "github.com/grafana/loki/v3/pkg/util/filter" + util_log "github.com/grafana/loki/v3/pkg/util/log" +) + +// deleteRequestBatch holds a batch of requests loaded for processing +type deleteRequestBatch struct { + deleteRequestsToProcess map[string]*userDeleteRequests + duplicateRequests []DeleteRequest + count int + metrics *deleteRequestsManagerMetrics +} + +func newDeleteRequestBatch(metrics *deleteRequestsManagerMetrics) *deleteRequestBatch { + return &deleteRequestBatch{ + deleteRequestsToProcess: map[string]*userDeleteRequests{}, + metrics: metrics, + } +} + +func (b *deleteRequestBatch) reset() { + b.deleteRequestsToProcess = map[string]*userDeleteRequests{} + b.duplicateRequests = []DeleteRequest{} + b.count = 0 +} + +func (b *deleteRequestBatch) requestCount() int { + return b.count +} + +// addDeleteRequest add a requests to the batch +func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) { + dr.Metrics = b.metrics + ur, ok := b.deleteRequestsToProcess[dr.UserID] + if !ok { + ur = &userDeleteRequests{ + requestsInterval: model.Interval{ + Start: dr.StartTime, + End: dr.EndTime, + }, + } + b.deleteRequestsToProcess[dr.UserID] = ur + } + + ur.requests = append(ur.requests, dr) + if dr.StartTime < ur.requestsInterval.Start { + ur.requestsInterval.Start = dr.StartTime + } + if dr.EndTime > ur.requestsInterval.End { + ur.requestsInterval.End = dr.EndTime + } + b.count++ +} + +func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) error { + ur, ok := b.deleteRequestsToProcess[deleteRequest.UserID] + if !ok { + return nil + } + for _, requestLoadedForProcessing := range ur.requests { + isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest) + if err != nil { + return err + } + if isDuplicate { + level.Info(util_log.Logger).Log( + "msg", "found duplicate request of one of the requests loaded for processing", + "loaded_request_id", requestLoadedForProcessing.RequestID, + "duplicate_request_id", deleteRequest.RequestID, + "user", deleteRequest.UserID, + ) + b.duplicateRequests = append(b.duplicateRequests, deleteRequest) + } + } + + return nil +} + +func (b *deleteRequestBatch) expired(userID []byte, chk retention.Chunk, lbls labels.Labels, skipRequest func(*DeleteRequest) bool) (bool, filter.Func) { + userIDStr := unsafeGetString(userID) + if b.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(b.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{ + Start: chk.From, + End: chk.Through, + }) { + return false, nil + } + + var filterFuncs []filter.Func + + for _, deleteRequest := range b.deleteRequestsToProcess[userIDStr].requests { + if skipRequest(deleteRequest) { + continue + } + isDeleted, ff := deleteRequest.GetChunkFilter(userID, lbls, chk) + if !isDeleted { + continue + } + + if ff == nil { + level.Info(util_log.Logger).Log( + "msg", "no chunks to retain: the whole chunk is deleted", + "delete_request_id", deleteRequest.RequestID, + "sequence_num", deleteRequest.SequenceNum, + "user", deleteRequest.UserID, + "chunkID", string(chk.ChunkID), + ) + b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() + return true, nil + } + filterFuncs = append(filterFuncs, ff) + } + + if len(filterFuncs) == 0 { + return false, nil + } + + b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() + return true, func(ts time.Time, s string, structuredMetadata labels.Labels) bool { + for _, ff := range filterFuncs { + if ff(ts, s, structuredMetadata) { + return true + } + } + + return false + } +} + +func (b *deleteRequestBatch) intervalMayHaveExpiredChunks(userID string) bool { + // We can't do the overlap check between the passed interval and delete requests interval from a user because + // if a request is issued just for today and there are chunks spanning today and yesterday then + // the overlap check would skip processing yesterday's index which would result in the index pointing to deleted chunks. + if userID != "" { + return b.deleteRequestsToProcess[userID] != nil + } + + return len(b.deleteRequestsToProcess) != 0 +} + +func (b *deleteRequestBatch) getAllRequestsForUser(userID string) []*DeleteRequest { + userRequests, ok := b.deleteRequestsToProcess[userID] + if !ok { + return nil + } + + return userRequests.requests +} diff --git a/pkg/compactor/deletion/delete_request_batch_test.go b/pkg/compactor/deletion/delete_request_batch_test.go new file mode 100644 index 0000000000..199829c0f7 --- /dev/null +++ b/pkg/compactor/deletion/delete_request_batch_test.go @@ -0,0 +1,719 @@ +package deletion + +import ( + "strings" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/compactor/retention" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/util/filter" +) + +func TestDeleteRequestBatch_Expired(t *testing.T) { + type resp struct { + isExpired bool + expectedFilter filter.Func + } + + now := model.Now() + lblFoo, err := syntax.ParseLabels(`{foo="bar"}`) + require.NoError(t, err) + streamSelectorWithLineFilters := lblFoo.String() + `|="fizz"` + streamSelectorWithStructuredMetadataFilters := lblFoo.String() + `| ping="pong"` + streamSelectorWithLineAndStructuredMetadataFilters := lblFoo.String() + `| ping="pong" |= "fizz"` + + chunkEntry := retention.Chunk{ + From: now.Add(-12 * time.Hour), + Through: now.Add(-time.Hour), + } + + for _, tc := range []struct { + name string + deleteRequests []DeleteRequest + expectedResp resp + expectedDeletionRangeByUser map[string]model.Interval + }{ + { + name: "no delete requests", + expectedResp: resp{ + isExpired: false, + }, + }, + { + name: "no relevant delete requests", + deleteRequests: []DeleteRequest{ + { + UserID: "different-user", + Query: lblFoo.String(), + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + "different-user": { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "no relevant delete requests", + deleteRequests: []DeleteRequest{ + { + UserID: "different-user", + Query: lblFoo.String(), + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + "different-user": { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "delete request not matching labels", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: `{fizz="buzz"}`, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "whole chunk deleted by single request", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "whole chunk deleted by single request with line filters", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool { + return strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "whole chunk deleted by single request with structured metadata filters", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool { + return structuredMetadata.Get(lblPing) == lblPong + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "whole chunk deleted by single request with line and structured metadata filters", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithLineAndStructuredMetadataFilters, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, structuredMetadata labels.Labels) bool { + return structuredMetadata.Get(lblPing) == lblPong && strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "deleted interval out of range", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-48 * time.Hour), + EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-48 * time.Hour), + End: now.Add(-24 * time.Hour), + }, + }, + }, + { + name: "deleted interval out of range(with multiple user requests)", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-48 * time.Hour), + EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: "different-user", + Query: lblFoo.String(), + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: false, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-48 * time.Hour), + End: now.Add(-24 * time.Hour), + }, + "different-user": { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple delete requests with one deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-48 * time.Hour), + EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-12 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-48 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple delete requests with line filters and one deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-48 * time.Hour), + EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-12 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool { + return strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-48 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple delete requests with structured metadata filters and one deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-48 * time.Hour), + EndTime: now.Add(-24 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-12 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool { + return structuredMetadata.Get(lblPing) == lblPong + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-48 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple delete requests causing multiple holes", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-11 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-10 * time.Hour), + EndTime: now.Add(-8 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-5 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(ts time.Time, _ string, _ labels.Labels) bool { + tsUnixNano := ts.UnixNano() + if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) || + (now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) || + (now.Add(-6*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-5*time.Hour).UnixNano()) || + (now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano()) { + return true + } + return false + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-13 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple overlapping requests deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-8 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, _ labels.Labels) bool { + return true + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-13 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple overlapping requests with line filters deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-8 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool { + return strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-13 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple overlapping requests with structured metadata filters deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-6 * time.Hour), + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-8 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool { + return structuredMetadata.Get(lblPing) == lblPong + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-13 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple non-overlapping requests deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: lblFoo.String(), + StartTime: now.Add(-4 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, _ labels.Labels) bool { + return true + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-12 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple non-overlapping requests with line filter deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-4 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool { + return strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-12 * time.Hour), + End: now, + }, + }, + }, + { + name: "multiple non-overlapping requests with structured metadata filter deleting the whole chunk", + deleteRequests: []DeleteRequest{ + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-6*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-4*time.Hour) - 1, + Status: StatusReceived, + }, + { + UserID: testUserID, + Query: streamSelectorWithStructuredMetadataFilters, + StartTime: now.Add(-4 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool { + return structuredMetadata.Get(lblPing) == lblPong + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-12 * time.Hour), + End: now, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil)) + for _, req := range tc.deleteRequests { + batch.addDeleteRequest(&req) + } + + for _, deleteRequests := range batch.deleteRequestsToProcess { + for _, dr := range deleteRequests.requests { + require.EqualValues(t, 0, dr.DeletedLines) + } + } + + isExpired, filterFunc := batch.expired([]byte(testUserID), chunkEntry, lblFoo, func(_ *DeleteRequest) bool { + return false + }) + require.Equal(t, tc.expectedResp.isExpired, isExpired) + if tc.expectedResp.expectedFilter == nil { + require.Nil(t, filterFunc) + } else { + require.NotNil(t, filterFunc) + + for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) { + line := "foo bar" + if start.Time().Minute()%2 == 1 { + line = "fizz buzz" + } + // mix of empty, ding=dong and ping=pong as structured metadata + var structuredMetadata labels.Labels + if start.Time().Minute()%3 == 0 { + structuredMetadata = labels.FromStrings(lblPing, lblPong) + } else if start.Time().Minute()%2 == 0 { + structuredMetadata = labels.FromStrings("ting", "tong") + } + require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata), filterFunc(start.Time(), line, structuredMetadata), "line", line, "time", start.Time(), "now", now.Time()) + } + + require.Equal(t, len(tc.expectedDeletionRangeByUser), len(batch.deleteRequestsToProcess)) + for userID, dr := range tc.expectedDeletionRangeByUser { + require.Equal(t, dr, batch.deleteRequestsToProcess[userID].requestsInterval) + } + } + }) + } +} + +func TestDeleteRequestBatch_IntervalMayHaveExpiredChunks(t *testing.T) { + tests := []struct { + name string + deleteRequests map[string]*userDeleteRequests + userID string + expected bool + }{ + { + name: "no delete requests", + deleteRequests: map[string]*userDeleteRequests{}, + userID: "test-user", + expected: false, + }, + { + name: "has delete requests for user", + deleteRequests: map[string]*userDeleteRequests{ + "test-user": { + requests: []*DeleteRequest{ + { + UserID: "test-user", + }, + }, + }, + }, + userID: "test-user", + expected: true, + }, + { + name: "has delete requests but not for user", + deleteRequests: map[string]*userDeleteRequests{ + "other-user": { + requests: []*DeleteRequest{ + { + UserID: "other-user", + }, + }, + }, + }, + userID: "test-user", + expected: false, + }, + { + name: "check for all users", + deleteRequests: map[string]*userDeleteRequests{ + "test-user": { + requests: []*DeleteRequest{ + { + UserID: "test-user", + }, + }, + }, + }, + userID: "", + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + batch := &deleteRequestBatch{ + deleteRequestsToProcess: tc.deleteRequests, + metrics: &deleteRequestsManagerMetrics{}, + } + + result := batch.intervalMayHaveExpiredChunks(tc.userID) + require.Equal(t, tc.expected, result) + }) + } +} diff --git a/pkg/compactor/deletion/delete_request_test.go b/pkg/compactor/deletion/delete_request_test.go index 897e0a5af5..a19818a180 100644 --- a/pkg/compactor/deletion/delete_request_test.go +++ b/pkg/compactor/deletion/delete_request_test.go @@ -23,7 +23,7 @@ const ( lblPong = "pong" ) -func TestDeleteRequest_IsDeleted(t *testing.T) { +func TestDeleteRequest_GetChunkFilter(t *testing.T) { now := model.Now() user1 := "user1" @@ -271,7 +271,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { t.Run(tc.name, func(t *testing.T) { require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query)) tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil) - isExpired, filterFunc := tc.deleteRequest.IsDeleted([]byte(user1), mustParseLabel(lbl), chunkEntry) + isExpired, filterFunc := tc.deleteRequest.GetChunkFilter([]byte(user1), mustParseLabel(lbl), chunkEntry) require.Equal(t, tc.expectedResp.isDeleted, isExpired) if tc.expectedResp.expectedFilter == nil { require.Nil(t, filterFunc) diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index c272cfe4a1..f949680fa7 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -22,11 +22,17 @@ import ( util_log "github.com/grafana/loki/v3/pkg/util/log" ) +type DeleteRequestsKind string + const ( statusSuccess = "success" statusFail = "fail" seriesProgressFilename = "series_progress.json" + + DeleteRequestsWithLineFilters DeleteRequestsKind = "DeleteRequestsWithLineFilters" + DeleteRequestsWithoutLineFilters DeleteRequestsKind = "DeleteRequestsWithoutLineFilters" + DeleteRequestsAll DeleteRequestsKind = "DeleteRequestsAll" ) type userDeleteRequests struct { @@ -40,24 +46,23 @@ type DeleteRequestsManager struct { deleteRequestsStore DeleteRequestsStore deleteRequestCancelPeriod time.Duration - deleteRequestsToProcess map[string]*userDeleteRequests - deleteRequestsToProcessMtx sync.Mutex - duplicateRequests []DeleteRequest - metrics *deleteRequestsManagerMetrics - wg sync.WaitGroup - done chan struct{} - batchSize int - limits Limits - processedSeries map[string]struct{} + metrics *deleteRequestsManagerMetrics + wg sync.WaitGroup + done chan struct{} + batchSize int + limits Limits + currentBatch *deleteRequestBatch + processedSeries map[string]struct{} + processedSeriesMtx sync.RWMutex } func NewDeleteRequestsManager(workingDir string, store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) (*DeleteRequestsManager, error) { + metrics := newDeleteRequestsManagerMetrics(registerer) dm := &DeleteRequestsManager{ workingDir: workingDir, deleteRequestsStore: store, deleteRequestCancelPeriod: deleteRequestCancelPeriod, - deleteRequestsToProcess: map[string]*userDeleteRequests{}, - metrics: newDeleteRequestsManagerMetrics(registerer), + metrics: metrics, done: make(chan struct{}), batchSize: batchSize, limits: limits, @@ -111,8 +116,8 @@ func (d *DeleteRequestsManager) Stop() { } func (d *DeleteRequestsManager) storeSeriesProgress() error { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() + d.processedSeriesMtx.RLock() + defer d.processedSeriesMtx.RUnlock() if len(d.processedSeries) == 0 { return nil @@ -173,21 +178,29 @@ func (d *DeleteRequestsManager) updateMetrics() error { return nil } -func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - - // Reset this first so any errors result in a clear map - d.deleteRequestsToProcess = map[string]*userDeleteRequests{} +func (d *DeleteRequestsManager) loadDeleteRequestsToProcess(kind DeleteRequestsKind) (*deleteRequestBatch, error) { + batch := newDeleteRequestBatch(d.metrics) deleteRequests, err := d.filteredSortedDeleteRequests() if err != nil { - return err + return nil, err } reqCount := 0 for i := range deleteRequests { deleteRequest := deleteRequests[i] + + if deleteRequest.logSelectorExpr == nil { + err := deleteRequest.SetQuery(deleteRequest.Query) + if err != nil { + return nil, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID) + } + } + if kind == DeleteRequestsWithLineFilters && !deleteRequest.logSelectorExpr.HasFilter() { + continue + } else if kind == DeleteRequestsWithoutLineFilters && deleteRequest.logSelectorExpr.HasFilter() { + continue + } maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits) // retention interval 0 means retain the data forever if maxRetentionInterval != 0 { @@ -202,55 +215,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { continue } } - if ur, ok := d.deleteRequestsToProcess[deleteRequest.UserID]; ok { - for _, requestLoadedForProcessing := range ur.requests { - isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest) - if err != nil { - return err - } - if isDuplicate { - level.Info(util_log.Logger).Log( - "msg", "found duplicate request of one of the requests loaded for processing", - "loaded_request_id", requestLoadedForProcessing.RequestID, - "duplicate_request_id", deleteRequest.RequestID, - "user", deleteRequest.UserID, - ) - d.duplicateRequests = append(d.duplicateRequests, deleteRequest) - } - } + if err := batch.checkDuplicate(deleteRequest); err != nil { + return nil, err } if reqCount >= d.batchSize { logBatchTruncation(reqCount, len(deleteRequests)) break } - if deleteRequest.logSelectorExpr == nil { - err := deleteRequest.SetQuery(deleteRequest.Query) - if err != nil { - return errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID) - } - } - level.Info(util_log.Logger).Log( "msg", "Started processing delete request for user", "delete_request_id", deleteRequest.RequestID, "user", deleteRequest.UserID, ) - deleteRequest.Metrics = d.metrics - - ur := d.requestsForUser(deleteRequest) - ur.requests = append(ur.requests, &deleteRequest) - if deleteRequest.StartTime < ur.requestsInterval.Start { - ur.requestsInterval.Start = deleteRequest.StartTime - } - if deleteRequest.EndTime > ur.requestsInterval.End { - ur.requestsInterval.End = deleteRequest.EndTime - } + batch.addDeleteRequest(&deleteRequest) reqCount++ } - return nil + return batch, nil } func (d *DeleteRequestsManager) filteredSortedDeleteRequests() ([]DeleteRequest, error) { @@ -294,20 +277,6 @@ func (d *DeleteRequestsManager) filteredRequests(reqs []DeleteRequest) ([]Delete return filtered, nil } -func (d *DeleteRequestsManager) requestsForUser(dr DeleteRequest) *userDeleteRequests { - ur, ok := d.deleteRequestsToProcess[dr.UserID] - if !ok { - ur = &userDeleteRequests{ - requestsInterval: model.Interval{ - Start: dr.StartTime, - End: dr.EndTime, - }, - } - d.deleteRequestsToProcess[dr.UserID] = ur - } - return ur -} - func logBatchTruncation(size, total int) { if size < total { level.Info(util_log.Logger).Log( @@ -330,16 +299,12 @@ func (d *DeleteRequestsManager) shouldProcessRequest(dr DeleteRequest) (bool, er } func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, _ model.Time, tableName string, _ model.Time) bool { - userIDStr := unsafeGetString(userID) - - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() + d.processedSeriesMtx.RLock() + defer d.processedSeriesMtx.RUnlock() - if d.deleteRequestsToProcess[userIDStr] == nil { - return true - } + userIDStr := unsafeGetString(userID) - for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests { + for _, deleteRequest := range d.currentBatch.getAllRequestsForUser(userIDStr) { // if the delete request does not touch the series, continue looking for other matching requests if !labels.Selector(deleteRequest.matchers).Matches(lbls) { continue @@ -355,81 +320,35 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, } func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, _ model.Time) (bool, filter.Func) { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - - userIDStr := unsafeGetString(userID) - if d.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(d.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{ - Start: chk.From, - End: chk.Through, - }) { - return false, nil - } - - var filterFuncs []filter.Func - - for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests { - if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok { - continue - } - isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk) - if !isDeleted { - continue - } - - if ff == nil { - level.Info(util_log.Logger).Log( - "msg", "no chunks to retain: the whole chunk is deleted", - "delete_request_id", deleteRequest.RequestID, - "sequence_num", deleteRequest.SequenceNum, - "user", deleteRequest.UserID, - "chunkID", string(chk.ChunkID), - ) - d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() - return true, nil - } - filterFuncs = append(filterFuncs, ff) - } - - if len(filterFuncs) == 0 { - return false, nil - } + return d.currentBatch.expired(userID, chk, lbls, func(request *DeleteRequest) bool { + d.processedSeriesMtx.RLock() + defer d.processedSeriesMtx.RUnlock() - d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() - return true, func(ts time.Time, s string, structuredMetadata labels.Labels) bool { - for _, ff := range filterFuncs { - if ff(ts, s, structuredMetadata) { - return true - } - } - - return false - } + _, ok := d.processedSeries[buildProcessedSeriesKey(request.RequestID, request.StartTime, request.EndTime, seriesID, tableName)] + return ok + }) } func (d *DeleteRequestsManager) MarkPhaseStarted() { status := statusSuccess - if err := d.loadDeleteRequestsToProcess(); err != nil { + if batch, err := d.loadDeleteRequestsToProcess(DeleteRequestsAll); err != nil { status = statusFail + d.currentBatch = nil level.Error(util_log.Logger).Log("msg", "failed to load delete requests to process", "err", err) + } else { + d.currentBatch = batch } d.metrics.loadPendingRequestsAttemptsTotal.WithLabelValues(status).Inc() } func (d *DeleteRequestsManager) MarkPhaseFailed() { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - + d.currentBatch.reset() d.metrics.deletionFailures.WithLabelValues("error").Inc() - d.deleteRequestsToProcess = map[string]*userDeleteRequests{} } func (d *DeleteRequestsManager) MarkPhaseTimedOut() { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - + d.currentBatch.reset() d.metrics.deletionFailures.WithLabelValues("timeout").Inc() - d.deleteRequestsToProcess = map[string]*userDeleteRequests{} } func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) { @@ -455,10 +374,11 @@ func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteReque } func (d *DeleteRequestsManager) MarkPhaseFinished() { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() + if d.currentBatch.requestCount() == 0 { + return + } - for _, userDeleteRequests := range d.deleteRequestsToProcess { + for _, userDeleteRequests := range d.currentBatch.deleteRequestsToProcess { if userDeleteRequests == nil { continue } @@ -468,7 +388,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { } } - for _, req := range d.duplicateRequests { + for _, req := range d.currentBatch.duplicateRequests { level.Info(util_log.Logger).Log("msg", "marking duplicate delete request as processed", "delete_request_id", req.RequestID, "sequence_num", req.SequenceNum, @@ -481,28 +401,22 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err) } - // When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed. - // Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams. - if len(d.deleteRequestsToProcess) > 0 { - d.processedSeries = map[string]struct{}{} - if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) { - level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err) - } + d.processedSeriesMtx.Lock() + defer d.processedSeriesMtx.Unlock() + + d.processedSeries = map[string]struct{}{} + d.currentBatch.reset() + if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) { + level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err) } } func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - - // We can't do the overlap check between the passed interval and delete requests interval from a user because - // if a request is issued just for today and there are chunks spanning today and yesterday then - // the overlap check would skip processing yesterday's index which would result in the index pointing to deleted chunks. - if userID != "" { - return d.deleteRequestsToProcess[userID] != nil + if d.currentBatch.requestCount() == 0 { + return false } - return len(d.deleteRequestsToProcess) != 0 + return d.currentBatch.intervalMayHaveExpiredChunks(userID) } func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool { @@ -510,15 +424,15 @@ func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ lab } func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error { - d.deleteRequestsToProcessMtx.Lock() - defer d.deleteRequestsToProcessMtx.Unlock() - userIDStr := unsafeGetString(userID) - if d.deleteRequestsToProcess[userIDStr] == nil { + if d.currentBatch.requestCount() == 0 { return nil } - for _, req := range d.deleteRequestsToProcess[userIDStr].requests { + d.processedSeriesMtx.Lock() + defer d.processedSeriesMtx.Unlock() + + for _, req := range d.currentBatch.getAllRequestsForUser(userIDStr) { // if the delete request does not touch the series, do not waste space in storing the marker if !labels.Selector(req.matchers).Matches(lbls) { continue diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go index 40bf65b0ca..9c1ec76dfe 100644 --- a/pkg/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/compactor/deletion/delete_requests_manager_test.go @@ -938,9 +938,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { deletionMode: tc.deletionMode.String(), }}, nil) require.NoError(t, err) - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) - for _, deleteRequests := range mgr.deleteRequestsToProcess { + for _, deleteRequests := range mgr.currentBatch.deleteRequestsToProcess { for _, dr := range deleteRequests.requests { require.EqualValues(t, 0, dr.DeletedLines) } @@ -968,12 +969,13 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata), filterFunc(start.Time(), line, structuredMetadata), "line", line, "time", start.Time(), "now", now.Time()) } - require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess)) + require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.currentBatch.deleteRequestsToProcess)) for userID, dr := range tc.expectedDeletionRangeByUser { - require.Equal(t, dr, mgr.deleteRequestsToProcess[userID].requestsInterval) + require.Equal(t, dr, mgr.currentBatch.deleteRequestsToProcess[userID].requestsInterval) } } + duplicateRequests := mgr.currentBatch.duplicateRequests mgr.MarkPhaseFinished() processedRequests, err := mockDeleteRequestsStore.getDeleteRequestsByStatus(StatusProcessed) @@ -983,7 +985,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { for i, reqIdx := range tc.expectedRequestsMarkedAsProcessed { require.True(t, requestsAreEqual(tc.deleteRequestsFromStore[reqIdx], processedRequests[i])) } - require.Len(t, mgr.duplicateRequests, tc.expectedDuplicateRequestsCount) + require.Len(t, duplicateRequests, tc.expectedDuplicateRequestsCount) }) } } @@ -1007,7 +1009,8 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { for _, tc := range tt { mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) interval := model.Interval{Start: 300, End: 600} require.Equal(t, tc.hasChunks, mgr.IntervalMayHaveExpiredChunks(interval, tc.user)) @@ -1192,7 +1195,8 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) for _, m := range tc.seriesToMarkProcessed { require.NoError(t, mgr.MarkSeriesAsProcessed(m.userID, m.seriesID, m.lbls, m.tableName)) @@ -1208,7 +1212,8 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) require.Equal(t, storedSeriesProgress, mgr.processedSeries) - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) // when the mark phase ends, series progress should get cleared mgr.MarkPhaseFinished() @@ -1230,7 +1235,8 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) { mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) require.NoError(t, err) - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1")) @@ -1244,7 +1250,8 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) { require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename)) // load the requests again for processing - require.NoError(t, mgr.loadDeleteRequestsToProcess()) + mgr.MarkPhaseStarted() + require.NotNil(t, mgr.currentBatch) // not hitting the timeout should clear the series progress mgr.MarkPhaseFinished() diff --git a/pkg/compactor/deletion/deletion_manifest_builder.go b/pkg/compactor/deletion/deletion_manifest_builder.go new file mode 100644 index 0000000000..7e05242d58 --- /dev/null +++ b/pkg/compactor/deletion/deletion_manifest_builder.go @@ -0,0 +1,226 @@ +package deletion + +import ( + "context" + "encoding/json" + "fmt" + "path" + "strings" + "time" + + "github.com/grafana/loki/v3/pkg/compactor/retention" + "github.com/grafana/loki/v3/pkg/storage/chunk/client" +) + +var ErrNoChunksSelectedForDeletion = fmt.Errorf("no chunks selected for deletion") + +const ( + maxChunksPerSegment = 100000 + manifestFileName = "manifest.json" +) + +// ChunksGroup holds a group of chunks selected by the same set of requests +type ChunksGroup struct { + Requests []DeleteRequest `json:"requests"` + Chunks []retention.Chunk `json:"chunks"` +} + +// segment holds limited chunks(upto maxChunksPerSegment) that needs to be processed. +// It also helps segregate chunks belonging to different users/tables. +type segment struct { + UserID string `json:"user_id"` + TableName string `json:"table_name"` + ChunksGroups []ChunksGroup `json:"chunk_groups"` + ChunksCount int `json:"chunks_count"` +} + +// manifest represents the completion state and summary of discovering chunks which processing for the loaded deleteRequestBatch. +// It serves two purposes: +// 1. Acts as a completion marker indicating all chunks for the given delete requests have been found +// 2. Stores a summary of data stored in segments: +// - Original and duplicate deletion requests +// - Total number of segments and chunks to be processed +// +// Once all the segments are processed, Requests and DuplicateRequests in the manifest could be marked as processed. +type manifest struct { + Requests []DeleteRequest `json:"requests"` + DuplicateRequests []DeleteRequest `json:"duplicate_requests"` + SegmentsCount int `json:"segments_count"` + ChunksCount int `json:"chunks_count"` +} + +// deletionManifestBuilder helps with building the manifest for listing out which chunks to process for a batch of delete requests. +// It is not meant to be used concurrently. +type deletionManifestBuilder struct { + deleteStoreClient client.ObjectClient + deleteRequestBatch deleteRequestBatch + + currentSegment map[uint64]ChunksGroup + currentSegmentChunksCount int + currentUserID string + currentTableName string + + allUserRequests []*DeleteRequest + creationTime time.Time + segmentsCount int + overallChunksCount int +} + +func newDeletionManifestBuilder(deleteStoreClient client.ObjectClient, deleteRequestBatch deleteRequestBatch) (*deletionManifestBuilder, error) { + requestCount := 0 + for _, userRequests := range deleteRequestBatch.deleteRequestsToProcess { + requestCount += len(userRequests.requests) + } + + // We use a uint64 as a bit field to track which delete requests apply to each chunk. + // Since uint64 has 64 bits, we can only handle up to 64 delete requests at a time. + if requestCount > 64 { + return nil, fmt.Errorf("only upto 64 delete requests allowed, current count: %d", requestCount) + } + + builder := &deletionManifestBuilder{ + deleteStoreClient: deleteStoreClient, + deleteRequestBatch: deleteRequestBatch, + currentSegment: make(map[uint64]ChunksGroup), + creationTime: time.Now(), + } + + return builder, nil +} + +// AddSeries adds a series and its chunks to the current segment. +// It flushes the current segment if the user ID or table name changes. +// It also ensures that the current segment does not exceed the maximum number of chunks. +func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName string, series retention.Series) error { + userIDStr := unsafeGetString(series.UserID()) + if userIDStr != d.currentUserID || tableName != d.currentTableName { + if err := d.flushCurrentBatch(ctx); err != nil { + return err + } + d.currentSegmentChunksCount = 0 + d.currentSegment = make(map[uint64]ChunksGroup) + + d.currentUserID = string(series.UserID()) + d.currentTableName = tableName + d.allUserRequests = d.deleteRequestBatch.getAllRequestsForUser(userIDStr) + if len(d.allUserRequests) == 0 { + return fmt.Errorf("no requests loaded for user: %s", userIDStr) + } + } + + var chunksGroupIdentifier uint64 + for _, chk := range series.Chunks() { + if d.currentSegmentChunksCount >= maxChunksPerSegment { + if err := d.flushCurrentBatch(ctx); err != nil { + return err + } + d.currentSegmentChunksCount = 0 + for chunksGroupIdentifier := range d.currentSegment { + group := d.currentSegment[chunksGroupIdentifier] + group.Chunks = group.Chunks[:0] + d.currentSegment[chunksGroupIdentifier] = group + } + } + + // We use a uint64 as a bit field to track which delete requests apply to each chunk. + chunksGroupIdentifier = 0 + for i, deleteRequest := range d.allUserRequests { + if !deleteRequest.IsDeleted(series.UserID(), series.Labels(), chk) { + continue + } + + chunksGroupIdentifier |= 1 << i + } + + if chunksGroupIdentifier == 0 { + continue + } + d.currentSegmentChunksCount++ + + if _, ok := d.currentSegment[chunksGroupIdentifier]; !ok { + // Iterate through d.allUserRequests and find which bits are turned on in chunksGroupIdentifier + var deleteRequests []DeleteRequest + for i := range d.allUserRequests { + if chunksGroupIdentifier&(1< len(b.Requests): + return 1 + default: + return 0 + } + }) + require.Equal(t, tc.expectedSegments[i], segment) + } + }) + } +} + +func TestDeletionManifestBuilder_Errors(t *testing.T) { + tempDir := t.TempDir() + ctx := context.Background() + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: tempDir, + }) + require.NoError(t, err) + + // Create delete request batch + batch := newDeleteRequestBatch(nil) + batch.addDeleteRequest(&DeleteRequest{ + UserID: user1, + RequestID: req1, + Query: lblFooBar, + StartTime: 0, + EndTime: 100, + }) + + // Create builder + builder, err := newDeletionManifestBuilder(objectClient, *batch) + require.NoError(t, err) + + err = builder.AddSeries(ctx, table1, &mockSeries{ + userID: user2, + labels: mustParseLabel(lblFooBar), + chunks: buildChunks(0, 25), + }) + require.EqualError(t, err, fmt.Sprintf("no requests loaded for user: %s", user2)) + + err = builder.Finish(ctx) + require.EqualError(t, err, ErrNoChunksSelectedForDeletion.Error()) +} diff --git a/pkg/compactor/retention/retention.go b/pkg/compactor/retention/retention.go index 4778871d9a..57d2940e09 100644 --- a/pkg/compactor/retention/retention.go +++ b/pkg/compactor/retention/retention.go @@ -41,36 +41,49 @@ func (c Chunk) String() string { return fmt.Sprintf("ChunkID: %s", c.ChunkID) } -type Series struct { +type Series interface { + SeriesID() []byte + UserID() []byte + Labels() labels.Labels + Chunks() []Chunk + Reset(seriesID, userID []byte, labels labels.Labels) + AppendChunks(ref ...Chunk) +} + +func NewSeries() Series { + return &series{} +} + +type series struct { seriesID, userID []byte labels labels.Labels chunks []Chunk } -func (s *Series) SeriesID() []byte { +func (s *series) SeriesID() []byte { return s.seriesID } -func (s *Series) UserID() []byte { +func (s *series) UserID() []byte { return s.userID } -func (s *Series) Labels() labels.Labels { +func (s *series) Labels() labels.Labels { return s.labels } -func (s *Series) Chunks() []Chunk { +func (s *series) Chunks() []Chunk { return s.chunks } -func (s *Series) Reset(seriesID, userID []byte, labels labels.Labels) { +func (s *series) Reset(seriesID, userID []byte, labels labels.Labels) { s.seriesID = seriesID s.userID = userID s.labels = labels s.chunks = s.chunks[:0] } -func (s *Series) AppendChunks(ref ...Chunk) { +func (s *series) AppendChunks(ref ...Chunk) { s.chunks = append(s.chunks, ref...) } @@ -219,7 +232,7 @@ func markForDelete( } } - if expiration.CanSkipSeries(s.UserID(), s.labels, s.SeriesID(), seriesStart, tableName, now) { + if expiration.CanSkipSeries(s.UserID(), s.Labels(), s.SeriesID(), seriesStart, tableName, now) { empty = false return nil } diff --git a/pkg/compactor/retention/util_test.go b/pkg/compactor/retention/util_test.go index 420acf5618..aa98ab5097 100644 --- a/pkg/compactor/retention/util_test.go +++ b/pkg/compactor/retention/util_test.go @@ -142,14 +142,14 @@ func (t *table) ForEachSeries(ctx context.Context, callback SeriesCallback) erro Through: chk.Through, }) } - series := Series{} + series := series{} series.Reset( []byte(seriesID), []byte(userID), labels.NewBuilder(t.chunks[userID][seriesID][0].Metric).Del(labels.MetricName).Labels(), ) series.AppendChunks(chunks...) - if err := callback(series); err != nil { + if err := callback(&series); err != nil { return err } } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go index b5d0fe14ea..250ffd7df8 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go @@ -30,7 +30,7 @@ func ForEachSeries(ctx context.Context, bucket *bbolt.Bucket, config config.Peri } cursor := bucket.Cursor() - var current retention.Series + current := retention.NewSeries() for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() { ref, ok, err := parseChunkRef(decodeKey(key)) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index eafdb4b60e..e2b933c121 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -297,7 +297,7 @@ func (c *compactedIndex) ForEachSeries(ctx context.Context, callback retention.S logprotoChunkRef := logproto.ChunkRef{ UserID: c.userID, } - var series retention.Series + series := retention.NewSeries() for seriesID, stream := range c.builder.streams { series.Reset( getUnsafeBytes(seriesID), diff --git a/pkg/tool/audit/audit_test.go b/pkg/tool/audit/audit_test.go index b8cfb689f5..df2bafc82b 100644 --- a/pkg/tool/audit/audit_test.go +++ b/pkg/tool/audit/audit_test.go @@ -48,7 +48,7 @@ type testCompactedIdx struct { } func (t testCompactedIdx) ForEachSeries(_ context.Context, f retention.SeriesCallback) error { - var series retention.Series + series := retention.NewSeries() series.AppendChunks(t.chunks...) return f(series) }