From db59d0e9b2c463d85238e13a0c8dfa8a66e3967d Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 10 Apr 2025 17:11:13 +0530 Subject: [PATCH] fix: wire up deletion series progress tracking (#17099) --- pkg/compactor/compactor.go | 8 ++++++++ pkg/compactor/retention/expiration.go | 8 ++++++++ pkg/compactor/retention/retention.go | 6 +++++- pkg/compactor/retention/retention_test.go | 4 ++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index efd7fe7471..bd3e641389 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -890,6 +890,14 @@ func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, ser return e.retentionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) && e.deletionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) } +func (e *expirationChecker) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error { + if err := e.retentionExpiryChecker.MarkSeriesAsProcessed(userID, seriesID, lbls, tableName); err != nil { + return err + } + + return e.deletionExpiryChecker.MarkSeriesAsProcessed(userID, seriesID, lbls, tableName) +} + func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { // When we initialize the compactor instance in the ring we want to start from // a clean situation, so whatever is the state we set it JOINING, while we keep existing diff --git a/pkg/compactor/retention/expiration.go b/pkg/compactor/retention/expiration.go index b1e9fe0f5e..2d6dd78075 100644 --- a/pkg/compactor/retention/expiration.go +++ b/pkg/compactor/retention/expiration.go @@ -31,6 +31,7 @@ type ExpirationChecker interface { MarkPhaseFinished() DropFromIndex(userID []byte, chk Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, seriesStart model.Time, tableName string, now model.Time) bool + MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error } type expirationChecker struct { @@ -96,6 +97,10 @@ func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, _ [ return now.Sub(seriesStart) < period } +func (e *expirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error { + return nil +} + func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { // when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime. latestRetentionStartTime := e.latestRetentionStartTime.overall @@ -136,6 +141,9 @@ func (e *neverExpiringExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labe func (e *neverExpiringExpirationChecker) CanSkipSeries(_ []byte, _ labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool { return true } +func (e *neverExpiringExpirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error { + return nil +} type TenantsRetention struct { limits Limits diff --git a/pkg/compactor/retention/retention.go b/pkg/compactor/retention/retention.go index 9cdfe0d85b..c8e741038f 100644 --- a/pkg/compactor/retention/retention.go +++ b/pkg/compactor/retention/retention.go @@ -280,7 +280,11 @@ func markForDelete( empty = false seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID()) } - return iterCtx.Err() + if err := iterCtx.Err(); err != nil { + return err + } + + return expiration.MarkSeriesAsProcessed(s.UserID(), s.SeriesID(), s.Labels(), tableName) }) if err != nil { if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) { diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index e14097f44b..47be3b9f47 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -700,6 +700,10 @@ func (m *mockExpirationChecker) CanSkipSeries(_ []byte, lbls labels.Labels, _ [] return m.skipSeries[lbls.String()] } +func (m *mockExpirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error { + return nil +} + func TestMarkForDelete_SeriesCleanup(t *testing.T) { now := model.Now() schema := allSchemas[2]