From e3e1f096ea50cdcea8d64e726827368af14702a8 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 20 Mar 2025 10:42:20 +0530 Subject: [PATCH] feat: store details of processed streams while processing delete requests (#16825) --- pkg/compactor/compactor.go | 33 +-- pkg/compactor/deletion/delete_request.go | 14 +- pkg/compactor/deletion/delete_request_test.go | 12 +- .../deletion/delete_requests_manager.go | 144 +++++++++++- .../deletion/delete_requests_manager_test.go | 221 +++++++++++++++++- pkg/compactor/retention/expiration.go | 36 ++- pkg/compactor/retention/expiration_test.go | 104 ++++++--- pkg/compactor/retention/retention.go | 180 +++++++++----- pkg/compactor/retention/retention_test.go | 131 +++++++++-- pkg/compactor/retention/series.go | 16 +- pkg/compactor/retention/util_test.go | 95 ++++---- pkg/compactor/testutil.go | 7 +- .../boltdb/compactor/compacted_index.go | 13 +- .../boltdb/compactor/compacted_index_test.go | 29 ++- .../indexshipper/boltdb/compactor/index.go | 24 +- .../indexshipper/boltdb/compactor/iterator.go | 67 +++++- .../boltdb/compactor/iterator_test.go | 81 +++---- .../indexshipper/boltdb/compactor/series.go | 6 +- .../shipper/indexshipper/tsdb/compactor.go | 57 +++-- .../indexshipper/tsdb/compactor_test.go | 164 ++++++++----- .../shipper/indexshipper/tsdb/index/chunk.go | 2 +- pkg/tool/audit/audit.go | 18 +- pkg/tool/audit/audit_test.go | 25 +- tools/tsdb/tsdb-map/main.go | 22 +- 24 files changed, 1073 insertions(+), 428 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e6e05abcd5..efd7fe7471 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -13,14 +13,14 @@ import ( "unsafe" "github.com/go-kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" + 
"github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/compactor/deletion" @@ -406,13 +406,10 @@ func (c *Compactor) initDeletes(objectClient client.ObjectClient, indexUpdatePro c.DeleteRequestsGRPCHandler = deletion.NewGRPCRequestHandler(c.deleteRequestsStore, limits) - c.deleteRequestsManager = deletion.NewDeleteRequestsManager( - c.deleteRequestsStore, - c.cfg.DeleteRequestCancelPeriod, - c.cfg.DeleteBatchSize, - limits, - r, - ) + c.deleteRequestsManager, err = deletion.NewDeleteRequestsManager(deletionWorkDir, c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, c.cfg.DeleteBatchSize, limits, r) + if err != nil { + return err + } c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager) return nil @@ -853,12 +850,12 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker} } -func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, filter.Func) { - if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired { +func (e *expirationChecker) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, now model.Time) (bool, filter.Func) { + if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(userID, chk, lbls, seriesID, tableName, now); expired { return expired, nonDeletedIntervals } - return e.deletionExpiryChecker.Expired(ref, now) + return e.deletionExpiryChecker.Expired(userID, chk, lbls, seriesID, tableName, now) } func (e *expirationChecker) MarkPhaseStarted() { @@ -885,8 +882,12 @@ func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval return 
e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) } -func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool { - return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now) +func (e *expirationChecker) DropFromIndex(userID []byte, chk retention.Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool { + return e.retentionExpiryChecker.DropFromIndex(userID, chk, labels, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(userID, chk, labels, tableEndTime, now) +} + +func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, seriesStart model.Time, tableName string, now model.Time) bool { + return e.retentionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) && e.deletionExpiryChecker.CanSkipSeries(userID, lbls, seriesID, seriesStart, tableName, now) } func (c *Compactor) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, _ string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { diff --git a/pkg/compactor/deletion/delete_request.go b/pkg/compactor/deletion/delete_request.go index 5dea77bb32..4fc16c9fc6 100644 --- a/pkg/compactor/deletion/delete_request.go +++ b/pkg/compactor/deletion/delete_request.go @@ -109,14 +109,14 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool { // IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest. // It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters. // If the filter.Func is nil, the whole chunk is supposed to be deleted. 
-func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func) { - if d.UserID != unsafeGetString(entry.UserID) { +func (d *DeleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) { + if d.UserID != unsafeGetString(userID) { return false, nil } if !intervalsOverlap(model.Interval{ - Start: entry.From, - End: entry.Through, + Start: chunk.From, + End: chunk.Through, }, model.Interval{ Start: d.StartTime, End: d.EndTime, @@ -137,16 +137,16 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func } } - if !labels.Selector(d.matchers).Matches(entry.Labels) { + if !labels.Selector(d.matchers).Matches(lbls) { return false, nil } - if d.StartTime <= entry.From && d.EndTime >= entry.Through && !d.logSelectorExpr.HasFilter() { + if d.StartTime <= chunk.From && d.EndTime >= chunk.Through && !d.logSelectorExpr.HasFilter() { // Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted return true, nil } - ff, err := d.FilterFunction(entry.Labels) + ff, err := d.FilterFunction(lbls) if err != nil { // The query in the delete request is checked when added to the table. // So this error should not occur. 
diff --git a/pkg/compactor/deletion/delete_request_test.go b/pkg/compactor/deletion/delete_request_test.go index d8b64f2031..325a2c002f 100644 --- a/pkg/compactor/deletion/delete_request_test.go +++ b/pkg/compactor/deletion/delete_request_test.go @@ -33,13 +33,9 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { lblWithStructuredMetadataFilter := `{foo="bar", fizz="buzz"} | ping="pong"` lblWithLineAndStructuredMetadataFilter := `{foo="bar", fizz="buzz"} | ping="pong" |= "filter"` - chunkEntry := retention.ChunkEntry{ - ChunkRef: retention.ChunkRef{ - UserID: []byte(user1), - From: now.Add(-3 * time.Hour), - Through: now.Add(-time.Hour), - }, - Labels: mustParseLabel(lbl), + chunkEntry := retention.Chunk{ + From: now.Add(-3 * time.Hour), + Through: now.Add(-time.Hour), } type resp struct { @@ -275,7 +271,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) { t.Run(tc.name, func(t *testing.T) { require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query)) tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil) - isExpired, filterFunc := tc.deleteRequest.IsDeleted(chunkEntry) + isExpired, filterFunc := tc.deleteRequest.IsDeleted([]byte(user1), mustParseLabel(lbl), chunkEntry) require.Equal(t, tc.expectedResp.isDeleted, isExpired) if tc.expectedResp.expectedFilter == nil { require.Nil(t, filterFunc) diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index 230a67ce79..55a566679b 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -2,12 +2,16 @@ package deletion import ( "context" + "encoding/json" "fmt" + "os" + "path/filepath" "sort" "sync" "time" "github.com/go-kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -21,6 +25,8 @@ import ( const ( statusSuccess = "success" statusFail = "fail" + + 
seriesProgressFilename = "series_progress.json" ) type userDeleteRequests struct { @@ -30,6 +36,7 @@ type userDeleteRequests struct { } type DeleteRequestsManager struct { + workingDir string deleteRequestsStore DeleteRequestsStore deleteRequestCancelPeriod time.Duration @@ -41,10 +48,12 @@ type DeleteRequestsManager struct { done chan struct{} batchSize int limits Limits + processedSeries map[string]struct{} } -func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) *DeleteRequestsManager { +func NewDeleteRequestsManager(workingDir string, store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) (*DeleteRequestsManager, error) { dm := &DeleteRequestsManager{ + workingDir: workingDir, deleteRequestsStore: store, deleteRequestCancelPeriod: deleteRequestCancelPeriod, deleteRequestsToProcess: map[string]*userDeleteRequests{}, @@ -52,6 +61,13 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri done: make(chan struct{}), batchSize: batchSize, limits: limits, + processedSeries: map[string]struct{}{}, + } + + var err error + dm.processedSeries, err = loadSeriesProgress(workingDir) + if err != nil { + return nil, err } go dm.loop() @@ -60,7 +76,7 @@ func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeri level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err) } - return dm + return dm, nil } func (d *DeleteRequestsManager) loop() { @@ -76,6 +92,10 @@ func (d *DeleteRequestsManager) loop() { if err := d.updateMetrics(); err != nil { level.Error(util_log.Logger).Log("msg", "failed to update metrics", "err", err) } + + if err := d.storeSeriesProgress(); err != nil { + level.Error(util_log.Logger).Log("msg", "failed to store series progress", "err", err) + } case <-d.done: return } @@ -85,6 +105,27 @@ func (d 
*DeleteRequestsManager) loop() { func (d *DeleteRequestsManager) Stop() { close(d.done) d.wg.Wait() + if err := d.storeSeriesProgress(); err != nil { + level.Error(util_log.Logger).Log("msg", "failed to store series progress", "err", err) + } +} + +func (d *DeleteRequestsManager) storeSeriesProgress() error { + if len(d.processedSeries) == 0 { + return nil + } + + data, err := json.Marshal(d.processedSeries) + if err != nil { + return errors.Wrap(err, "failed to json encode series progress") + } + + err = os.WriteFile(filepath.Join(d.workingDir, seriesProgressFilename), data, 0640) + if err != nil { + return errors.Wrap(err, "failed to store series progress to the file") + } + + return nil } func (d *DeleteRequestsManager) updateMetrics() error { @@ -180,6 +221,13 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { break } + if deleteRequest.logSelectorExpr == nil { + err := deleteRequest.SetQuery(deleteRequest.Query) + if err != nil { + return errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", deleteRequest.RequestID, deleteRequest.UserID) + } + } + level.Info(util_log.Logger).Log( "msg", "Started processing delete request for user", "delete_request_id", deleteRequest.RequestID, @@ -278,14 +326,39 @@ func (d *DeleteRequestsManager) shouldProcessRequest(dr DeleteRequest) (bool, er return mode == deletionmode.FilterAndDelete, nil } -func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, filter.Func) { +func (d *DeleteRequestsManager) CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, _ model.Time, tableName string, _ model.Time) bool { + userIDStr := unsafeGetString(userID) + + d.deleteRequestsToProcessMtx.Lock() + defer d.deleteRequestsToProcessMtx.Unlock() + + if d.deleteRequestsToProcess[userIDStr] == nil { + return true + } + + for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests { + // if the delete request does not touch the series, 
continue looking for other matching requests + if !labels.Selector(deleteRequest.matchers).Matches(lbls) { + continue + } + + // The delete request touches the series. Do not skip if the series is not processed yet. + if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; !ok { + return false + } + } + + return true +} + +func (d *DeleteRequestsManager) Expired(userID []byte, chk retention.Chunk, lbls labels.Labels, seriesID []byte, tableName string, _ model.Time) (bool, filter.Func) { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() - userIDStr := unsafeGetString(ref.UserID) + userIDStr := unsafeGetString(userID) if d.deleteRequestsToProcess[userIDStr] == nil || !intervalsOverlap(d.deleteRequestsToProcess[userIDStr].requestsInterval, model.Interval{ - Start: ref.From, - End: ref.Through, + Start: chk.From, + End: chk.Through, }) { return false, nil } @@ -293,7 +366,10 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) var filterFuncs []filter.Func for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests { - isDeleted, ff := deleteRequest.IsDeleted(ref) + if _, ok := d.processedSeries[buildProcessedSeriesKey(deleteRequest.RequestID, seriesID, tableName)]; ok { + continue + } + isDeleted, ff := deleteRequest.IsDeleted(userID, lbls, chk) if !isDeleted { continue } @@ -304,9 +380,9 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) "delete_request_id", deleteRequest.RequestID, "sequence_num", deleteRequest.SequenceNum, "user", deleteRequest.UserID, - "chunkID", string(ref.ChunkID), + "chunkID", string(chk.ChunkID), ) - d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc() + d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() return true, nil } filterFuncs = append(filterFuncs, ff) @@ -316,7 +392,7 @@ func (d *DeleteRequestsManager) Expired(ref 
retention.ChunkEntry, _ model.Time) return false, nil } - d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc() + d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(userID)).Inc() return true, func(ts time.Time, s string, structuredMetadata ...labels.Label) bool { for _, ff := range filterFuncs { if ff(ts, s, structuredMetadata...) { @@ -401,6 +477,11 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { if err := d.deleteRequestsStore.MergeShardedRequests(context.Background()); err != nil { level.Error(util_log.Logger).Log("msg", "failed to merge sharded requests", "err", err) } + + d.processedSeries = map[string]struct{}{} + if err := os.Remove(filepath.Join(d.workingDir, seriesProgressFilename)); err != nil && !os.IsNotExist(err) { + level.Error(util_log.Logger).Log("msg", "failed to remove series progress file", "err", err) + } } func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool { @@ -417,10 +498,35 @@ func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, u return len(d.deleteRequestsToProcess) != 0 } -func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool { +func (d *DeleteRequestsManager) DropFromIndex(_ []byte, _ retention.Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool { return false } +func (d *DeleteRequestsManager) MarkSeriesAsProcessed(userID, seriesID []byte, lbls labels.Labels, tableName string) error { + userIDStr := unsafeGetString(userID) + if d.deleteRequestsToProcess[userIDStr] == nil { + return nil + } + + for _, req := range d.deleteRequestsToProcess[userIDStr].requests { + // if the delete request does not touch the series, do not waste space in storing the marker + if !labels.Selector(req.matchers).Matches(lbls) { + continue + } + processedSeriesKey := buildProcessedSeriesKey(req.RequestID, seriesID, tableName) + if _, ok := d.processedSeries[processedSeriesKey]; 
ok { + return fmt.Errorf("series for [table: %s, series: %s, user: %s, req: %s]", tableName, seriesID, userID, req.RequestID) + } + d.processedSeries[processedSeriesKey] = struct{}{} + } + + return nil +} + +func buildProcessedSeriesKey(requestID string, seriesID []byte, tableName string) string { + return fmt.Sprintf("%s/%s/%s", requestID, tableName, seriesID) +} + func getMaxRetentionInterval(userID string, limits Limits) time.Duration { maxRetention := model.Duration(limits.RetentionPeriod(userID)) if maxRetention == 0 { @@ -438,3 +544,19 @@ func getMaxRetentionInterval(userID string, limits Limits) time.Duration { return time.Duration(maxRetention) } + +func loadSeriesProgress(workingDir string) (map[string]struct{}, error) { + data, err := os.ReadFile(filepath.Join(workingDir, seriesProgressFilename)) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + processedSeries := map[string]struct{}{} + if len(data) > 0 { + if err := json.Unmarshal(data, &processedSeries); err != nil { + return nil, err + } + } + + return processedSeries, nil +} diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go index 24123a703e..f586cdf0e1 100644 --- a/pkg/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/compactor/deletion/delete_requests_manager_test.go @@ -2,6 +2,7 @@ package deletion import ( "context" + "path/filepath" "strings" "testing" "time" @@ -31,13 +32,9 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { streamSelectorWithStructuredMetadataFilters := lblFoo.String() + `| ping="pong"` streamSelectorWithLineAndStructuredMetadataFilters := lblFoo.String() + `| ping="pong" |= "fizz"` - chunkEntry := retention.ChunkEntry{ - ChunkRef: retention.ChunkRef{ - UserID: []byte(testUserID), - From: now.Add(-12 * time.Hour), - Through: now.Add(-time.Hour), - }, - Labels: lblFoo, + chunkEntry := retention.Chunk{ + From: now.Add(-12 * time.Hour), + Through: now.Add(-time.Hour), } for 
_, tc := range []struct { @@ -936,10 +933,11 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { mockDeleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore} - mgr := NewDeleteRequestsManager(mockDeleteRequestsStore, time.Hour, tc.batchSize, &fakeLimits{defaultLimit: limit{ + mgr, err := NewDeleteRequestsManager(t.TempDir(), mockDeleteRequestsStore, time.Hour, tc.batchSize, &fakeLimits{defaultLimit: limit{ retentionPeriod: 7 * 24 * time.Hour, deletionMode: tc.deletionMode.String(), }}, nil) + require.NoError(t, err) require.NoError(t, mgr.loadDeleteRequestsToProcess()) for _, deleteRequests := range mgr.deleteRequestsToProcess { @@ -948,7 +946,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { } } - isExpired, filterFunc := mgr.Expired(chunkEntry, model.Now()) + isExpired, filterFunc := mgr.Expired([]byte(testUserID), chunkEntry, lblFoo, nil, "", model.Now()) require.Equal(t, tc.expectedResp.isExpired, isExpired) if tc.expectedResp.expectedFilter == nil { require.Nil(t, filterFunc) @@ -1007,7 +1005,8 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { } for _, tc := range tt { - mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) + mgr, err := NewDeleteRequestsManager(t.TempDir(), &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) + require.NoError(t, err) require.NoError(t, mgr.loadDeleteRequestsToProcess()) interval := model.Interval{Start: 300, End: 600} @@ -1015,6 +1014,208 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { } } +func TestDeleteRequestsManager_SeriesProgress(t *testing.T) { + user1 := []byte("user1") + user2 := 
[]byte("user2") + lblFooBar := mustParseLabel(`{foo="bar"}`) + lblFizzBuzz := mustParseLabel(`{fizz="buzz"}`) + deleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: []DeleteRequest{ + {RequestID: "1", Query: lblFooBar.String(), UserID: string(user1), StartTime: 0, EndTime: 100, Status: StatusReceived}, + {RequestID: "2", Query: lblFooBar.String(), UserID: string(user2), StartTime: 0, EndTime: 100, Status: StatusReceived}, + }} + type markSeriesProcessed struct { + userID, seriesID []byte + lbls labels.Labels + tableName string + } + + type chunkEntry struct { + userID []byte + chk retention.Chunk + lbls labels.Labels + seriesID []byte + tableName string + } + + for _, tc := range []struct { + name string + seriesToMarkProcessed []markSeriesProcessed + chunkEntry chunkEntry + expSkipSeries bool + expExpired bool + }{ + { + name: "no series marked as processed", + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: false, + expExpired: true, + }, + { + name: "chunk's series marked as processed", + seriesToMarkProcessed: []markSeriesProcessed{ + { + userID: user1, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t1", + }, + }, + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: true, + expExpired: false, + }, + { + name: "a different series marked as processed", + seriesToMarkProcessed: []markSeriesProcessed{ + { + userID: user1, + seriesID: []byte(lblFizzBuzz.String()), + lbls: lblFizzBuzz, + tableName: "t1", + }, + }, + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: false, + expExpired: true, + }, 
+ { + name: "a different users series marked as processed", + seriesToMarkProcessed: []markSeriesProcessed{ + { + userID: user2, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t1", + }, + }, + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: false, + expExpired: true, + }, + { + name: "series from different table marked as processed", + seriesToMarkProcessed: []markSeriesProcessed{ + { + userID: user1, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t2", + }, + }, + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: false, + expExpired: true, + }, + { + name: "multiple series marked as processed", + seriesToMarkProcessed: []markSeriesProcessed{ + { + userID: user1, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t1", + }, + { + userID: user1, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t2", + }, + { + userID: user2, + seriesID: []byte(lblFooBar.String()), + lbls: lblFooBar, + tableName: "t1", + }, + }, + chunkEntry: chunkEntry{ + userID: user1, + chk: retention.Chunk{ + From: 10, + Through: 20, + }, + lbls: lblFooBar, + seriesID: []byte(lblFooBar.String()), + tableName: "t1", + }, + expSkipSeries: true, + expExpired: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + workingDir := t.TempDir() + mgr, err := NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) + require.NoError(t, err) + require.NoError(t, mgr.loadDeleteRequestsToProcess()) + + for _, m := range tc.seriesToMarkProcessed { + require.NoError(t, mgr.MarkSeriesAsProcessed(m.userID, m.seriesID, 
m.lbls, m.tableName)) + } + + require.Equal(t, tc.expSkipSeries, mgr.CanSkipSeries(tc.chunkEntry.userID, tc.chunkEntry.lbls, tc.chunkEntry.seriesID, 0, tc.chunkEntry.tableName, 0)) + isExpired, _ := mgr.Expired(tc.chunkEntry.userID, tc.chunkEntry.chk, tc.chunkEntry.lbls, tc.chunkEntry.seriesID, tc.chunkEntry.tableName, 0) + require.Equal(t, tc.expExpired, isExpired) + + // see if stopping the manager properly retains the progress and loads back when initialized + storedSeriesProgress := mgr.processedSeries + mgr.Stop() + mgr, err = NewDeleteRequestsManager(workingDir, deleteRequestsStore, time.Hour, 70, &fakeLimits{defaultLimit: limit{deletionMode: deletionmode.FilterAndDelete.String()}}, nil) + require.NoError(t, err) + require.Equal(t, storedSeriesProgress, mgr.processedSeries) + + // when the mark phase ends, series progress should get cleared + mgr.MarkPhaseFinished() + require.Len(t, mgr.processedSeries, 0) + require.NoFileExists(t, filepath.Join(workingDir, seriesProgressFilename)) + }) + } +} + type storeAddReqDetails struct { userID, query string startTime, endTime model.Time diff --git a/pkg/compactor/retention/expiration.go b/pkg/compactor/retention/expiration.go index a1d2415ace..b1e9fe0f5e 100644 --- a/pkg/compactor/retention/expiration.go +++ b/pkg/compactor/retention/expiration.go @@ -23,13 +23,14 @@ type IntervalFilter struct { } type ExpirationChecker interface { - Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) + Expired(userID []byte, chk Chunk, lbls labels.Labels, seriesID []byte, tableName string, now model.Time) (bool, filter.Func) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool MarkPhaseStarted() MarkPhaseFailed() MarkPhaseTimedOut() MarkPhaseFinished() - DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool + DropFromIndex(userID []byte, chk Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool + CanSkipSeries(userID []byte, lbls labels.Labels, seriesID []byte, 
seriesStart model.Time, tableName string, now model.Time) bool } type expirationChecker struct { @@ -52,22 +53,22 @@ func NewExpirationChecker(limits Limits) ExpirationChecker { } // Expired tells if a ref chunk is expired based on retention rules. -func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) { - userID := unsafeGetString(ref.UserID) - period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) +func (e *expirationChecker) Expired(userID []byte, chk Chunk, lbls labels.Labels, _ []byte, _ string, now model.Time) (bool, filter.Func) { + userIDStr := unsafeGetString(userID) + period := e.tenantsRetention.RetentionPeriodFor(userIDStr, lbls) // The 0 value should disable retention if period <= 0 { return false, nil } - return now.Sub(ref.Through) > period, nil + return now.Sub(chk.Through) > period, nil } // DropFromIndex tells if it is okay to drop the chunk entry from index table. // We check if tableEndTime is out of retention period, calculated using the labels from the chunk. // If the tableEndTime is out of retention then we can drop the chunk entry without removing the chunk from the store. 
-func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { - userID := unsafeGetString(ref.UserID) - period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels) +func (e *expirationChecker) DropFromIndex(userID []byte, _ Chunk, labels labels.Labels, tableEndTime model.Time, now model.Time) bool { + userIDStr := unsafeGetString(userID) + period := e.tenantsRetention.RetentionPeriodFor(userIDStr, labels) // The 0 value should disable retention if period <= 0 { return false @@ -84,6 +85,16 @@ func (e *expirationChecker) MarkPhaseStarted() { func (e *expirationChecker) MarkPhaseFailed() {} func (e *expirationChecker) MarkPhaseTimedOut() {} func (e *expirationChecker) MarkPhaseFinished() {} +func (e *expirationChecker) CanSkipSeries(userID []byte, lbls labels.Labels, _ []byte, seriesStart model.Time, _ string, now model.Time) bool { + userIDStr := unsafeGetString(userID) + period := e.tenantsRetention.RetentionPeriodFor(userIDStr, lbls) + // The 0 value should disable retention + if period <= 0 { + return true + } + + return now.Sub(seriesStart) < period +} func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { // when userID is empty, it means we are checking for common index table. In this case we use e.overallLatestRetentionStartTime. 
@@ -109,7 +120,7 @@ func NeverExpiringExpirationChecker(_ Limits) ExpirationChecker { type neverExpiringExpirationChecker struct{} -func (e *neverExpiringExpirationChecker) Expired(_ ChunkEntry, _ model.Time) (bool, filter.Func) { +func (e *neverExpiringExpirationChecker) Expired(_ []byte, _ Chunk, _ labels.Labels, _ []byte, _ string, _ model.Time) (bool, filter.Func) { return false, nil } func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(_ model.Interval, _ string) bool { @@ -119,9 +130,12 @@ func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {} func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {} func (e *neverExpiringExpirationChecker) MarkPhaseTimedOut() {} func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {} -func (e *neverExpiringExpirationChecker) DropFromIndex(_ ChunkEntry, _ model.Time, _ model.Time) bool { +func (e *neverExpiringExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool { return false } +func (e *neverExpiringExpirationChecker) CanSkipSeries(_ []byte, _ labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool { + return true +} type TenantsRetention struct { limits Limits diff --git a/pkg/compactor/retention/expiration_test.go b/pkg/compactor/retention/expiration_test.go index 09e04c4517..154b2eebc8 100644 --- a/pkg/compactor/retention/expiration_test.go +++ b/pkg/compactor/retention/expiration_test.go @@ -108,20 +108,22 @@ func Test_expirationChecker_Expired(t *testing.T) { e := NewExpirationChecker(o) tests := []struct { - name string - ref ChunkEntry - want bool + name string + userID string + labels string + chunk Chunk + want bool }{ - {"expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), true}, - {"just expired tenant", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-1*time.Hour+(10*time.Millisecond))), false}, - {"not expired tenant", 
newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-30*time.Minute)), false}, - {"not expired tenant by far", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-72*time.Hour), model.Now().Add(-3*time.Hour)), false}, - {"expired stream override", newChunkEntry("2", `{foo="bar"}`, model.Now().Add(-12*time.Hour), model.Now().Add(-10*time.Hour)), true}, - {"non expired stream override", newChunkEntry("1", `{foo="bar"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-90*time.Minute)), false}, + {"expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, true}, + {"just expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-1*time.Hour + (10 * time.Millisecond))}, false}, + {"not expired tenant", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-30 * time.Minute)}, false}, + {"not expired tenant by far", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-72 * time.Hour), Through: model.Now().Add(-3 * time.Hour)}, false}, + {"expired stream override", "2", `{foo="bar"}`, Chunk{From: model.Now().Add(-12 * time.Hour), Through: model.Now().Add(-10 * time.Hour)}, true}, + {"non expired stream override", "1", `{foo="bar"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-90 * time.Minute)}, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now()) + actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now()) require.Equal(t, tt.want, actual) require.Nil(t, nonDeletedIntervalFilters) }) @@ -183,18 +185,20 @@ func Test_expirationChecker_Expired_zeroValue(t *testing.T) { require.NoError(t, err) e := NewExpirationChecker(o) tests := []struct { - name string - ref ChunkEntry - want bool + name string + userID string + 
labels string + chunk Chunk + want bool }{ - {"tenant with no override should not delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), false}, - {"tenant with no override, REALLY old chunk should not delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-10000*time.Hour+(1*time.Hour)), model.Now().Add(-10000*time.Hour)), false}, - {"tenant with override chunk less than retention should not delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-3*time.Hour), model.Now().Add(-2*time.Hour)), false}, - {"tenant with override should delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), true}, + {"tenant with no override should not delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, false}, + {"tenant with no override, REALLY old chunk should not delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-10000*time.Hour + (1 * time.Hour)), Through: model.Now().Add(-10000 * time.Hour)}, false}, + {"tenant with override chunk less than retention should not delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-3 * time.Hour), Through: model.Now().Add(-2 * time.Hour)}, false}, + {"tenant with override should delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now()) + actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now()) require.Equal(t, tt.want, actual) require.Nil(t, nonDeletedIntervalFilters) }) @@ -229,17 +233,19 @@ func Test_expirationChecker_Expired_zeroValueOverride(t *testing.T) { e := NewExpirationChecker(o) tests := []struct { - name string - ref ChunkEntry - want bool + name string + userID string + labels 
string + chunk Chunk + want bool }{ - {"tenant with no override should delete", newChunkEntry("1", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), true}, - {"tenant with override should not delete", newChunkEntry("2", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), false}, - {"tenant with zero value without unit should not delete", newChunkEntry("3", `{foo="buzz"}`, model.Now().Add(-31*time.Hour), model.Now().Add(-30*time.Hour)), false}, + {"tenant with no override should delete", "1", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, true}, + {"tenant with override should not delete", "2", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, false}, + {"tenant with zero value without unit should not delete", "3", `{foo="buzz"}`, Chunk{From: model.Now().Add(-31 * time.Hour), Through: model.Now().Add(-30 * time.Hour)}, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual, nonDeletedIntervalFilters := e.Expired(tt.ref, model.Now()) + actual, nonDeletedIntervalFilters := e.Expired([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), nil, "", model.Now()) require.Equal(t, tt.want, actual) require.Nil(t, nonDeletedIntervalFilters) }) @@ -267,17 +273,55 @@ func Test_expirationChecker_DropFromIndex_zeroValue(t *testing.T) { chunkThrough := model.Now().Add(-2 * time.Hour) tests := []struct { name string - ref ChunkEntry + userID string + labels string + chunk Chunk tableEndTime model.Time want bool }{ - {"tenant with no override should not delete", newChunkEntry("1", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-48 * time.Hour), false}, - {"tenant with override tableEndTime within retention period should not delete", newChunkEntry("2", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-1 * time.Hour), false}, - {"tenant with override should 
delete", newChunkEntry("2", `{foo="buzz"}`, chunkFrom, chunkThrough), model.Now().Add(-48 * time.Hour), true}, + {"tenant with no override should not delete", "1", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-48 * time.Hour), false}, + {"tenant with override tableEndTime within retention period should not delete", "2", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-1 * time.Hour), false}, + {"tenant with override should delete", "2", `{foo="buzz"}`, Chunk{From: chunkFrom, Through: chunkThrough}, model.Now().Add(-48 * time.Hour), true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - actual := e.DropFromIndex(tt.ref, tt.tableEndTime, model.Now()) + actual := e.DropFromIndex([]byte(tt.userID), tt.chunk, mustParseLabels(tt.labels), tt.tableEndTime, model.Now()) + require.Equal(t, tt.want, actual) + }) + } +} + +func Test_expirationChecker_CanSkipSeries(t *testing.T) { + // Default retention should be zero + d := defaultLimitsTestConfig() + + // Override tenant 2 to have 24 hour retention + tl := defaultLimitsTestConfig() + oneDay, _ := model.ParseDuration("24h") + tl.RetentionPeriod = oneDay + f := fakeOverrides{ + tenantLimits: map[string]*validation.Limits{ + "2": &tl, + }, + } + o, err := overridesTestConfig(d, f) + require.NoError(t, err) + e := NewExpirationChecker(o) + + tests := []struct { + name string + userID string + labels string + seriesStart model.Time + want bool + }{ + {"tenant with no override should skip series", "1", `{foo="buzz"}`, model.Now().Add(-48 * time.Hour), true}, + {"tenant with override, seriesStart within retention period should skip series", "2", `{foo="buzz"}`, model.Now().Add(-1 * time.Hour), true}, + {"tenant with override, seriesStart outside retention period should not skip series", "2", `{foo="buzz"}`, model.Now().Add(-48 * time.Hour), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := 
e.CanSkipSeries([]byte(tt.userID), mustParseLabels(tt.labels), nil, tt.seriesStart, "", model.Now()) require.Equal(t, tt.want, actual) }) } diff --git a/pkg/compactor/retention/retention.go b/pkg/compactor/retention/retention.go index 1e7be7b81d..9cdfe0d85b 100644 --- a/pkg/compactor/retention/retention.go +++ b/pkg/compactor/retention/retention.go @@ -31,30 +31,57 @@ const ( MarkersFolder = "markers" ) -type ChunkRef struct { - UserID []byte - SeriesID []byte - ChunkID []byte - From model.Time - Through model.Time +type Chunk struct { + ChunkID []byte + From model.Time + Through model.Time } -func (c ChunkRef) String() string { - return fmt.Sprintf("UserID: %s , SeriesID: %s , Time: [%s,%s]", c.UserID, c.SeriesID, c.From, c.Through) +func (c Chunk) String() string { + return fmt.Sprintf("ChunkID: %s", c.ChunkID) } -type ChunkEntry struct { - ChunkRef - Labels labels.Labels +type Series struct { + seriesID, userID []byte + labels labels.Labels + chunks []Chunk } -type ChunkEntryCallback func(ChunkEntry) (deleteChunk bool, err error) +func (s *Series) SeriesID() []byte { + return s.seriesID +} + +func (s *Series) UserID() []byte { + return s.userID +} + +func (s *Series) Labels() labels.Labels { + return s.labels +} + +func (s *Series) Chunks() []Chunk { + return s.chunks +} -type ChunkIterator interface { - ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error +func (s *Series) Reset(seriesID, userID []byte, labels labels.Labels) { + s.seriesID = seriesID + s.userID = userID + s.labels = labels + s.chunks = s.chunks[:0] } -type SeriesCleaner interface { +func (s *Series) AppendChunks(ref ...Chunk) { + s.chunks = append(s.chunks, ref...) 
+} + +type SeriesCallback func(series Series) (err error) + +type SeriesIterator interface { + ForEachSeries(ctx context.Context, callback SeriesCallback) error +} + +type IndexCleaner interface { + RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error // CleanupSeries is for cleaning up the series that do have any chunks left in the index. // It would only be called for the series that have all their chunks deleted without adding new ones. CleanupSeries(userID []byte, lbls labels.Labels) error @@ -70,9 +97,9 @@ type chunkIndexer interface { } type IndexProcessor interface { - ChunkIterator + SeriesIterator chunkIndexer - SeriesCleaner + IndexCleaner } var errNoChunksFound = errors.New("no chunks found in table, please check if there are really no chunks and manually drop the table or " + @@ -176,57 +203,84 @@ func markForDelete( iterCtx, cancel := ctxForTimeout(timeout) defer cancel() - err := indexFile.ForEachChunk(iterCtx, func(c ChunkEntry) (bool, error) { + err := indexFile.ForEachSeries(iterCtx, func(s Series) error { + chunks := s.Chunks() + if len(chunks) == 0 { + // add the series to series map so that it gets cleaned up + seriesMap.Add(s.SeriesID(), s.UserID(), s.Labels()) + return nil + } + chunksFound = true - seriesMap.Add(c.SeriesID, c.UserID, c.Labels) - - // see if the chunk is deleted completely or partially - if expired, filterFunc := expiration.Expired(c, now); expired { - linesDeleted := true // tracks whether we deleted at least some data from the chunk - if filterFunc != nil { - wroteChunks := false - var err error - wroteChunks, linesDeleted, err = chunkRewriter.rewriteChunk(ctx, c, tableInterval, filterFunc) - if err != nil { - return false, fmt.Errorf("failed to rewrite chunk %s with error %s", c.ChunkID, err) - } + seriesStart := chunks[0].From + for i := 0; i < len(chunks); i++ { + if chunks[i].From < seriesStart { + seriesStart = chunks[i].From + } + } + + if expiration.CanSkipSeries(s.UserID(), 
s.labels, s.SeriesID(), seriesStart, tableName, now) { + empty = false + return nil + } + seriesMap.Add(s.SeriesID(), s.UserID(), s.Labels()) + + for i := 0; i < len(chunks) && iterCtx.Err() == nil; i++ { + c := chunks[i] + // see if the chunk is deleted completely or partially + if expired, filterFunc := expiration.Expired(s.UserID(), c, s.Labels(), s.SeriesID(), tableName, now); expired { + linesDeleted := true // tracks whether we deleted at least some data from the chunk + if filterFunc != nil { + wroteChunks := false + var err error + wroteChunks, linesDeleted, err = chunkRewriter.rewriteChunk(ctx, s.UserID(), c, tableInterval, filterFunc) + if err != nil { + return fmt.Errorf("failed to rewrite chunk %s with error %s", c.ChunkID, err) + } - if wroteChunks { - // we have re-written chunk to the storage so the table won't be empty and the series are still being referred. - empty = false - seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID) + if wroteChunks { + // we have re-written chunk to the storage so the table won't be empty and the series are still being referred. + empty = false + seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID()) + } } - } - if linesDeleted { - modified = true + if linesDeleted { + modified = true - // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. - // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then - // the retention would fail because it would fail to find it in the storage. - if filterFunc == nil || c.From >= tableInterval.Start { - if err := marker.Put(c.ChunkID); err != nil { - return false, err + // Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in. 
+ // For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then + // the retention would fail because it would fail to find it in the storage. + if filterFunc == nil || c.From >= tableInterval.Start { + if err := marker.Put(c.ChunkID); err != nil { + return err + } + } + if err := indexFile.RemoveChunk(c.From, c.Through, s.UserID(), s.Labels(), c.ChunkID); err != nil { + return fmt.Errorf("failed to remove chunk %s from index with error %s", c.ChunkID, err) } + continue } - return true, nil } - } - // The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval. - // If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table. - // We would now check if the end time of the tableInterval is out of retention period so that - // we can drop the chunk entry from this table without removing the chunk from the store. - if c.Through.After(tableInterval.End) { - if expiration.DropFromIndex(c, tableInterval.End, now) { - modified = true - return true, nil + // The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval. + // If chunk end time is after the end time of tableInterval, it means the chunk would also be indexed in the next table. + // We would now check if the end time of the tableInterval is out of retention period so that + // we can drop the chunk entry from this table without removing the chunk from the store. 
+ if c.Through.After(tableInterval.End) { + if expiration.DropFromIndex(s.UserID(), c, s.Labels(), tableInterval.End, now) { + modified = true + if err := indexFile.RemoveChunk(c.From, c.Through, s.UserID(), s.Labels(), c.ChunkID); err != nil { + return fmt.Errorf("failed to remove chunk %s from index with error %s", c.ChunkID, err) + } + continue + } } - } - empty = false - seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID) - return false, nil + empty = false + seriesMap.MarkSeriesNotDeleted(s.SeriesID(), s.UserID()) + } + return iterCtx.Err() }) if err != nil { if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) { @@ -366,11 +420,11 @@ func newChunkRewriter(chunkClient client.Client, tableName string, chunkIndexer // If the newChunk is different, linesDeleted would be true. // The newChunk is indexed and uploaded only if it belongs to the current index table being processed, // the status of which is set to wroteChunks. -func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) { - userID := unsafeGetString(ce.UserID) +func (c *chunkRewriter) rewriteChunk(ctx context.Context, userID []byte, ce Chunk, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) { + userIDStr := unsafeGetString(userID) chunkID := unsafeGetString(ce.ChunkID) - chk, err := chunk.ParseExternalKey(userID, chunkID) + chk, err := chunk.ParseExternalKey(userIDStr, chunkID) if err != nil { return false, false, err } @@ -381,7 +435,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn } if len(chks) != 1 { - return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks)) + return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", ce.ChunkID, len(chks)) } newChunkData, err := 
chks[0].Data.Rebound(ce.From, ce.Through, func(ts time.Time, s string, structuredMetadata ...labels.Label) bool { @@ -394,7 +448,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn }) if err != nil { if errors.Is(err, chunk.ErrSliceNoDataInRange) { - level.Info(util_log.Logger).Log("msg", "Delete request filterFunc leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID)) + level.Info(util_log.Logger).Log("msg", "Delete request filterFunc leaves an empty chunk", "chunk ref", string(ce.ChunkID)) return false, true, nil } return false, false, err @@ -418,7 +472,7 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn } newChunk := chunk.NewChunk( - userID, chks[0].FingerprintModel(), chks[0].Metric, + userIDStr, chks[0].FingerprintModel(), chks[0].Metric, facade, newChunkStart, newChunkEnd, diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index cdc7ef61db..e14097f44b 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -259,10 +259,18 @@ func Test_EmptyTable(t *testing.T) { tables := store.indexTables() require.Len(t, tables, 1) + + // disabled retention should not do anything to the table + empty, modified, err := markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger) + require.NoError(t, err) + require.False(t, empty) + require.False(t, modified) + // Set a very low retention to make sure all chunks are marked for deletion which will create an empty table. 
- empty, _, err := markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: time.Second}, "2": {retentionPeriod: time.Second}}}), nil, util_log.Logger) + empty, modified, err = markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: time.Second}, "2": {retentionPeriod: time.Second}}}), nil, util_log.Logger) require.NoError(t, err) require.True(t, empty) + require.True(t, modified) _, _, err = markForDelete(context.Background(), 0, tables[0].name, &noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger) require.Equal(t, err, errNoChunksFound) @@ -575,7 +583,11 @@ func TestChunkRewriter(t *testing.T) { for _, indexTable := range indexTables { cr := newChunkRewriter(store.chunkClient, indexTable.name, indexTable) - wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), entryFromChunk(tt.chunk), ExtractIntervalFromTableName(indexTable.name), tt.filterFunc) + wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), []byte(tt.chunk.UserID), Chunk{ + ChunkID: []byte(getChunkID(tt.chunk.ChunkRef)), + From: tt.chunk.From, + Through: tt.chunk.Through, + }, ExtractIntervalFromTableName(indexTable.name), tt.filterFunc) require.NoError(t, err) require.Equal(t, tt.expectedRespByTables[indexTable.name].mustDeleteLines, linesDeleted) require.Equal(t, tt.expectedRespByTables[indexTable.name].mustRewriteChunk, wroteChunks) @@ -657,25 +669,26 @@ type chunkExpiry struct { type mockExpirationChecker struct { ExpirationChecker - chunksExpiry map[string]chunkExpiry - delay time.Duration - calls int - timedOut bool + chunksExpiry map[string]chunkExpiry + skipSeries map[string]bool + delay time.Duration + numExpiryChecks int + timedOut bool } -func newMockExpirationChecker(chunksExpiry 
map[string]chunkExpiry) *mockExpirationChecker { - return &mockExpirationChecker{chunksExpiry: chunksExpiry} +func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry, skipSeries map[string]bool) *mockExpirationChecker { + return &mockExpirationChecker{chunksExpiry: chunksExpiry, skipSeries: skipSeries} } -func (m *mockExpirationChecker) Expired(ref ChunkEntry, _ model.Time) (bool, filter.Func) { +func (m *mockExpirationChecker) Expired(_ []byte, chk Chunk, _ labels.Labels, _ []byte, _ string, _ model.Time) (bool, filter.Func) { time.Sleep(m.delay) - m.calls++ + m.numExpiryChecks++ - ce := m.chunksExpiry[string(ref.ChunkID)] + ce := m.chunksExpiry[string(chk.ChunkID)] return ce.isExpired, ce.filterFunc } -func (m *mockExpirationChecker) DropFromIndex(_ ChunkEntry, _ model.Time, _ model.Time) bool { +func (m *mockExpirationChecker) DropFromIndex(_ []byte, _ Chunk, _ labels.Labels, _ model.Time, _ model.Time) bool { return false } @@ -683,6 +696,10 @@ func (m *mockExpirationChecker) MarkPhaseTimedOut() { m.timedOut = true } +func (m *mockExpirationChecker) CanSkipSeries(_ []byte, lbls labels.Labels, _ []byte, _ model.Time, _ string, _ model.Time) bool { + return m.skipSeries[lbls.String()] +} + func TestMarkForDelete_SeriesCleanup(t *testing.T) { now := model.Now() schema := allSchemas[2] @@ -690,13 +707,15 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now)) for _, tc := range []struct { - name string - chunks []chunk.Chunk - expiry []chunkExpiry - expectedDeletedSeries []map[uint64]struct{} - expectedEmpty []bool - expectedModified []bool - numChunksDeleted []int64 + name string + chunks []chunk.Chunk + expiry []chunkExpiry + skipSeries map[string]bool + expectedDeletedSeries []map[uint64]struct{} + expectedEmpty []bool + expectedModified []bool + numChunksDeleted []int64 + numExpectedExpiryChecks int }{ { name: "no chunk and series deleted", @@ -720,6 +739,7 @@ 
func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 0, }, + numExpectedExpiryChecks: 1, }, { name: "chunk deleted with filter but no lines matching", @@ -746,6 +766,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 0, }, + numExpectedExpiryChecks: 1, }, { name: "only one chunk in store which gets deleted", @@ -769,6 +790,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, }, + numExpectedExpiryChecks: 1, }, { name: "only one chunk in store which gets partially deleted", @@ -800,6 +822,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, }, + numExpectedExpiryChecks: 1, }, { name: "one of two chunks deleted", @@ -827,6 +850,65 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, }, + numExpectedExpiryChecks: 2, + }, + { + name: "one of two series skipped", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)), + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)), + }, + skipSeries: map[string]bool{`{foo="1"}`: true}, + expiry: []chunkExpiry{ + { + isExpired: false, + }, + { + isExpired: true, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + {labels.Labels{labels.Label{Name: "foo", Value: "2"}}.Hash(): struct{}{}}, + }, + expectedEmpty: []bool{ + false, + }, + expectedModified: []bool{ + true, + }, + numChunksDeleted: []int64{ + 1, + }, + numExpectedExpiryChecks: 1, + }, + { + name: "all series skipped", + chunks: []chunk.Chunk{ + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)), + createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, 
todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)), + }, + skipSeries: map[string]bool{`{foo="1"}`: true, `{foo="2"}`: true}, + expiry: []chunkExpiry{ + { + isExpired: false, + }, + { + isExpired: false, + }, + }, + expectedDeletedSeries: []map[uint64]struct{}{ + nil, + }, + expectedEmpty: []bool{ + false, + }, + expectedModified: []bool{ + false, + }, + numChunksDeleted: []int64{ + 0, + }, + numExpectedExpiryChecks: 0, }, { name: "one of two chunks partially deleted", @@ -862,6 +944,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, }, + numExpectedExpiryChecks: 2, }, { name: "one big chunk partially deleted for yesterdays table without rewrite", @@ -888,6 +971,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, 0, }, + numExpectedExpiryChecks: 2, }, { name: "one big chunk partially deleted for yesterdays table with rewrite", @@ -914,6 +998,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { numChunksDeleted: []int64{ 1, 0, }, + numExpectedExpiryChecks: 2, }, } { t.Run(tc.name, func(t *testing.T) { @@ -925,7 +1010,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { chunksExpiry[getChunkID(chunk.ChunkRef)] = tc.expiry[i] } - expirationChecker := newMockExpirationChecker(chunksExpiry) + expirationChecker := newMockExpirationChecker(chunksExpiry, tc.skipSeries) store.Stop() @@ -945,6 +1030,8 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { require.EqualValues(t, tc.expectedDeletedSeries[i], seriesCleanRecorder.deletedSeries[userID]) } + + require.Equal(t, tc.numExpectedExpiryChecks, expirationChecker.numExpiryChecks) }) } } @@ -967,7 +1054,7 @@ func TestDeleteTimeout(t *testing.T) { require.NoError(t, store.Put(context.TODO(), chunks)) store.Stop() - expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{}) + expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{}, nil) expirationChecker.delay = 10 * time.Millisecond table := 
store.indexTables()[0] @@ -985,7 +1072,7 @@ func TestDeleteTimeout(t *testing.T) { require.NoError(t, err) require.False(t, empty) require.False(t, isModified) - require.Equal(t, tc.calls, expirationChecker.calls) + require.Equal(t, tc.calls, expirationChecker.numExpiryChecks) require.Equal(t, tc.timedOut, expirationChecker.timedOut) } } diff --git a/pkg/compactor/retention/series.go b/pkg/compactor/retention/series.go index e81d80a99e..b0d6b8d7f4 100644 --- a/pkg/compactor/retention/series.go +++ b/pkg/compactor/retention/series.go @@ -9,7 +9,7 @@ type userSeries struct { seriesIDLen int } -func newUserSeries(seriesID []byte, userID []byte) userSeries { +func newUserSeries(seriesID, userID []byte) userSeries { key := make([]byte, 0, len(seriesID)+len(userID)) key = append(key, seriesID...) key = append(key, userID...) @@ -31,16 +31,6 @@ func (us userSeries) UserID() []byte { return us.key[us.seriesIDLen:] } -func (us *userSeries) Reset(seriesID []byte, userID []byte) { - if us.key == nil { - us.key = make([]byte, 0, len(seriesID)+len(userID)) - } - us.key = us.key[:0] - us.key = append(us.key, seriesID...) - us.key = append(us.key, userID...) 
- us.seriesIDLen = len(seriesID) -} - type userSeriesInfo struct { userSeries isDeleted bool @@ -53,7 +43,7 @@ func newUserSeriesMap() userSeriesMap { return make(userSeriesMap) } -func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) { +func (u userSeriesMap) Add(seriesID, userID []byte, lbls labels.Labels) { us := newUserSeries(seriesID, userID) if _, ok := u[us.Key()]; ok { return @@ -67,7 +57,7 @@ func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) { } // MarkSeriesNotDeleted is used to mark series not deleted when it still has some chunks left in the store -func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) { +func (u userSeriesMap) MarkSeriesNotDeleted(seriesID, userID []byte) { us := newUserSeries(seriesID, userID) usi := u[us.Key()] usi.isDeleted = false diff --git a/pkg/compactor/retention/util_test.go b/pkg/compactor/retention/util_test.go index 3597a11565..f30713fc04 100644 --- a/pkg/compactor/retention/util_test.go +++ b/pkg/compactor/retention/util_test.go @@ -117,51 +117,50 @@ var ( sweepMetrics = newSweeperMetrics(prometheus.DefaultRegisterer) ) -func newChunkEntry(userID, labels string, from, through model.Time) ChunkEntry { +func mustParseLabels(labels string) labels.Labels { lbs, err := syntax.ParseLabels(labels) if err != nil { panic(err) } - return ChunkEntry{ - ChunkRef: ChunkRef{ - UserID: []byte(userID), - SeriesID: labelsSeriesID(lbs), - From: from, - Through: through, - }, - Labels: lbs, - } + + return lbs } type table struct { name string - chunks map[string][]chunk.Chunk + chunks map[string]map[string][]chunk.Chunk } -func (t *table) ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error { - for userID, chks := range t.chunks { - i := 0 - for j := 0; j < len(chks) && ctx.Err() == nil; j++ { - chk := chks[j] - deleteChunk, err := callback(entryFromChunk(chk)) - if err != nil { - return err +func (t *table) ForEachSeries(ctx context.Context, callback 
SeriesCallback) error { + for userID := range t.chunks { + for seriesID := range t.chunks[userID] { + chunks := make([]Chunk, 0, len(t.chunks[userID][seriesID])) + for _, chk := range t.chunks[userID][seriesID] { + chunks = append(chunks, Chunk{ + ChunkID: []byte(getChunkID(chk.ChunkRef)), + From: chk.From, + Through: chk.Through, + }) } - - if !deleteChunk { - t.chunks[userID][i] = chk - i++ + series := Series{} + series.Reset( + []byte(seriesID), + []byte(userID), + labels.NewBuilder(t.chunks[userID][seriesID][0].Metric).Del(labels.MetricName).Labels(), + ) + series.AppendChunks(chunks...) + if err := callback(series); err != nil { + return err } } - - t.chunks[userID] = t.chunks[userID][:i] } return ctx.Err() } func (t *table) IndexChunk(chunk chunk.Chunk) (bool, error) { - t.chunks[chunk.UserID] = append(t.chunks[chunk.UserID], chunk) + seriesID := string(labelsSeriesID(chunk.Metric)) + t.chunks[chunk.UserID][seriesID] = append(t.chunks[chunk.UserID][seriesID], chunk) return true, nil } @@ -169,19 +168,34 @@ func (t *table) CleanupSeries(_ []byte, _ labels.Labels) error { return nil } +func (t *table) RemoveChunk(_, _ model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error { + seriesID := string(labelsSeriesID(labels.NewBuilder(lbls).Set(labels.MetricName, "logs").Labels())) + for i, chk := range t.chunks[string(userID)][seriesID] { + if getChunkID(chk.ChunkRef) == string(chunkID) { + t.chunks[string(userID)][seriesID] = append(t.chunks[string(userID)][seriesID][:i], t.chunks[string(userID)][seriesID][i+1:]...) 
+ } + } + + return nil +} + func newTable(name string) *table { return &table{ name: name, - chunks: map[string][]chunk.Chunk{}, + chunks: map[string]map[string][]chunk.Chunk{}, } } func (t *table) Put(chk chunk.Chunk) { if _, ok := t.chunks[chk.UserID]; !ok { - t.chunks[chk.UserID] = []chunk.Chunk{} + t.chunks[chk.UserID] = make(map[string][]chunk.Chunk) + } + seriesID := string(labelsSeriesID(chk.Metric)) + if _, ok := t.chunks[chk.UserID][seriesID]; !ok { + t.chunks[chk.UserID][seriesID] = []chunk.Chunk{} } - t.chunks[chk.UserID] = append(t.chunks[chk.UserID], chk) + t.chunks[chk.UserID][seriesID] = append(t.chunks[chk.UserID][seriesID], chk) } func (t *table) GetChunks(userID string, from, through model.Time, metric labels.Labels) []chunk.Chunk { @@ -191,11 +205,13 @@ func (t *table) GetChunks(userID string, from, through model.Time, metric labels matchers = append(matchers, labels.MustNewMatcher(labels.MatchEqual, l.Name, l.Value)) } - for _, chk := range t.chunks[userID] { - if chk.From > through || chk.Through < from || !allMatch(matchers, chk.Metric) { - continue + for seriesID := range t.chunks[userID] { + for _, chk := range t.chunks[userID][seriesID] { + if chk.From > through || chk.Through < from || !allMatch(matchers, chk.Metric) { + continue + } + chunks = append(chunks, chk) } - chunks = append(chunks, chk) } return chunks @@ -311,19 +327,6 @@ func (t *testStore) GetChunks(userID string, from, through model.Time, metric la return fetchedChunk } -func entryFromChunk(c chunk.Chunk) ChunkEntry { - return ChunkEntry{ - ChunkRef: ChunkRef{ - UserID: []byte(c.UserID), - SeriesID: labelsSeriesID(c.Metric), - ChunkID: []byte(getChunkID(c.ChunkRef)), - From: c.From, - Through: c.Through, - }, - Labels: labels.NewBuilder(c.Metric).Del(labels.MetricName).Labels(), - } -} - func getChunkID(c logproto.ChunkRef) string { return schemaCfg.ExternalKey(c) } diff --git a/pkg/compactor/testutil.go b/pkg/compactor/testutil.go index feba141ba5..c60c98fd94 100644 --- 
a/pkg/compactor/testutil.go +++ b/pkg/compactor/testutil.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/klauspost/compress/gzip" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -159,7 +160,7 @@ func openCompactedIndex(path string) (*compactedIndex, error) { return &compactedIndex{indexFile: idxFile}, nil } -func (c compactedIndex) ForEachChunk(_ context.Context, _ retention.ChunkEntryCallback) error { +func (c compactedIndex) ForEachSeries(_ context.Context, _ retention.SeriesCallback) error { return nil } @@ -171,6 +172,10 @@ func (c compactedIndex) CleanupSeries(_ []byte, _ labels.Labels) error { return nil } +func (c compactedIndex) RemoveChunk(_, _ model.Time, _ []byte, _ labels.Labels, _ []byte) error { + return nil +} + func (c compactedIndex) Cleanup() { _ = c.indexFile.Close() } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go index cb73f9aa95..2ba43fe602 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "go.etcd.io/bbolt" @@ -136,7 +137,7 @@ func (c *CompactedIndex) setupIndexProcessors() error { return nil } -func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error { +func (c *CompactedIndex) ForEachSeries(ctx context.Context, callback retention.SeriesCallback) error { if err := c.setupIndexProcessors(); err != nil { return err } @@ -146,7 +147,7 @@ func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.Ch return fmt.Errorf("required boltdb bucket not found") } - return ForEachChunk(ctx, 
bucket, c.periodConfig, callback) + return ForEachSeries(ctx, bucket, c.periodConfig, callback) } func (c *CompactedIndex) IndexChunk(chunk chunk.Chunk) (bool, error) { @@ -165,6 +166,14 @@ func (c *CompactedIndex) CleanupSeries(userID []byte, lbls labels.Labels) error return c.seriesCleaner.CleanupSeries(userID, lbls) } +func (c *CompactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error { + if err := c.setupIndexProcessors(); err != nil { + return err + } + + return c.seriesCleaner.RemoveChunk(from, through, userID, labels, chunkID) +} + func (c *CompactedIndex) ToIndexFile() (shipperindex.Index, error) { if c.boltdbTx != nil { err := c.boltdbTx.Commit() diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go index 433835e0ff..36fedee719 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/compacted_index_test.go @@ -47,18 +47,23 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { // remove c1, c2 chunk and index c4 with same labels as c2 c4 := createChunk(t, chunkfmt, headfmt, "2", labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "fizz", Value: "buzz"}}, tt.from, tt.from.Add(30*time.Minute)) - err = compactedIndex.ForEachChunk(context.Background(), func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - if entry.Labels.Get("fizz") == "buzz" { + err = compactedIndex.ForEachSeries(context.Background(), func(series retention.Series) (err error) { + if series.Labels().Get("fizz") == "buzz" { chunkIndexed, err := compactedIndex.IndexChunk(c4) require.NoError(t, err) require.True(t, chunkIndexed) } - return entry.Labels.Get("foo") == "bar", nil + if series.Labels().Get("foo") == "bar" { + for _, chk := range series.Chunks() { + require.NoError(t, 
compactedIndex.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID)) + } + } + return nil }) require.NoError(t, err) // remove series for c1 since all its chunks are deleted - err = compactedIndex.CleanupSeries(entryFromChunk(testSchema, c1).UserID, c1.Metric) + err = compactedIndex.CleanupSeries([]byte(c1.UserID), c1.Metric) require.NoError(t, err) indexFile, err := compactedIndex.ToIndexFile() @@ -74,7 +79,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { err = modifiedBoltDB.View(func(tx *bbolt.Tx) error { return tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error { - c1SeriesID := entryFromChunk(testSchema, c1).SeriesID + c1SeriesID := labelsSeriesID(c1.Metric) series, ok, err := parseLabelIndexSeriesID(decodeKey(k)) if !ok { return nil @@ -92,15 +97,15 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { }) require.NoError(t, err) - expectedChunkEntries := []retention.ChunkEntry{ - entryFromChunk(testSchema, c3), - entryFromChunk(testSchema, c4), + expectedChunkEntries := []retention.Chunk{ + retentionChunkFromChunk(testSchema, c3), + retentionChunkFromChunk(testSchema, c4), } - chunkEntriesFound := []retention.ChunkEntry{} + var chunkEntriesFound []retention.Chunk err = modifiedBoltDB.View(func(tx *bbolt.Tx) error { - return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - chunkEntriesFound = append(chunkEntriesFound, entry) - return false, nil + return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) { + chunkEntriesFound = append(chunkEntriesFound, series.Chunks()...) 
+ return nil }) }) require.NoError(t, err) diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/index.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/index.go index 73e87e06e1..3e4826cc09 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/index.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/index.go @@ -7,8 +7,6 @@ import ( "strconv" "github.com/prometheus/common/model" - - "github.com/grafana/loki/v3/pkg/compactor/retention" ) const ( @@ -39,36 +37,44 @@ func (e InvalidIndexKeyError) Is(target error) bool { return target == ErrInvalidIndexKey } -func parseChunkRef(hashKey, rangeKey []byte) (retention.ChunkRef, bool, error) { +type chunkRef struct { + UserID []byte + SeriesID []byte + ChunkID []byte + From model.Time + Through model.Time +} + +func parseChunkRef(hashKey, rangeKey []byte) (chunkRef, bool, error) { componentsRef := getComponents() defer putComponents(componentsRef) components := componentsRef.components components = decodeRangeKey(rangeKey, components) if len(components) == 0 { - return retention.ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + return chunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) } keyType := components[len(components)-1] if len(keyType) == 0 || keyType[0] != chunkTimeRangeKeyV3 { - return retention.ChunkRef{}, false, nil + return chunkRef{}, false, nil } chunkID := components[len(components)-2] userID, hexFrom, hexThrough, ok := parseChunkID(chunkID) if !ok { - return retention.ChunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) + return chunkRef{}, false, newInvalidIndexKeyError(hashKey, rangeKey) } from, err := strconv.ParseInt(unsafeGetString(hexFrom), 16, 64) if err != nil { - return retention.ChunkRef{}, false, err + return chunkRef{}, false, err } through, err := strconv.ParseInt(unsafeGetString(hexThrough), 16, 64) if err != nil { - return retention.ChunkRef{}, false, err + return chunkRef{}, false, err } - return 
retention.ChunkRef{ + return chunkRef{ UserID: userID, SeriesID: seriesFromHash(hashKey), From: model.Time(from), diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go index 7b2422fdc1..f504a3418a 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator.go @@ -1,6 +1,7 @@ package compactor import ( + "bytes" "context" "fmt" @@ -19,17 +20,17 @@ const ( ) var ( - _ retention.SeriesCleaner = &seriesCleaner{} + _ retention.IndexCleaner = &seriesCleaner{} ) -func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error { +func ForEachSeries(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.SeriesCallback) error { labelsMapper, err := newSeriesLabelsMapper(bucket, config) if err != nil { return err } cursor := bucket.Cursor() - var current retention.ChunkEntry + var current retention.Series for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() { ref, ok, err := parseChunkRef(decodeKey(key)) @@ -40,17 +41,32 @@ func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.Perio if !ok { continue } - current.ChunkRef = ref - current.Labels = labelsMapper.Get(ref.SeriesID, ref.UserID) - deleteChunk, err := callback(current) - if err != nil { - return err - } - if deleteChunk { - if err := cursor.Delete(); err != nil { + if len(current.Chunks()) == 0 { + current.Reset(ref.SeriesID, ref.UserID, labelsMapper.Get(ref.SeriesID, ref.UserID)) + } else if bytes.Compare(current.UserID(), ref.UserID) != 0 || bytes.Compare(current.SeriesID(), ref.SeriesID) != 0 { + err = callback(current) + if err != nil { return err } + + current.Reset(ref.SeriesID, ref.UserID, labelsMapper.Get(ref.SeriesID, ref.UserID)) + } + + 
current.AppendChunks(retention.Chunk{ + ChunkID: ref.ChunkID, + From: ref.From, + Through: ref.Through, + }) + } + if ctx.Err() != nil { + return ctx.Err() + } + + if len(current.Chunks()) != 0 { + err = callback(current) + if err != nil { + return err } } @@ -117,3 +133,32 @@ func (s *seriesCleaner) CleanupSeries(userID []byte, lbls labels.Labels) error { return nil } + +func (s *seriesCleaner) RemoveChunk(from, through model.Time, userID []byte, lbls labels.Labels, chunkID []byte) error { + // We need to add metric name label as well if it is missing since the series ids are calculated including that. + if lbls.Get(labels.MetricName) == "" { + lbls = append(lbls, labels.Label{ + Name: labels.MetricName, + Value: logMetricName, + }) + } + + indexEntries, err := s.schema.GetChunkWriteEntries(from, through, string(userID), logMetricName, lbls, string(chunkID)) + if err != nil { + return err + } + + for _, indexEntry := range indexEntries { + key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue)) + key = append(key, []byte(indexEntry.HashValue)...) + key = append(key, []byte(separator)...) + key = append(key, indexEntry.RangeValue...) 
+ + err := s.bucket.Delete(key) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go index c4acfd33b6..d97616af0f 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/iterator_test.go @@ -42,30 +42,34 @@ func Test_ChunkIterator(t *testing.T) { tables := store.indexTables() require.Len(t, tables, 1) - var actual []retention.ChunkEntry + var actual []retention.Chunk err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - actual = append(actual, entry) - return len(actual) == 2, nil + seriesCleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name) + return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) { + actual = append(actual, series.Chunks()...) 
+ if string(series.UserID()) == c2.UserID { + return seriesCleaner.RemoveChunk(actual[1].From, actual[1].Through, series.UserID(), series.Labels(), actual[1].ChunkID) + } + return nil }) }) require.NoError(t, err) - require.Equal(t, []retention.ChunkEntry{ - entryFromChunk(store.schemaCfg, c1), - entryFromChunk(store.schemaCfg, c2), + require.Equal(t, []retention.Chunk{ + retentionChunkFromChunk(store.schemaCfg, c1), + retentionChunkFromChunk(store.schemaCfg, c2), }, actual) // second pass we delete c2 actual = actual[:0] err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - actual = append(actual, entry) - return false, nil + return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) { + actual = append(actual, series.Chunks()...) + return nil }) }) require.NoError(t, err) - require.Equal(t, []retention.ChunkEntry{ - entryFromChunk(store.schemaCfg, c1), + require.Equal(t, []retention.Chunk{ + retentionChunkFromChunk(store.schemaCfg, c1), }, actual) }) } @@ -92,12 +96,12 @@ func Test_ChunkIteratorContextCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var actual []retention.ChunkEntry + var actual []retention.Chunk err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - actual = append(actual, entry) + return ForEachSeries(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(series retention.Series) (err error) { + actual = append(actual, series.Chunks()...) 
cancel() - return len(actual) == 2, nil + return nil }) }) @@ -110,7 +114,6 @@ func Test_SeriesCleaner(t *testing.T) { t.Run(tt.schema, func(t *testing.T) { cm := storage.NewClientMetrics() defer cm.Unregister() - testSchema := config.SchemaConfig{Configs: []config.PeriodConfig{tt.config}} store := newTestStore(t, cm) chunkfmt, headfmt, err := tt.config.ChunkFormat() require.NoError(t, err) @@ -129,27 +132,33 @@ func Test_SeriesCleaner(t *testing.T) { require.Len(t, tables, 1) // remove c1, c2 chunk err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - return entry.Labels.Get("bar") == "foo", nil + seriesCleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name) + return ForEachSeries(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(series retention.Series) (err error) { + if series.Labels().Get("bar") == "foo" { + for _, chk := range series.Chunks() { + require.NoError(t, seriesCleaner.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID)) + } + } + return nil }) }) require.NoError(t, err) err = tables[0].DB.Update(func(tx *bbolt.Tx) error { cleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name) - if err := cleaner.CleanupSeries(entryFromChunk(testSchema, c2).UserID, c2.Metric); err != nil { + if err := cleaner.CleanupSeries([]byte(c2.UserID), c2.Metric); err != nil { return err } // remove series for c1 without __name__ label, which should work just fine - return cleaner.CleanupSeries(entryFromChunk(testSchema, c1).UserID, labels.NewBuilder(c1.Metric).Del(labels.MetricName).Labels()) + return cleaner.CleanupSeries([]byte(c1.UserID), labels.NewBuilder(c1.Metric).Del(labels.MetricName).Labels()) }) require.NoError(t, err) err = tables[0].DB.View(func(tx *bbolt.Tx) error { return 
tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error { - c1SeriesID := entryFromChunk(testSchema, c1).SeriesID - c2SeriesID := entryFromChunk(testSchema, c2).SeriesID + c1SeriesID := labelsSeriesID(c1.Metric) + c2SeriesID := labelsSeriesID(c2.Metric) series, ok, err := parseLabelIndexSeriesID(decodeKey(k)) if !ok { return nil @@ -215,21 +224,14 @@ func labelsString(ls labels.Labels) string { return b.String() } -func entryFromChunk(s config.SchemaConfig, c chunk.Chunk) retention.ChunkEntry { - return retention.ChunkEntry{ - ChunkRef: retention.ChunkRef{ - UserID: []byte(c.UserID), - SeriesID: labelsSeriesID(c.Metric), - ChunkID: []byte(s.ExternalKey(c.ChunkRef)), - From: c.From, - Through: c.Through, - }, - Labels: labels.NewBuilder(c.Metric).Del(labels.MetricName).Labels(), +func retentionChunkFromChunk(s config.SchemaConfig, c chunk.Chunk) retention.Chunk { + return retention.Chunk{ + ChunkID: []byte(s.ExternalKey(c.ChunkRef)), + From: c.From, + Through: c.Through, } } -var chunkEntry retention.ChunkEntry - func Benchmark_ChunkIterator(b *testing.B) { cm := storage.NewClientMetrics() defer cm.Unregister() @@ -249,14 +251,13 @@ func Benchmark_ChunkIterator(b *testing.B) { b.ReportAllocs() b.ResetTimer() - var total int64 + var total int _ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(local.IndexBucketName) for n := 0; n < b.N; n++ { - err := ForEachChunk(context.Background(), bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { - chunkEntry = entry - total++ - return true, nil + err := ForEachSeries(context.Background(), bucket, allSchemas[0].config, func(series retention.Series) (err error) { + total += len(series.Chunks()) + return nil }) require.NoError(b, err) } diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/series.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/series.go index 2e53a37b44..4e51bdf762 100644 --- 
a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/series.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/series.go @@ -22,15 +22,15 @@ func newUserSeries(seriesID []byte, userID []byte) userSeries { } } -func (us userSeries) Key() string { +func (us *userSeries) Key() string { return unsafeGetString(us.key) } -func (us userSeries) SeriesID() []byte { +func (us *userSeries) SeriesID() []byte { return us.key[:us.seriesIDLen] } -func (us userSeries) UserID() []byte { +func (us *userSeries) UserID() []byte { return us.key[us.seriesIDLen:] } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go index ee9784a02d..eafdb4b60e 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor.go @@ -288,24 +288,23 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string } } -// ForEachChunk iterates over all the chunks in the builder and calls the callback function. -func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error { +// ForEachSeries iterates over all the chunks in the builder and calls the callback function. 
+func (c *compactedIndex) ForEachSeries(ctx context.Context, callback retention.SeriesCallback) error { schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{c.periodConfig}, } - chunkEntry := retention.ChunkEntry{ - ChunkRef: retention.ChunkRef{ - UserID: getUnsafeBytes(c.userID), - }, - } logprotoChunkRef := logproto.ChunkRef{ UserID: c.userID, } + var series retention.Series for seriesID, stream := range c.builder.streams { + series.Reset( + getUnsafeBytes(seriesID), + getUnsafeBytes(c.userID), + withoutTenantLabel(stream.labels), + ) logprotoChunkRef.Fingerprint = uint64(stream.fp) - chunkEntry.SeriesID = getUnsafeBytes(seriesID) - chunkEntry.Labels = withoutTenantLabel(stream.labels) for i := 0; i < len(stream.chunks) && ctx.Err() == nil; i++ { chk := stream.chunks[i] @@ -313,19 +312,19 @@ func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.Ch logprotoChunkRef.Through = chk.Through() logprotoChunkRef.Checksum = chk.Checksum - chunkEntry.ChunkID = getUnsafeBytes(schemaCfg.ExternalKey(logprotoChunkRef)) - chunkEntry.From = logprotoChunkRef.From - chunkEntry.Through = logprotoChunkRef.Through - - deleteChunk, err := callback(chunkEntry) - if err != nil { - return err - } + series.AppendChunks(retention.Chunk{ + ChunkID: getUnsafeBytes(schemaCfg.ExternalKey(logprotoChunkRef)), + From: logprotoChunkRef.From, + Through: logprotoChunkRef.Through, + }) + } + if ctx.Err() != nil { + return ctx.Err() + } - if deleteChunk { - // add the chunk to the list of chunks to delete which would be taken care of while building the index. 
- c.deleteChunks[seriesID] = append(c.deleteChunks[seriesID], chk) - } + err := callback(series) + if err != nil { + return err } } @@ -368,6 +367,22 @@ func (c *compactedIndex) CleanupSeries(_ []byte, lbls labels.Labels) error { return nil } +func (c *compactedIndex) RemoveChunk(from, through model.Time, userID []byte, labels labels.Labels, chunkID []byte) error { + chk, err := chunk.ParseExternalKey(string(userID), string(chunkID)) + if err != nil { + return err + } + + seriesID := labels.String() + c.deleteChunks[seriesID] = append(c.deleteChunks[seriesID], tsdbindex.ChunkMeta{ + Checksum: chk.Checksum, + MinTime: int64(from), + MaxTime: int64(through), + }) + + return nil +} + func (c *compactedIndex) Cleanup() {} // ToIndexFile creates an indexFile from the chunksmetas stored in the builder. diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go index be0a343309..f5f8ff27e9 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go @@ -626,18 +626,13 @@ func TestCompactor_Compact(t *testing.T) { } } -func chunkMetasToChunkEntry(schemaCfg config.SchemaConfig, userID string, lbls labels.Labels, chunkMetas index.ChunkMetas) []retention.ChunkEntry { - chunkEntries := make([]retention.ChunkEntry, 0, len(chunkMetas)) +func chunkMetasToRetentionChunk(schemaCfg config.SchemaConfig, userID string, lbls labels.Labels, chunkMetas index.ChunkMetas) []retention.Chunk { + chunkEntries := make([]retention.Chunk, 0, len(chunkMetas)) for _, chunkMeta := range chunkMetas { - chunkEntries = append(chunkEntries, retention.ChunkEntry{ - ChunkRef: retention.ChunkRef{ - UserID: []byte(userID), - SeriesID: []byte(lbls.String()), - ChunkID: []byte(schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls))), - From: chunkMeta.From(), - Through: chunkMeta.Through(), - }, - Labels: lbls, + chunkEntries = 
append(chunkEntries, retention.Chunk{ + ChunkID: []byte(schemaCfg.ExternalKey(chunkMetaToChunkRef(userID, chunkMeta, lbls))), + From: chunkMeta.From(), + Through: chunkMeta.Through(), }) } @@ -658,35 +653,58 @@ func TestCompactedIndex(t *testing.T) { testCtx := setupCompactedIndex(t) for name, tc := range map[string]struct { - deleteChunks map[string]index.ChunkMetas + deleteChunks map[string][]retention.Chunk addChunks []chunk.Chunk deleteSeries []labels.Labels shouldErr bool - finalExpectedChunks map[string]index.ChunkMetas + finalExpectedChunks map[string][]retention.Chunk }{ "no changes": { - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10))), + testCtx.lbls2.String(): chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20))), }, }, "delete some chunks from a stream": { - deleteChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))...), + deleteChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + append( + buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), + buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))..., + ), + ), }, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), 
testCtx.shiftTableStart(2)), append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)), buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))...)...), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + append( + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), + append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)), + buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))..., + )..., + ), + ), + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "delete all chunks from a stream": { - deleteChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)), + deleteChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)), + ), }, deleteSeries: []labels.Labels{testCtx.lbls1}, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "add some chunks to a stream": { @@ -702,9 +720,15 @@ func TestCompactedIndex(t *testing.T) { Data: dummyChunkData{}, }, }, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): 
buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), + ), + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "__name__ label should get dropped while indexing chunks": { @@ -720,9 +744,15 @@ func TestCompactedIndex(t *testing.T) { Data: dummyChunkData{}, }, }, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), + ), + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "add some chunks out of table interval to a stream": { @@ -749,9 +779,15 @@ func TestCompactedIndex(t *testing.T) { Data: dummyChunkData{}, }, }, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, 
testCtx.lbls1, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), + ), + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "add and delete some chunks in a stream": { @@ -767,12 +803,24 @@ func TestCompactedIndex(t *testing.T) { Data: dummyChunkData{}, }, }, - deleteChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), + deleteChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), + ), }, - finalExpectedChunks: map[string]index.ChunkMetas{ - testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))...), - testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + finalExpectedChunks: map[string][]retention.Chunk{ + testCtx.lbls1.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls1, + append( + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), + buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))..., + ), + ), + testCtx.lbls2.String(): chunkMetasToRetentionChunk( + testCtx.schemaCfg, testCtx.userID, testCtx.lbls2, + buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), + ), }, }, "adding chunk to non-existing stream should error": { @@ -789,19 +837,17 @@ func TestCompactedIndex(t *testing.T) { t.Run(name, func(t *testing.T) { compactedIndex := testCtx.buildCompactedIndex() - foundChunkEntries := map[string][]retention.ChunkEntry{} - err := compactedIndex.ForEachChunk(context.Background(), func(chunkEntry 
retention.ChunkEntry) (deleteChunk bool, err error) { - seriesIDStr := string(chunkEntry.SeriesID) - foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], chunkEntry) - if chks, ok := tc.deleteChunks[string(chunkEntry.SeriesID)]; ok { + foundChunkEntries := map[string][]retention.Chunk{} + err := compactedIndex.ForEachSeries(context.Background(), func(series retention.Series) error { + seriesIDStr := string(series.SeriesID()) + foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], series.Chunks()...) + if chks, ok := tc.deleteChunks[string(series.SeriesID())]; ok { for _, chk := range chks { - if chk.MinTime == int64(chunkEntry.From) && chk.MaxTime == int64(chunkEntry.Through) { - return true, nil - } + require.NoError(t, compactedIndex.RemoveChunk(chk.From, chk.Through, series.UserID(), series.Labels(), chk.ChunkID)) } } - return false, nil + return nil }) require.NoError(t, err) @@ -823,9 +869,9 @@ func TestCompactedIndex(t *testing.T) { } require.NoError(t, err) - foundChunks := map[string]index.ChunkMetas{} + foundChunks := map[string][]retention.Chunk{} err = indexFile.(*TSDBFile).Index.(*TSDBIndex).ForSeries(context.Background(), "", nil, 0, math.MaxInt64, func(lbls labels.Labels, _ model.Fingerprint, chks []index.ChunkMeta) (stop bool) { - foundChunks[lbls.String()] = append(index.ChunkMetas{}, chks...) 
+ foundChunks[lbls.String()] = chunkMetasToRetentionChunk(testCtx.schemaCfg, testCtx.userID, lbls, chks) return false }, labels.MustNewMatcher(labels.MatchEqual, "", "")) require.NoError(t, err) @@ -843,11 +889,8 @@ func TestIteratorContextCancelation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - var foundChunkEntries []retention.ChunkEntry - err := compactedIndex.ForEachChunk(ctx, func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) { - foundChunkEntries = append(foundChunkEntries, chunkEntry) - - return false, nil + err := compactedIndex.ForEachSeries(ctx, func(_ retention.Series) error { + return nil }) require.ErrorIs(t, err, context.Canceled) @@ -860,7 +903,8 @@ type testContext struct { tableInterval model.Interval shiftTableStart func(ms int64) int64 buildCompactedIndex func() *compactedIndex - expectedChunkEntries map[string][]retention.ChunkEntry + expectedChunkEntries map[string][]retention.Chunk + schemaCfg config.SchemaConfig } func setupCompactedIndex(t *testing.T) *testContext { @@ -903,12 +947,12 @@ func setupCompactedIndex(t *testing.T) *testContext { return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder) } - expectedChunkEntries := map[string][]retention.ChunkEntry{ - lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))), - lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))), + expectedChunkEntries := map[string][]retention.Chunk{ + lbls1.String(): chunkMetasToRetentionChunk(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))), + lbls2.String(): chunkMetasToRetentionChunk(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))), } - return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, 
expectedChunkEntries} + return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, expectedChunkEntries, schemaCfg} } type dummyChunkData struct { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk.go index 8094e19af0..b33c019fb6 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk.go @@ -151,7 +151,7 @@ func (c ChunkMetas) Drop(chk ChunkMeta) (ChunkMetas, bool) { return ichk.Checksum >= chk.Checksum }) - if j >= len(c) || c[j] != chk { + if j >= len(c) || c[j].Checksum != chk.Checksum || c[j].MinTime != chk.MinTime || c[j].MaxTime != chk.MaxTime { return c, false } diff --git a/pkg/tool/audit/audit.go b/pkg/tool/audit/audit.go index 017a3a9e93..b155f169a1 100644 --- a/pkg/tool/audit/audit.go +++ b/pkg/tool/audit/audit.go @@ -101,20 +101,22 @@ func ValidateCompactedIndex(ctx context.Context, objClient client.ObjectClient, g, ctx := errgroup.WithContext(ctx) g.SetLimit(parallelism) - compactedIdx.ForEachChunk(ctx, func(ce retention.ChunkEntry) (deleteChunk bool, err error) { //nolint:errcheck + compactedIdx.ForEachSeries(ctx, func(s retention.Series) (err error) { //nolint:errcheck bar.Add(1) // nolint:errcheck g.Go(func() error { - exists, err := CheckChunkExistance(string(ce.ChunkID), objClient) - if err != nil || !exists { - missingChunks.Add(1) - logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(ce.ChunkID)) - return nil + for _, c := range s.Chunks() { + exists, err := CheckChunkExistance(string(c.ChunkID), objClient) + if err != nil || !exists { + missingChunks.Add(1) + logger.Log("msg", "chunk is missing", "err", err, "chunk_id", string(c.ChunkID)) + return nil + } + foundChunks.Add(1) } - foundChunks.Add(1) return nil }) - return false, nil + return nil }) g.Wait() // nolint:errcheck diff --git a/pkg/tool/audit/audit_test.go 
b/pkg/tool/audit/audit_test.go index 4e20b075be..b8cfb689f5 100644 --- a/pkg/tool/audit/audit_test.go +++ b/pkg/tool/audit/audit_test.go @@ -44,28 +44,25 @@ func (t testObjClient) GetAttributes(_ context.Context, object string) (client.O type testCompactedIdx struct { compactor.CompactedIndex - chunks []retention.ChunkEntry + chunks []retention.Chunk } -func (t testCompactedIdx) ForEachChunk(_ context.Context, f retention.ChunkEntryCallback) error { - for _, chunk := range t.chunks { - if _, err := f(chunk); err != nil { - return err - } - } - return nil +func (t testCompactedIdx) ForEachSeries(_ context.Context, f retention.SeriesCallback) error { + var series retention.Series + series.AppendChunks(t.chunks...) + return f(series) } func TestAuditIndex(t *testing.T) { ctx := context.Background() objClient := testObjClient{} compactedIdx := testCompactedIdx{ - chunks: []retention.ChunkEntry{ - {ChunkRef: retention.ChunkRef{ChunkID: []byte("found-1")}}, - {ChunkRef: retention.ChunkRef{ChunkID: []byte("found-2")}}, - {ChunkRef: retention.ChunkRef{ChunkID: []byte("found-3")}}, - {ChunkRef: retention.ChunkRef{ChunkID: []byte("found-4")}}, - {ChunkRef: retention.ChunkRef{ChunkID: []byte("missing-1")}}, + chunks: []retention.Chunk{ + {ChunkID: []byte("found-1")}, + {ChunkID: []byte("found-2")}, + {ChunkID: []byte("found-3")}, + {ChunkID: []byte("found-4")}, + {ChunkID: []byte("missing-1")}, }, } logger := log.NewNopLogger() diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go index 9f35b53fe4..0d06908ad1 100644 --- a/tools/tsdb/tsdb-map/main.go +++ b/tools/tsdb/tsdb-map/main.go @@ -78,15 +78,19 @@ func main() { // loads everything into memory. 
if err := db.View(func(t *bbolt.Tx) error { - return boltdbcompactor.ForEachChunk(context.Background(), t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) { - builder.AddSeries(entry.Labels, model.Fingerprint(entry.Labels.Hash()), []index.ChunkMeta{{ - Checksum: extractChecksumFromChunkID(entry.ChunkID), - MinTime: int64(entry.From), - MaxTime: int64(entry.Through), - KB: ((3 << 20) / 4) / 1024, // guess: 0.75mb, 1/2 of the max size, rounded to KB - Entries: 10000, // guess: 10k entries - }}) - return false, nil + return boltdbcompactor.ForEachSeries(context.Background(), t.Bucket([]byte("index")), periodConfig, func(s retention.Series) error { + chunkMetas := make([]index.ChunkMeta, 0, len(s.Chunks())) + for _, chunk := range s.Chunks() { + chunkMetas = append(chunkMetas, index.ChunkMeta{ + Checksum: extractChecksumFromChunkID(chunk.ChunkID), + MinTime: int64(chunk.From), + MaxTime: int64(chunk.Through), + KB: ((3 << 20) / 4) / 1024, // guess: 0.75mb, 1/2 of the max size, rounded to KB + Entries: 10000, // guess: 10k entries + }) + } + builder.AddSeries(s.Labels(), model.Fingerprint(s.Labels().Hash()), chunkMetas) + return nil }) }); err != nil { panic(err)