fix: wire up deletion series progress tracking (#17099)

pull/17104/head
Sandeep Sukhani 9 months ago committed by GitHub
parent c49eeba326
commit db59d0e9b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      pkg/compactor/compactor.go
  2. 8
      pkg/compactor/retention/expiration.go
  3. 6
      pkg/compactor/retention/retention.go
  4. 4
      pkg/compactor/retention/retention_test.go

@ -890,6 +890,14 @@ func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, ser
return e.retentionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) && e.deletionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now)
}
func (e *expirationChecker) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error {
if err := e.retentionExpiryChecker.MarkSeriesAsProcessed(userID, seriesID, lbls, tableName); err != nil {
return err
}
return e.deletionExpiryChecker.MarkSeriesAsProcessed(userID, seriesID, lbls, tableName)
}
func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the compactor instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing

@ -31,6 +31,7 @@ type ExpirationChecker interface {
MarkPhaseFinished()
DropFromIndex(userID []byte, chk Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool
CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, seriesStart model.Time, tableName string, now model.Time) bool
MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error
}
type expirationChecker struct {
@ -96,6 +97,10 @@ func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, _ [
return now.Sub(seriesStart) < period
}
func (e *expirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error {
return nil
}
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
// when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime.
latestRetentionStartTime := e.latestRetentionStartTime.overall
@ -136,6 +141,9 @@ func (e *neverExpiringExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labe
func (e *neverExpiringExpirationChecker) CanSkipSeries(_ []byte, _ labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool {
return true
}
func (e *neverExpiringExpirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error {
return nil
}
type TenantsRetention struct {
limits Limits

@ -280,7 +280,11 @@ func markForDelete(
empty = false
seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID())
}
return iterCtx.Err()
if err := iterCtx.Err(); err != nil {
return err
}
return expiration.MarkSeriesAsProcessed(s.UserID(), s.SeriesID(), s.Labels(), tableName)
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) {

@ -700,6 +700,10 @@ func (m *mockExpirationChecker) CanSkipSeries(_ []byte, lbls labels.Labels, _ []
return m.skipSeries[lbls.String()]
}
func (m *mockExpirationChecker) MarkSeriesAsProcessed(_, _ []byte, _ labels.Labels, _ string) error {
return nil
}
func TestMarkForDelete_SeriesCleanup(t *testing.T) {
now := model.Now()
schema := allSchemas[2]

Loading…
Cancel
Save