feat: Compactor deletion manifest builder (#17474)

Sandeep Sukhani committed 2 weeks ago via GitHub
parent 3ad1a64bc1
commit 47961f802d
  1. pkg/compactor/deletion/delete_request.go (22)
  2. pkg/compactor/deletion/delete_request_batch.go (156)
  3. pkg/compactor/deletion/delete_request_batch_test.go (719)
  4. pkg/compactor/deletion/delete_request_test.go (4)
  5. pkg/compactor/deletion/delete_requests_manager.go (232)
  6. pkg/compactor/deletion/delete_requests_manager_test.go (27)
  7. pkg/compactor/deletion/deletion_manifest_builder.go (226)
  8. pkg/compactor/deletion/deletion_manifest_builder_test.go (637)
  9. pkg/compactor/retention/retention.go (29)
  10. pkg/compactor/retention/util_test.go (4)
  11. pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go (2)
  12. pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go (2)
  13. pkg/tool/audit/audit_test.go (2)

@ -106,12 +106,10 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool {
return true
}
// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest.
// It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters.
// If the filter.Func is nil, the whole chunk is supposed to be deleted.
func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
// IsDeleted checks if the given chunk entry would have data requested for deletion.
func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) bool {
if d.UserID != unsafeGetString(userID) {
return false, nil
return false
}
if !intervalsOverlap(model.Interval{
@ -121,7 +119,7 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten
Start: d.StartTime,
End: d.EndTime,
}) {
return false, nil
return false
}
if d.logSelectorExpr == nil {
@ -133,11 +131,21 @@ func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk reten
"user", d.UserID,
"err", err,
)
return false, nil
return false
}
}
if !labels.Selector(d.matchers).Matches(lbls) {
return false
}
return true
}
// GetChunkFilter tells whether the chunk is covered by the DeleteRequest and
// optionally returns a filter.Func if the chunk is supposed to be deleted partially or the delete request has line filters.
func (d *DeleteRequest) GetChunkFilter(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
if !d.IsDeleted(userID, lbls, chunk) {
return false, nil
}

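A caller-side sketch of the split introduced here (illustrative only; applyDeleteRequest is a hypothetical helper, not part of this change): IsDeleted answers the cheap yes/no question, while GetChunkFilter additionally yields the line filter when only part of the chunk is to be removed.

// Sketch: acting on GetChunkFilter's two return values.
func applyDeleteRequest(req *DeleteRequest, userID []byte, lbls labels.Labels, chk retention.Chunk) {
	covered, ff := req.GetChunkFilter(userID, lbls, chk)
	switch {
	case !covered:
		// the request does not touch this chunk; keep it as is
	case ff == nil:
		// the whole chunk is deleted; drop it entirely
	default:
		// rewrite the chunk, removing every entry for which
		// ff(ts, line, structuredMetadata) returns true
	}
}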
@ -0,0 +1,156 @@
package deletion
import (
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/util/filter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
// deleteRequestBatch holds a batch of requests loaded for processing
type deleteRequestBatch struct {
deleteRequestsToProcess map[string]*userDeleteRequests
duplicateRequests []DeleteRequest
count int
metrics *deleteRequestsManagerMetrics
}
func newDeleteRequestBatch(metrics *deleteRequestsManagerMetrics) *deleteRequestBatch {
return &deleteRequestBatch{
deleteRequestsToProcess: map[string]*userDeleteRequests{},
metrics: metrics,
}
}
func (b *deleteRequestBatch) reset() {
b.deleteRequestsToProcess = map[string]*userDeleteRequests{}
b.duplicateRequests = []DeleteRequest{}
b.count = 0
}
func (b *deleteRequestBatch) requestCount() int {
return b.count
}
// addDeleteRequest adds a request to the batch
func (b *deleteRequestBatch) addDeleteRequest(dr *DeleteRequest) {
dr.Metrics = b.metrics
ur, ok := b.deleteRequestsToProcess[dr.UserID]
if !ok {
ur = &userDeleteRequests{
requestsInterval: model.Interval{
Start: dr.StartTime,
End: dr.EndTime,
},
}
b.deleteRequestsToProcess[dr.UserID] = ur
}
ur.requests = append(ur.requests, dr)
if dr.StartTime < ur.requestsInterval.Start {
ur.requestsInterval.Start = dr.StartTime
}
if dr.EndTime > ur.requestsInterval.End {
ur.requestsInterval.End = dr.EndTime
}
b.count++
}
func (b *deleteRequestBatch) checkDuplicate(deleteRequest DeleteRequest) error {
ur, ok := b.deleteRequestsToProcess[deleteRequest.UserID]
if !ok {
return nil
}
for _, requestLoadedForProcessing := range ur.requests {
isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest)
if err != nil {
return err
}
if isDuplicate {
level.Info(util_log.Logger).Log(
"msg", "found duplicate request of one of the requests loaded for processing",
"loaded_request_id", requestLoadedForProcessing.RequestID,
"duplicate_request_id", deleteRequest.RequestID,
"user", deleteRequest.UserID,
)
b.duplicateRequests = append(b.duplicateRequests, deleteRequest)
}
}
return nil
}
func (b *deleteRequestBatch) expired(userID []byte, chk retention.Chunk, lbls labels.Labels, skipRequest func(*DeleteRequest) bool) (bool, filter.Func) {
userIDStr := unsafeGetString(userID)
if b.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(b.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{
Start: chk.From,
End: chk.Through,
}) {
return false, nil
}
var filterFuncs []filter.Func
for _, deleteRequest := range b.deleteRequestsToProcess[userIDStr].requests {
if skipRequest(deleteRequest) {
continue
}
isDeleted, ff := deleteRequest.GetChunkFilter(userID, lbls, chk)
if !isDeleted {
continue
}
if ff == nil {
level.Info(util_log.Logger).Log(
"msg", "no chunks to retain: the whole chunk is deleted",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"chunkID", string(chk.ChunkID),
)
b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, nil
}
filterFuncs = append(filterFuncs, ff)
}
if len(filterFuncs) == 0 {
return false, nil
}
b.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, func(ts time.Time, s string, structuredMetadata labels.Labels) bool {
for _, ff := range filterFuncs {
if ff(ts, s, structuredMetadata) {
return true
}
}
return false
}
}
func (b *deleteRequestBatch) intervalMayHaveExpiredChunks(userID string) bool {
// We can't do the overlap check between the passed interval and delete requests interval from a user because
// if a request is issued just for today and there are chunks spanning today and yesterday then
// the overlap check would skip processing yesterday's index which would result in the index pointing to deleted chunks.
if userID != "" {
return b.deleteRequestsToProcess[userID] != nil
}
return len(b.deleteRequestsToProcess) != 0
}
func (b *deleteRequestBatch) getAllRequestsForUser(userID string) []*DeleteRequest {
userRequests, ok := b.deleteRequestsToProcess[userID]
if !ok {
return nil
}
return userRequests.requests
}
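A minimal sketch of driving the batch end to end (batchExpiredSketch is a hypothetical helper; the real wiring lives in DeleteRequestsManager below): requests are added one by one, then expired is consulted per chunk with a callback for skipping requests whose work is already done.

// Sketch: build a batch and ask whether a single chunk is expired.
func batchExpiredSketch(metrics *deleteRequestsManagerMetrics, requests []DeleteRequest, userID []byte, chk retention.Chunk, lbls labels.Labels) (bool, filter.Func) {
	batch := newDeleteRequestBatch(metrics)
	for i := range requests {
		batch.addDeleteRequest(&requests[i])
	}
	// nothing is skipped in this sketch; the manager skips requests whose
	// series have already been processed
	return batch.expired(userID, chk, lbls, func(_ *DeleteRequest) bool { return false })
}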

@ -0,0 +1,719 @@
package deletion
import (
"strings"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util/filter"
)
func TestDeleteRequestBatch_Expired(t *testing.T) {
type resp struct {
isExpired bool
expectedFilter filter.Func
}
now := model.Now()
lblFoo, err := syntax.ParseLabels(`{foo="bar"}`)
require.NoError(t, err)
streamSelectorWithLineFilters := lblFoo.String() + `|="fizz"`
streamSelectorWithStructuredMetadataFilters := lblFoo.String() + `| ping="pong"`
streamSelectorWithLineAndStructuredMetadataFilters := lblFoo.String() + `| ping="pong" |= "fizz"`
chunkEntry := retention.Chunk{
From: now.Add(-12 * time.Hour),
Through: now.Add(-time.Hour),
}
for _, tc := range []struct {
name string
deleteRequests []DeleteRequest
expectedResp resp
expectedDeletionRangeByUser map[string]model.Interval
}{
{
name: "no delete requests",
expectedResp: resp{
isExpired: false,
},
},
{
name: "no relevant delete requests",
deleteRequests: []DeleteRequest{
{
UserID: "different-user",
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
"different-user": {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "no relevant delete requests",
deleteRequests: []DeleteRequest{
{
UserID: "different-user",
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
"different-user": {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "delete request not matching labels",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: `{fizz="buzz"}`,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "whole chunk deleted by single request",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "whole chunk deleted by single request with line filters",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "whole chunk deleted by single request with structured metadata filters",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool {
return structuredMetadata.Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "whole chunk deleted by single request with line and structured metadata filters",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineAndStructuredMetadataFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, structuredMetadata labels.Labels) bool {
return structuredMetadata.Get(lblPing) == lblPong && strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "deleted interval out of range",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now.Add(-24 * time.Hour),
},
},
},
{
name: "deleted interval out of range(with multiple user requests)",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
Status: StatusReceived,
},
{
UserID: "different-user",
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now.Add(-24 * time.Hour),
},
"different-user": {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "multiple delete requests with one deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now,
},
},
},
{
name: "multiple delete requests with line filters and one deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now,
},
},
},
{
name: "multiple delete requests with structured metadata filters and one deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool {
return structuredMetadata.Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now,
},
},
},
{
name: "multiple delete requests causing multiple holes",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, _ string, _ labels.Labels) bool {
tsUnixNano := ts.UnixNano()
if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) ||
(now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) ||
(now.Add(-6*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-5*time.Hour).UnixNano()) ||
(now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano()) {
return true
}
return false
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-13 * time.Hour),
End: now,
},
},
},
{
name: "multiple overlapping requests deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, _ labels.Labels) bool {
return true
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-13 * time.Hour),
End: now,
},
},
},
{
name: "multiple overlapping requests with line filters deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-13 * time.Hour),
End: now,
},
},
},
{
name: "multiple overlapping requests with structured metadata filters deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool {
return structuredMetadata.Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-13 * time.Hour),
End: now,
},
},
},
{
name: "multiple non-overlapping requests deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, _ labels.Labels) bool {
return true
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-12 * time.Hour),
End: now,
},
},
},
{
name: "multiple non-overlapping requests with line filter deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithLineFilters,
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, s string, _ labels.Labels) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-12 * time.Hour),
End: now,
},
},
},
{
name: "multiple non-overlapping requests with structured metadata filter deleting the whole chunk",
deleteRequests: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
Status: StatusReceived,
},
{
UserID: testUserID,
Query: streamSelectorWithStructuredMetadataFilters,
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
Status: StatusReceived,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(_ time.Time, _ string, structuredMetadata labels.Labels) bool {
return structuredMetadata.Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-12 * time.Hour),
End: now,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
batch := newDeleteRequestBatch(newDeleteRequestsManagerMetrics(nil))
for _, req := range tc.deleteRequests {
batch.addDeleteRequest(&req)
}
for _, deleteRequests := range batch.deleteRequestsToProcess {
for _, dr := range deleteRequests.requests {
require.EqualValues(t, 0, dr.DeletedLines)
}
}
isExpired, filterFunc := batch.expired([]byte(testUserID), chunkEntry, lblFoo, func(_ *DeleteRequest) bool {
return false
})
require.Equal(t, tc.expectedResp.isExpired, isExpired)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)
} else {
require.NotNil(t, filterFunc)
for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) {
line := "foo bar"
if start.Time().Minute()%2 == 1 {
line = "fizz buzz"
}
// mix of empty, ting=tong and ping=pong as structured metadata
var structuredMetadata labels.Labels
if start.Time().Minute()%3 == 0 {
structuredMetadata = labels.FromStrings(lblPing, lblPong)
} else if start.Time().Minute()%2 == 0 {
structuredMetadata = labels.FromStrings("ting", "tong")
}
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata), filterFunc(start.Time(), line, structuredMetadata), "line", line, "time", start.Time(), "now", now.Time())
}
require.Equal(t, len(tc.expectedDeletionRangeByUser), len(batch.deleteRequestsToProcess))
for userID, dr := range tc.expectedDeletionRangeByUser {
require.Equal(t, dr, batch.deleteRequestsToProcess[userID].requestsInterval)
}
}
})
}
}
func TestDeleteRequestBatch_IntervalMayHaveExpiredChunks(t *testing.T) {
tests := []struct {
name string
deleteRequests map[string]*userDeleteRequests
userID string
expected bool
}{
{
name: "no delete requests",
deleteRequests: map[string]*userDeleteRequests{},
userID: "test-user",
expected: false,
},
{
name: "has delete requests for user",
deleteRequests: map[string]*userDeleteRequests{
"test-user": {
requests: []*DeleteRequest{
{
UserID: "test-user",
},
},
},
},
userID: "test-user",
expected: true,
},
{
name: "has delete requests but not for user",
deleteRequests: map[string]*userDeleteRequests{
"other-user": {
requests: []*DeleteRequest{
{
UserID: "other-user",
},
},
},
},
userID: "test-user",
expected: false,
},
{
name: "check for all users",
deleteRequests: map[string]*userDeleteRequests{
"test-user": {
requests: []*DeleteRequest{
{
UserID: "test-user",
},
},
},
},
userID: "",
expected: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
batch := &deleteRequestBatch{
deleteRequestsToProcess: tc.deleteRequests,
metrics: &deleteRequestsManagerMetrics{},
}
result := batch.intervalMayHaveExpiredChunks(tc.userID)
require.Equal(t, tc.expected, result)
})
}
}

@ -23,7 +23,7 @@ const (
lblPong = "pong"
)
func TestDeleteRequest_IsDeleted(t *testing.T) {
func TestDeleteRequest_GetChunkFilter(t *testing.T) {
now := model.Now()
user1 := "user1"
@ -271,7 +271,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil)
isExpired, filterFunc := tc.deleteRequest.IsDeleted([]byte(user1), mustParseLabel(lbl), chunkEntry)
isExpired, filterFunc := tc.deleteRequest.GetChunkFilter([]byte(user1), mustParseLabel(lbl), chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isExpired)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)

@ -22,11 +22,17 @@ import (
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
type DeleteRequestsKind string
const (
statusSuccess = "success"
statusFail = "fail"
seriesProgressFilename = "series_progress.json"
DeleteRequestsWithLineFilters DeleteRequestsKind = "DeleteRequestsWithLineFilters"
DeleteRequestsWithoutLineFilters DeleteRequestsKind = "DeleteRequestsWithoutLineFilters"
DeleteRequestsAll DeleteRequestsKind = "DeleteRequestsAll"
)
type userDeleteRequests struct {
@ -40,24 +46,23 @@ type DeleteRequestsManager struct {
deleteRequestsStore DeleteRequestsStore
deleteRequestCancelPeriod time.Duration
deleteRequestsToProcess map[string]*userDeleteRequests
deleteRequestsToProcessMtx sync.Mutex
duplicateRequests []DeleteRequest
metrics *deleteRequestsManagerMetrics
wg sync.WaitGroup
done chan struct{}
batchSize int
limits Limits
processedSeries map[string]struct{}
metrics *deleteRequestsManagerMetrics
wg sync.WaitGroup
done chan struct{}
batchSize int
limits Limits
currentBatch *deleteRequestBatch
processedSeries map[string]struct{}
processedSeriesMtx sync.RWMutex
}
func NewDeleteRequestsManager(workingDir string, store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) (*DeleteRequestsManager, error) {
metrics := newDeleteRequestsManagerMetrics(registerer)
dm := &DeleteRequestsManager{
workingDir: workingDir,
deleteRequestsStore: store,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
deleteRequestsToProcess: map[string]*userDeleteRequests{},
metrics: newDeleteRequestsManagerMetrics(registerer),
metrics: metrics,
done: make(chan struct{}),
batchSize: batchSize,
limits: limits,
@ -111,8 +116,8 @@ func (d *DeleteRequestsManager) Stop() {
}
func (d *DeleteRequestsManager) storeSeriesProgress() error {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.processedSeriesMtx.RLock()
defer d.processedSeriesMtx.RUnlock()
if len(d.processedSeries) == 0 {
return nil
@ -173,21 +178,29 @@ func (d *DeleteRequestsManager) updateMetrics() error {
return nil
}
func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
// Reset this first so any errors result in a clear map
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
func (d *DeleteRequestsManager) loadDeleteRequestsToProcess(kind DeleteRequestsKind) (*deleteRequestBatch, error) {
batch := newDeleteRequestBatch(d.metrics)
deleteRequests, err := d.filteredSortedDeleteRequests()
if err != nil {
return err
return nil, err
}
reqCount := 0
for i := range deleteRequests {
deleteRequest := deleteRequests[i]
if deleteRequest.logSelectorExpr == nil {
err := deleteRequest.SetQuery(deleteRequest.Query)
if err != nil {
return nil, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID)
}
}
if kind == DeleteRequestsWithLineFilters && !deleteRequest.logSelectorExpr.HasFilter() {
continue
} else if kind == DeleteRequestsWithoutLineFilters && deleteRequest.logSelectorExpr.HasFilter() {
continue
}
maxRetentionInterval := getMaxRetentionInterval(deleteRequest.UserID, d.limits)
// retention interval 0 means retain the data forever
if maxRetentionInterval != 0 {
@ -202,55 +215,25 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
continue
}
}
if ur, ok := d.deleteRequestsToProcess[deleteRequest.UserID]; ok {
for _, requestLoadedForProcessing := range ur.requests {
isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest)
if err != nil {
return err
}
if isDuplicate {
level.Info(util_log.Logger).Log(
"msg", "found duplicate request of one of the requests loaded for processing",
"loaded_request_id", requestLoadedForProcessing.RequestID,
"duplicate_request_id", deleteRequest.RequestID,
"user", deleteRequest.UserID,
)
d.duplicateRequests = append(d.duplicateRequests, deleteRequest)
}
}
if err := batch.checkDuplicate(deleteRequest); err != nil {
return nil, err
}
if reqCount >= d.batchSize {
logBatchTruncation(reqCount, len(deleteRequests))
break
}
if deleteRequest.logSelectorExpr == nil {
err := deleteRequest.SetQuery(deleteRequest.Query)
if err != nil {
return errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID)
}
}
level.Info(util_log.Logger).Log(
"msg", "Started processing delete request for user",
"delete_request_id", deleteRequest.RequestID,
"user", deleteRequest.UserID,
)
deleteRequest.Metrics = d.metrics
ur := d.requestsForUser(deleteRequest)
ur.requests = append(ur.requests, &deleteRequest)
if deleteRequest.StartTime < ur.requestsInterval.Start {
ur.requestsInterval.Start = deleteRequest.StartTime
}
if deleteRequest.EndTime > ur.requestsInterval.End {
ur.requestsInterval.End = deleteRequest.EndTime
}
batch.addDeleteRequest(&deleteRequest)
reqCount++
}
return nil
return batch, nil
}
func (d *DeleteRequestsManager) filteredSortedDeleteRequests() ([]DeleteRequest, error) {
@ -294,20 +277,6 @@ func (d *DeleteRequestsManager) filteredRequests(reqs []DeleteRequest) ([]Delete
return filtered, nil
}
func (d *DeleteRequestsManager) requestsForUser(dr DeleteRequest) *userDeleteRequests {
ur, ok := d.deleteRequestsToProcess[dr.UserID]
if !ok {
ur = &userDeleteRequests{
requestsInterval: model.Interval{
Start: dr.StartTime,
End: dr.EndTime,
},
}
d.deleteRequestsToProcess[dr.UserID] = ur
}
return ur
}
func logBatchTruncation(size, total int) {
if size < total {
level.Info(util_log.Logger).Log(
@ -330,16 +299,12 @@ func (d *DeleteRequestsManager) shouldProcessRequest(dr DeleteRequest) (bool, er
}
func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, _ model.Time, tableName string, _ model.Time) bool {
userIDStr := unsafeGetString(userID)
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.processedSeriesMtx.RLock()
defer d.processedSeriesMtx.RUnlock()
if d.deleteRequestsToProcess[userIDStr] == nil {
return true
}
userIDStr := unsafeGetString(userID)
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
for _, deleteRequest := range d.currentBatch.getAllRequestsForUser(userIDStr) {
// if the delete request does not touch the series, continue looking for other matching requests
if !labels.Selector(deleteRequest.matchers).Matches(lbls) {
continue
@ -355,81 +320,35 @@ func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels,
}
func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, _ model.Time) (bool, filter.Func) {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
userIDStr := unsafeGetString(userID)
if d.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(d.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{
Start: chk.From,
End: chk.Through,
}) {
return false, nil
}
var filterFuncs []filter.Func
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, deleteRequest.StartTime, deleteRequest.EndTime, seriesID, tableName)]; ok {
continue
}
isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk)
if !isDeleted {
continue
}
if ff == nil {
level.Info(util_log.Logger).Log(
"msg", "no chunks to retain: the whole chunk is deleted",
"delete_request_id", deleteRequest.RequestID,
"sequence_num", deleteRequest.SequenceNum,
"user", deleteRequest.UserID,
"chunkID", string(chk.ChunkID),
)
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, nil
}
filterFuncs = append(filterFuncs, ff)
}
if len(filterFuncs) == 0 {
return false, nil
}
return d.currentBatch.expired(userID, chk, lbls, func(request *DeleteRequest) bool {
d.processedSeriesMtx.RLock()
defer d.processedSeriesMtx.RUnlock()
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc()
return true, func(ts time.Time, s string, structuredMetadata labels.Labels) bool {
for _, ff := range filterFuncs {
if ff(ts, s, structuredMetadata) {
return true
}
}
return false
}
_, ok := d.processedSeries[buildProcessedSeriesKey(request.RequestID, request.StartTime, request.EndTime, seriesID, tableName)]
return ok
})
}
func (d *DeleteRequestsManager) MarkPhaseStarted() {
status := statusSuccess
if err := d.loadDeleteRequestsToProcess(); err != nil {
if batch, err := d.loadDeleteRequestsToProcess(DeleteRequestsAll); err != nil {
status = statusFail
d.currentBatch = nil
level.Error(util_log.Logger).Log("msg", "failed to load delete requests to process", "err", err)
} else {
d.currentBatch = batch
}
d.metrics.loadPendingRequestsAttemptsTotal.WithLabelValues(status).Inc()
}
func (d *DeleteRequestsManager) MarkPhaseFailed() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.currentBatch.reset()
d.metrics.deletionFailures.WithLabelValues("error").Inc()
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
}
func (d *DeleteRequestsManager) MarkPhaseTimedOut() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.currentBatch.reset()
d.metrics.deletionFailures.WithLabelValues("timeout").Inc()
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
}
func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteRequest) {
@ -455,10 +374,11 @@ func (d *DeleteRequestsManager) markRequestAsProcessed(deleteRequest DeleteReque
}
func (d *DeleteRequestsManager) MarkPhaseFinished() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
if d.currentBatch.requestCount() == 0 {
return
}
for _, userDeleteRequests := range d.deleteRequestsToProcess {
for _, userDeleteRequests := range d.currentBatch.deleteRequestsToProcess {
if userDeleteRequests == nil {
continue
}
@ -468,7 +388,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
}
}
for _, req := range d.duplicateRequests {
for _, req := range d.currentBatch.duplicateRequests {
level.Info(util_log.Logger).Log("msg", "marking duplicate delete request as processed",
"delete_request_id", req.RequestID,
"sequence_num", req.SequenceNum,
@ -481,28 +401,22 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err)
}
// When we hit a timeout, MarkPhaseTimedOut is called to clear the list of delete requests to avoid marking delete requests as processed.
// Since this method is still called when we hit a timeout, we do not want to drop the progress so that deletion skips the already processed streams.
if len(d.deleteRequestsToProcess) > 0 {
d.processedSeries = map[string]struct{}{}
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
}
d.processedSeriesMtx.Lock()
defer d.processedSeriesMtx.Unlock()
d.processedSeries = map[string]struct{}{}
d.currentBatch.reset()
if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) {
level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err)
}
}
func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
// We can't do the overlap check between the passed interval and delete requests interval from a user because
// if a request is issued just for today and there are chunks spanning today and yesterday then
// the overlap check would skip processing yesterday's index which would result in the index pointing to deleted chunks.
if userID != "" {
return d.deleteRequestsToProcess[userID] != nil
if d.currentBatch.requestCount() == 0 {
return false
}
return len(d.deleteRequestsToProcess) != 0
return d.currentBatch.intervalMayHaveExpiredChunks(userID)
}
func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool {
@ -510,15 +424,15 @@ func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ lab
}
func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
userIDStr := unsafeGetString(userID)
if d.deleteRequestsToProcess[userIDStr] == nil {
if d.currentBatch.requestCount() == 0 {
return nil
}
for _, req := range d.deleteRequestsToProcess[userIDStr].requests {
d.processedSeriesMtx.Lock()
defer d.processedSeriesMtx.Unlock()
for _, req := range d.currentBatch.getAllRequestsForUser(userIDStr) {
// if the delete request does not touch the series, do not waste space in storing the marker
if !labels.Selector(req.matchers).Matches(lbls) {
continue

@ -938,9 +938,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deletionMode: tc.deletionMode.String(),
}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
for _, deleteRequests := range mgr.deleteRequestsToProcess {
for _, deleteRequests := range mgr.currentBatch.deleteRequestsToProcess {
for _, dr := range deleteRequests.requests {
require.EqualValues(t, 0, dr.DeletedLines)
}
@ -968,12 +969,13 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, structuredMetadata), filterFunc(start.Time(), line, structuredMetadata), "line", line, "time", start.Time(), "now", now.Time())
}
require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess))
require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.currentBatch.deleteRequestsToProcess))
for userID, dr := range tc.expectedDeletionRangeByUser {
require.Equal(t, dr, mgr.deleteRequestsToProcess[userID].requestsInterval)
require.Equal(t, dr, mgr.currentBatch.deleteRequestsToProcess[userID].requestsInterval)
}
}
duplicateRequests := mgr.currentBatch.duplicateRequests
mgr.MarkPhaseFinished()
processedRequests, err := mockDeleteRequestsStore.getDeleteRequestsByStatus(StatusProcessed)
@ -983,7 +985,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
for i, reqIdx := range tc.expectedRequestsMarkedAsProcessed {
require.True(t, requestsAreEqual(tc.deleteRequestsFromStore[reqIdx], processedRequests[i]))
}
require.Len(t, mgr.duplicateRequests, tc.expectedDuplicateRequestsCount)
require.Len(t, duplicateRequests, tc.expectedDuplicateRequestsCount)
})
}
}
@ -1007,7 +1009,8 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
for _, tc := range tt {
mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
interval := model.Interval{Start: 300, End: 600}
require.Equal(t, tc.hasChunks, mgr.IntervalMayHaveExpiredChunks(interval, tc.user))
@ -1192,7 +1195,8 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
for _, m := range tc.seriesToMarkProcessed {
require.NoError(t, mgr.MarkSeriesAsProcessed(m.userID, m.seriesID, m.lbls, m.tableName))
@ -1208,7 +1212,8 @@ func TestDeleteRequestsManager_SeriesProgress(t *testing.T) {
mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.Equal(t, storedSeriesProgress, mgr.processedSeries)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
// when the mark phase ends, series progress should get cleared
mgr.MarkPhaseFinished()
@ -1230,7 +1235,8 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil)
require.NoError(t, err)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
require.NoError(t, mgr.MarkSeriesAsProcessed(user1, []byte(lblFooBar.String()), lblFooBar, "t1"))
@ -1244,7 +1250,8 @@ func TestDeleteRequestsManager_SeriesProgressWithTimeout(t *testing.T) {
require.FileExists(t, filepath.Join(workingDir, seriesProgressFilename))
// load the requests again for processing
require.NoError(t, mgr.loadDeleteRequestsToProcess())
mgr.MarkPhaseStarted()
require.NotNil(t, mgr.currentBatch)
// not hitting the timeout should clear the series progress
mgr.MarkPhaseFinished()

@ -0,0 +1,226 @@
package deletion
import (
"context"
"encoding/json"
"fmt"
"path"
"strings"
"time"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
)
var ErrNoChunksSelectedForDeletion = fmt.Errorf("no chunks selected for deletion")
const (
maxChunksPerSegment = 100000
manifestFileName = "manifest.json"
)
// ChunksGroup holds a group of chunks selected by the same set of requests
type ChunksGroup struct {
Requests []DeleteRequest `json:"requests"`
Chunks []retention.Chunk `json:"chunks"`
}
// segment holds a limited number of chunks (up to maxChunksPerSegment) that need to be processed.
// It also helps segregate chunks belonging to different users/tables.
type segment struct {
UserID string `json:"user_id"`
TableName string `json:"table_name"`
ChunksGroups []ChunksGroup `json:"chunk_groups"`
ChunksCount int `json:"chunks_count"`
}
// manifest represents the completion state and summary of chunk discovery for the loaded deleteRequestBatch.
// It serves two purposes:
// 1. Acts as a completion marker indicating all chunks for the given delete requests have been found
// 2. Stores a summary of data stored in segments:
// - Original and duplicate deletion requests
// - Total number of segments and chunks to be processed
//
// Once all the segments are processed, Requests and DuplicateRequests in the manifest can be marked as processed.
type manifest struct {
Requests []DeleteRequest `json:"requests"`
DuplicateRequests []DeleteRequest `json:"duplicate_requests"`
SegmentsCount int `json:"segments_count"`
ChunksCount int `json:"chunks_count"`
}
// deletionManifestBuilder helps build the manifest that lists which chunks to process for a batch of delete requests.
// It is not meant to be used concurrently.
type deletionManifestBuilder struct {
deleteStoreClient client.ObjectClient
deleteRequestBatch deleteRequestBatch
currentSegment map[uint64]ChunksGroup
currentSegmentChunksCount int
currentUserID string
currentTableName string
allUserRequests []*DeleteRequest
creationTime time.Time
segmentsCount int
overallChunksCount int
}
func newDeletionManifestBuilder(deleteStoreClient client.ObjectClient, deleteRequestBatch deleteRequestBatch) (*deletionManifestBuilder, error) {
requestCount := 0
for _, userRequests := range deleteRequestBatch.deleteRequestsToProcess {
requestCount += len(userRequests.requests)
}
// We use a uint64 as a bit field to track which delete requests apply to each chunk.
// Since uint64 has 64 bits, we can only handle up to 64 delete requests at a time.
if requestCount > 64 {
return nil, fmt.Errorf("only upto 64 delete requests allowed, current count: %d", requestCount)
}
builder := &deletionManifestBuilder{
deleteStoreClient: deleteStoreClient,
deleteRequestBatch: deleteRequestBatch,
currentSegment: make(map[uint64]ChunksGroup),
creationTime: time.Now(),
}
return builder, nil
}
// AddSeries adds a series and its chunks to the current segment.
// It flushes the current segment if the user ID or table name changes.
// It also ensures that the current segment does not exceed the maximum number of chunks.
func (d *deletionManifestBuilder) AddSeries(ctx context.Context, tableName string, series retention.Series) error {
userIDStr := unsafeGetString(series.UserID())
if userIDStr != d.currentUserID || tableName != d.currentTableName {
if err := d.flushCurrentBatch(ctx); err != nil {
return err
}
d.currentSegmentChunksCount = 0
d.currentSegment = make(map[uint64]ChunksGroup)
d.currentUserID = string(series.UserID())
d.currentTableName = tableName
d.allUserRequests = d.deleteRequestBatch.getAllRequestsForUser(userIDStr)
if len(d.allUserRequests) == 0 {
return fmt.Errorf("no requests loaded for user: %s", userIDStr)
}
}
var chunksGroupIdentifier uint64
for _, chk := range series.Chunks() {
if d.currentSegmentChunksCount >= maxChunksPerSegment {
if err := d.flushCurrentBatch(ctx); err != nil {
return err
}
d.currentSegmentChunksCount = 0
for chunksGroupIdentifier := range d.currentSegment {
group := d.currentSegment[chunksGroupIdentifier]
group.Chunks = group.Chunks[:0]
d.currentSegment[chunksGroupIdentifier] = group
}
}
// We use a uint64 as a bit field to track which delete requests apply to each chunk.
chunksGroupIdentifier = 0
for i, deleteRequest := range d.allUserRequests {
if !deleteRequest.IsDeleted(series.UserID(), series.Labels(), chk) {
continue
}
chunksGroupIdentifier |= 1 << i
}
if chunksGroupIdentifier == 0 {
continue
}
d.currentSegmentChunksCount++
if _, ok := d.currentSegment[chunksGroupIdentifier]; !ok {
// Iterate through d.allUserRequests and find which bits are turned on in chunksGroupIdentifier
var deleteRequests []DeleteRequest
for i := range d.allUserRequests {
if chunksGroupIdentifier&(1<<i) != 0 { // Check if the i-th bit is turned on
deleteRequest := d.allUserRequests[i]
deleteRequests = append(deleteRequests, DeleteRequest{
RequestID: deleteRequest.RequestID,
Query: deleteRequest.Query,
StartTime: deleteRequest.StartTime,
EndTime: deleteRequest.EndTime,
})
}
}
d.currentSegment[chunksGroupIdentifier] = ChunksGroup{
Requests: deleteRequests,
}
}
group := d.currentSegment[chunksGroupIdentifier]
group.Chunks = append(group.Chunks, chk)
d.currentSegment[chunksGroupIdentifier] = group
}
return nil
}
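The grouping key above is a plain bitmask over the user's delete requests: chunks selected by exactly the same set of requests end up in the same ChunksGroup. A standalone sketch of the idea, independent of the builder's types (names illustrative):

// groupKey returns a bit set in which bit i is on when request i selects the chunk.
// With at most 64 requests per batch, a uint64 is sufficient.
func groupKey(selected []bool) uint64 {
	var key uint64
	for i, s := range selected {
		if s {
			key |= 1 << i
		}
	}
	return key
}

A key of 0 means no request selects the chunk, which mirrors the continue above.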
// Finish flushes the current segment and builds the manifest.
func (d *deletionManifestBuilder) Finish(ctx context.Context) error {
if err := d.flushCurrentBatch(ctx); err != nil {
return err
}
if d.overallChunksCount == 0 {
return ErrNoChunksSelectedForDeletion
}
var requests []DeleteRequest
for userID := range d.deleteRequestBatch.deleteRequestsToProcess {
for i := range d.deleteRequestBatch.deleteRequestsToProcess[userID].requests {
requests = append(requests, *d.deleteRequestBatch.deleteRequestsToProcess[userID].requests[i])
}
}
manifestJSON, err := json.Marshal(manifest{
Requests: requests,
DuplicateRequests: d.deleteRequestBatch.duplicateRequests,
SegmentsCount: d.segmentsCount,
ChunksCount: d.overallChunksCount,
})
if err != nil {
return err
}
return d.deleteStoreClient.PutObject(ctx, d.buildObjectKey(manifestFileName), strings.NewReader(unsafeGetString(manifestJSON)))
}
func (d *deletionManifestBuilder) flushCurrentBatch(ctx context.Context) error {
b := segment{
UserID: d.currentUserID,
TableName: d.currentTableName,
ChunksCount: d.currentSegmentChunksCount,
}
for _, group := range d.currentSegment {
if len(group.Chunks) == 0 {
continue
}
b.ChunksGroups = append(b.ChunksGroups, group)
}
if len(b.ChunksGroups) == 0 {
return nil
}
batchJSON, err := json.Marshal(b)
if err != nil {
return err
}
d.segmentsCount++
d.overallChunksCount += d.currentSegmentChunksCount
d.currentSegmentChunksCount = 0
return d.deleteStoreClient.PutObject(ctx, d.buildObjectKey(fmt.Sprintf("%d.json", d.segmentsCount)), strings.NewReader(unsafeGetString(batchJSON)))
}
func (d *deletionManifestBuilder) buildObjectKey(filename string) string {
return path.Join(fmt.Sprint(d.creationTime.UnixNano()), filename)
}
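Putting flushCurrentBatch and buildObjectKey together: a run that produced two segments leaves objects like the following in the delete store, where the directory name is the builder's creation time in Unix nanoseconds. Finish writes manifest.json last, so its presence marks the segment set as complete:

<creationTime UnixNano>/1.json
<creationTime UnixNano>/2.json
<creationTime UnixNano>/manifest.json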

@ -0,0 +1,637 @@
package deletion
import (
"context"
"encoding/json"
"fmt"
"io"
"slices"
"strings"
"testing"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
)
const (
req1 = "req1"
req2 = "req2"
table1 = "table1"
table2 = "table2"
lblFizzBuzz = `{fizz="buzz"}`
lblFooBarAndFizzBuzz = `{foo="bar", fizz="buzz"}`
)
func buildChunks(start model.Time, count int) []retention.Chunk {
chunks := make([]retention.Chunk, 0, count)
chunks = append(chunks, retention.Chunk{
ChunkID: []byte(fmt.Sprintf("%d", start)),
From: start,
Through: start + 1,
})
for i := 1; i < count; i++ {
from := chunks[i-1].From + 1
chunks = append(chunks, retention.Chunk{
ChunkID: []byte(fmt.Sprintf("%d", from)),
From: from,
Through: from + 1,
})
}
return chunks
}
type mockSeries struct {
seriesID []byte
userID string
labels labels.Labels
chunks []retention.Chunk
}
func (m *mockSeries) SeriesID() []byte {
return m.seriesID
}
func (m *mockSeries) Reset(seriesID, userID []byte, labels labels.Labels) {
m.seriesID = seriesID
m.userID = string(userID)
m.labels = labels
m.chunks = nil
}
func (m *mockSeries) AppendChunks(ref ...retention.Chunk) {
m.chunks = append(m.chunks, ref...)
}
func (m *mockSeries) UserID() []byte {
return []byte(m.userID)
}
func (m *mockSeries) Labels() labels.Labels {
return m.labels
}
func (m *mockSeries) Chunks() []retention.Chunk {
return m.chunks
}
func TestDeletionManifestBuilder(t *testing.T) {
tests := []struct {
name string
deleteRequests []DeleteRequest
series []struct {
tableName string
series *mockSeries
}
expectedManifest manifest
expectedSegments []segment
validateFunc func(t *testing.T, builder *deletionManifestBuilder)
}{
{
name: "single user with single segment",
deleteRequests: []DeleteRequest{
{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
series: []struct {
tableName string
series *mockSeries
}{
{
tableName: table1,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(10, 100),
},
},
},
expectedManifest: manifest{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
SegmentsCount: 1,
ChunksCount: 91,
},
expectedSegments: []segment{
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
Chunks: buildChunks(10, 91),
},
},
ChunksCount: 91,
},
},
},
{
name: "single user with multiple segments due to chunks count",
deleteRequests: []DeleteRequest{
{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
series: []struct {
tableName string
series *mockSeries
}{
{
tableName: table1,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(0, maxChunksPerSegment+1),
},
},
},
expectedManifest: manifest{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
SegmentsCount: 2,
ChunksCount: maxChunksPerSegment + 1,
},
expectedSegments: []segment{
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(0, maxChunksPerSegment),
},
},
ChunksCount: maxChunksPerSegment,
},
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(maxChunksPerSegment, 1),
},
},
ChunksCount: 1,
},
},
},
{
name: "single user with multiple segments due to multiple tables having chunks to delete",
deleteRequests: []DeleteRequest{
{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
series: []struct {
tableName string
series *mockSeries
}{
{
tableName: table1,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(0, 50),
},
},
{
tableName: table2,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(50, 50),
},
},
},
expectedManifest: manifest{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
SegmentsCount: 2,
ChunksCount: 100,
},
expectedSegments: []segment{
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
Chunks: buildChunks(0, 50),
},
},
ChunksCount: 50,
},
{
UserID: user1,
TableName: table2,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
Chunks: buildChunks(50, 50),
},
},
ChunksCount: 50,
},
},
},
{
name: "multiple users with multiple segments",
deleteRequests: []DeleteRequest{
{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
{
UserID: user2,
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 10,
EndTime: 10 + maxChunksPerSegment + 1,
},
},
series: []struct {
tableName string
series *mockSeries
}{
{
tableName: table1,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(0, maxChunksPerSegment+1),
},
},
{
tableName: table1,
series: &mockSeries{
userID: user2,
labels: mustParseLabel(lblFizzBuzz),
chunks: buildChunks(10, maxChunksPerSegment+1),
},
},
},
expectedManifest: manifest{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
{
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 10,
EndTime: 10 + maxChunksPerSegment + 1,
},
},
SegmentsCount: 4,
ChunksCount: (maxChunksPerSegment + 1) * 2,
},
expectedSegments: []segment{
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(0, maxChunksPerSegment),
},
},
ChunksCount: maxChunksPerSegment,
},
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(maxChunksPerSegment, 1),
},
},
ChunksCount: 1,
},
{
UserID: user2,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 10,
EndTime: 10 + maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(10, maxChunksPerSegment),
},
},
ChunksCount: maxChunksPerSegment,
},
{
UserID: user2,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 10,
EndTime: 10 + maxChunksPerSegment + 1,
},
},
Chunks: buildChunks(10+maxChunksPerSegment, 1),
},
},
ChunksCount: 1,
},
},
},
{
name: "multiple delete requests covering same chunks",
deleteRequests: []DeleteRequest{
{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
{
UserID: user1,
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 51,
EndTime: 100,
},
},
series: []struct {
tableName string
series *mockSeries
}{
{
tableName: table1,
series: &mockSeries{
userID: user1,
labels: mustParseLabel(lblFooBarAndFizzBuzz),
chunks: buildChunks(25, 50),
},
},
},
expectedManifest: manifest{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
{
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 51,
EndTime: 100,
},
},
SegmentsCount: 1,
ChunksCount: 50,
},
expectedSegments: []segment{
{
UserID: user1,
TableName: table1,
ChunksGroups: []ChunksGroup{
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
},
Chunks: buildChunks(25, 25),
},
{
Requests: []DeleteRequest{
{
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
},
{
RequestID: req2,
Query: lblFizzBuzz,
StartTime: 51,
EndTime: 100,
},
},
Chunks: buildChunks(50, 25),
},
},
ChunksCount: 50,
},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tempDir := t.TempDir()
ctx := context.Background()
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: tempDir,
})
require.NoError(t, err)
// Create delete request batch
batch := newDeleteRequestBatch(nil)
for _, req := range tc.deleteRequests {
batch.addDeleteRequest(&req)
}
// Create builder
builder, err := newDeletionManifestBuilder(objectClient, *batch)
require.NoError(t, err)
// Process series
for _, s := range tc.series {
err := builder.AddSeries(ctx, s.tableName, s.series)
require.NoError(t, err)
}
// Finish and validate
err = builder.Finish(ctx)
require.NoError(t, err)
require.Equal(t, tc.expectedManifest.SegmentsCount, builder.segmentsCount)
require.Equal(t, tc.expectedManifest.ChunksCount, builder.overallChunksCount)
reader, _, err := builder.deleteStoreClient.GetObject(context.Background(), builder.buildObjectKey(manifestFileName))
require.NoError(t, err)
manifestJSON, err := io.ReadAll(reader)
require.NoError(t, err)
require.NoError(t, reader.Close())
var manifest manifest
require.NoError(t, json.Unmarshal(manifestJSON, &manifest))
slices.SortFunc(manifest.Requests, func(a, b DeleteRequest) int {
return strings.Compare(a.RequestID, b.RequestID)
})
require.Equal(t, tc.expectedManifest, manifest)
for i := 0; i < tc.expectedManifest.SegmentsCount; i++ {
reader, _, err := builder.deleteStoreClient.GetObject(context.Background(), builder.buildObjectKey(fmt.Sprintf("%d.json", i+1)))
require.NoError(t, err)
segmentJSON, err := io.ReadAll(reader)
require.NoError(t, err)
require.NoError(t, reader.Close())
var segment segment
require.NoError(t, json.Unmarshal(segmentJSON, &segment))
slices.SortFunc(segment.ChunksGroups, func(a, b ChunksGroup) int {
switch {
case len(a.Requests) < len(b.Requests):
return -1
case len(a.Requests) > len(b.Requests):
return 1
default:
return 0
}
})
require.Equal(t, tc.expectedSegments[i], segment)
}
})
}
}
func TestDeletionManifestBuilder_Errors(t *testing.T) {
tempDir := t.TempDir()
ctx := context.Background()
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: tempDir,
})
require.NoError(t, err)
// Create delete request batch
batch := newDeleteRequestBatch(nil)
batch.addDeleteRequest(&DeleteRequest{
UserID: user1,
RequestID: req1,
Query: lblFooBar,
StartTime: 0,
EndTime: 100,
})
// Create builder
builder, err := newDeletionManifestBuilder(objectClient, *batch)
require.NoError(t, err)
err = builder.AddSeries(ctx, table1, &mockSeries{
userID: user2,
labels: mustParseLabel(lblFooBar),
chunks: buildChunks(0, 25),
})
require.EqualError(t, err, fmt.Sprintf("no requests loaded for user: %s", user2))
err = builder.Finish(ctx)
require.EqualError(t, err, ErrNoChunksSelectedForDeletion.Error())
}

@ -41,36 +41,49 @@ func (c Chunk) String() string {
return fmt.Sprintf("ChunkID: %s", c.ChunkID)
}
type Series struct {
type Series interface {
SeriesID() []byte
UserID() []byte
Labels() labels.Labels
Chunks() []Chunk
Reset(seriesID, userID []byte, labels labels.Labels)
AppendChunks(ref ...Chunk)
}
func NewSeries() Series {
return &series{}
}
type series struct {
seriesID, userID []byte
labels labels.Labels
chunks []Chunk
}
func (s *Series) SeriesID() []byte {
func (s *series) SeriesID() []byte {
return s.seriesID
}
func (s *Series) UserID() []byte {
func (s *series) UserID() []byte {
return s.userID
}
func (s *Series) Labels() labels.Labels {
func (s *series) Labels() labels.Labels {
return s.labels
}
func (s *Series) Chunks() []Chunk {
func (s *series) Chunks() []Chunk {
return s.chunks
}
func (s *Series) Reset(seriesID, userID []byte, labels labels.Labels) {
func (s *series) Reset(seriesID, userID []byte, labels labels.Labels) {
s.seriesID = seriesID
s.userID = userID
s.labels = labels
s.chunks = s.chunks[:0]
}
func (s *Series) AppendChunks(ref ...Chunk) {
func (s *series) AppendChunks(ref ...Chunk) {
s.chunks = append(s.chunks, ref...)
}
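With Series now an interface, concrete values come from NewSeries() and callers (such as the mockSeries in the builder tests above) can substitute their own implementation. A minimal sketch of a hypothetical producer feeding a retention.SeriesCallback, mirroring the call sites updated below:

// Sketch: build a series via the new constructor and hand it to a callback.
func emitSeries(seriesID, userID []byte, lbls labels.Labels, chunks []retention.Chunk, callback retention.SeriesCallback) error {
	s := retention.NewSeries()
	s.Reset(seriesID, userID, lbls)
	s.AppendChunks(chunks...)
	return callback(s)
}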
@ -219,7 +232,7 @@ func markForDelete(
}
}
if expiration.CanSkipSeries(s.UserID(), s.labels, s.SeriesID(), seriesStart, tableName, now) {
if expiration.CanSkipSeries(s.UserID(), s.Labels(), s.SeriesID(), seriesStart, tableName, now) {
empty = false
return nil
}

@ -142,14 +142,14 @@ func (t *table) ForEachSeries(ctx context.Context, callback SeriesCallback) erro
Through: chk.Through,
})
}
series := Series{}
series := series{}
series.Reset(
[]byte(seriesID),
[]byte(userID),
labels.NewBuilder(t.chunks[userID][seriesID][0].Metric).Del(labels.MetricName).Labels(),
)
series.AppendChunks(chunks...)
if err := callback(series); err != nil {
if err := callback(&series); err != nil {
return err
}
}

@ -30,7 +30,7 @@ func ForEachSeries(ctx context.Context, bucket *bbolt.Bucket, config config.Peri
}
cursor := bucket.Cursor()
var current retention.Series
current := retention.NewSeries()
for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() {
ref, ok, err := parseChunkRef(decodeKey(key))

@ -297,7 +297,7 @@ func (c *compactedIndex) ForEachSeries(ctx context.Context, callback retention.S
logprotoChunkRef := logproto.ChunkRef{
UserID: c.userID,
}
var series retention.Series
series := retention.NewSeries()
for seriesID, stream := range c.builder.streams {
series.Reset(
getUnsafeBytes(seriesID),

@ -48,7 +48,7 @@ type testCompactedIdx struct {
}
func (t testCompactedIdx) ForEachSeries(_ context.Context, f retention.SeriesCallback) error {
var series retention.Series
series := retention.NewSeries()
series.AppendChunks(t.chunks...)
return f(series)
}
