From 3170edf3e3cf9cc354c41cf01556eb6024016d4f Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 11 Aug 2022 11:13:29 -0600 Subject: [PATCH] Add timeouts to deletions (#6879) * Add context to chunk iteration * Configure timeouts * add expiration timeout * update test * update test * Clarify timeout implications, log more specifically --- docs/sources/configuration/_index.md | 13 ++ .../indexshipper/compactor/compactor.go | 9 +- .../deletion/delete_requests_manager.go | 9 + .../compactor/deletion/metrics.go | 6 + .../indexshipper/compactor/index_set.go | 4 + .../compactor/retention/expiration.go | 3 + .../compactor/retention/retention.go | 46 ++++- .../compactor/retention/retention_test.go | 72 +++++-- .../compactor/retention/util_test.go | 7 +- .../stores/indexshipper/compactor/testutil.go | 2 +- .../index/compactor/compacted_index.go | 5 +- .../index/compactor/compacted_index_test.go | 4 +- .../shipper/index/compactor/iterator.go | 7 +- .../shipper/index/compactor/iterator_test.go | 39 +++- pkg/storage/stores/tsdb/compactor.go | 7 +- pkg/storage/stores/tsdb/compactor_test.go | 185 +++++++++++------- tools/tsdb/tsdb-map/main.go | 2 +- 17 files changed, 305 insertions(+), 115 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 423aa256bc..7908b82ed9 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2105,6 +2105,19 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period [delete_request_cancel_period: | default = 24h] +# The max number of delete requests to run per compaction cycle. +# CLI flag: -boltdb.shipper.compactor.delete-batch-size +[delete_batch_size: | default = 70] + +# The maximum amount of time to spend running retention and deletion +# on any given table in the index. 0 is no timeout +# +# NOTE: This timeout prioritizes runtime over completeness of retention/deletion. +# It may take several compaction runs to fully perform retention and process +# all outstanding delete requests +# CLI flag: -boltdb.shipper.compactor.retention-table-timeout +[retention_table_timeout: | default = 0] + # Maximum number of tables to compact in parallel. # While increasing this value, please make sure compactor has enough disk space # allocated to be able to store and compact as many tables. diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index 145dfe3740..5d2257f71f 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -78,6 +78,7 @@ type Config struct { RetentionEnabled bool `yaml:"retention_enabled"` RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` + RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"` DeleteBatchSize int `yaml:"delete_batch_size"` DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` @@ -97,6 +98,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.IntVar(&cfg.DeleteBatchSize, "boltdb.shipper.compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") + f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)") cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) @@ -233,7 +235,7 @@ func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.S return err } - c.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, chunkClient, r) + c.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, c.cfg.RetentionTableTimeout, chunkClient, r) if err != nil { return err } @@ -635,6 +637,11 @@ func (e *expirationChecker) MarkPhaseFinished() { e.deletionExpiryChecker.MarkPhaseFinished() } +func (e *expirationChecker) MarkPhaseTimedOut() { + e.retentionExpiryChecker.MarkPhaseTimedOut() + e.deletionExpiryChecker.MarkPhaseTimedOut() +} + func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go index 6852e15bf9..c773bf0502 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go @@ -289,6 +289,15 @@ func (d *DeleteRequestsManager) MarkPhaseFailed() { d.deleteRequestsToProcessMtx.Lock() defer d.deleteRequestsToProcessMtx.Unlock() + d.metrics.deletionFailures.WithLabelValues("error").Inc() + d.deleteRequestsToProcess = map[string]*userDeleteRequests{} +} + +func (d *DeleteRequestsManager) MarkPhaseTimedOut() { + d.deleteRequestsToProcessMtx.Lock() + defer d.deleteRequestsToProcessMtx.Unlock() + + d.metrics.deletionFailures.WithLabelValues("timeout").Inc() d.deleteRequestsToProcess = map[string]*userDeleteRequests{} } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go index 6c7706e2f7..302b145164 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/metrics.go @@ -48,6 +48,7 @@ type deleteRequestsManagerMetrics struct { deleteRequestsProcessedTotal *prometheus.CounterVec deleteRequestsChunksSelectedTotal *prometheus.CounterVec loadPendingRequestsAttemptsTotal *prometheus.CounterVec + deletionFailures *prometheus.CounterVec oldestPendingDeleteRequestAgeSeconds prometheus.Gauge pendingDeleteRequestsCount prometheus.Gauge deletedLinesTotal *prometheus.CounterVec @@ -66,6 +67,11 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan Name: "compactor_delete_requests_chunks_selected_total", Help: "Number of chunks selected while building delete plans per user", }, []string{"user"}) + m.deletionFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "compactor_delete_processing_fails_total", + Help: "Number times the delete phase of compaction has failed", + }, []string{"cause"}) m.loadPendingRequestsAttemptsTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki", Name: "compactor_load_pending_requests_attempts_total", diff --git a/pkg/storage/stores/indexshipper/compactor/index_set.go b/pkg/storage/stores/indexshipper/compactor/index_set.go index 99b3ed78d8..2224afa296 100644 --- a/pkg/storage/stores/indexshipper/compactor/index_set.go +++ b/pkg/storage/stores/indexshipper/compactor/index_set.go @@ -97,6 +97,10 @@ func newIndexSet(ctx context.Context, tableName, userID string, baseIndexSet sto logger: logger, } + if userID != "" { + ui.logger = log.With(logger, "user-id", userID) + } + var err error ui.sourceObjects, err = ui.baseIndexSet.ListFiles(ui.ctx, ui.tableName, ui.userID, false) if err != nil { diff --git a/pkg/storage/stores/indexshipper/compactor/retention/expiration.go b/pkg/storage/stores/indexshipper/compactor/retention/expiration.go index 8f9617b037..f6e87637a5 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/expiration.go @@ -26,6 +26,7 @@ type ExpirationChecker interface { IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool MarkPhaseStarted() MarkPhaseFailed() + MarkPhaseTimedOut() MarkPhaseFinished() DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool } @@ -71,6 +72,7 @@ func (e *expirationChecker) MarkPhaseStarted() { } func (e *expirationChecker) MarkPhaseFailed() {} +func (e *expirationChecker) MarkPhaseTimedOut() {} func (e *expirationChecker) MarkPhaseFinished() {} func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool { @@ -105,6 +107,7 @@ func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval m } func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {} func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {} +func (e *neverExpiringExpirationChecker) MarkPhaseTimedOut() {} func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {} func (e *neverExpiringExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { return false diff --git a/pkg/storage/stores/indexshipper/compactor/retention/retention.go b/pkg/storage/stores/indexshipper/compactor/retention/retention.go index 7b2a08c100..e83128fc0c 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/retention.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/retention.go @@ -45,7 +45,7 @@ type ChunkEntry struct { type ChunkEntryCallback func(ChunkEntry) (deleteChunk bool, err error) type ChunkIterator interface { - ForEachChunk(callback ChunkEntryCallback) error + ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error } type SeriesCleaner interface { @@ -82,15 +82,17 @@ type Marker struct { expiration ExpirationChecker markerMetrics *markerMetrics chunkClient client.Client + markTimeout time.Duration } -func NewMarker(workingDirectory string, expiration ExpirationChecker, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) { +func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) { metrics := newMarkerMetrics(r) return &Marker{ workingDirectory: workingDirectory, expiration: expiration, markerMetrics: metrics, chunkClient: chunkClient, + markTimeout: markTimeout, }, nil } @@ -104,7 +106,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, in }() level.Debug(logger).Log("msg", "starting to process table") - empty, modified, err := t.markTable(ctx, tableName, userID, indexProcessor) + empty, modified, err := t.markTable(ctx, tableName, userID, indexProcessor, logger) if err != nil { status = statusFailure return false, false, err @@ -112,7 +114,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, in return empty, modified, nil } -func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexProcessor IndexProcessor) (bool, bool, error) { +func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexProcessor IndexProcessor, logger log.Logger) (bool, bool, error) { markerWriter, err := NewMarkerStorageWriter(t.workingDirectory) if err != nil { return false, false, fmt.Errorf("failed to create marker writer: %w", err) @@ -124,7 +126,7 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexP chunkRewriter := newChunkRewriter(t.chunkClient, tableName, indexProcessor) - empty, modified, err := markforDelete(ctx, tableName, markerWriter, indexProcessor, t.expiration, chunkRewriter) + empty, modified, err := markForDelete(ctx, t.markTimeout, tableName, markerWriter, indexProcessor, t.expiration, chunkRewriter, logger) if err != nil { return false, false, err } @@ -146,8 +148,16 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexP return empty, modified, nil } -func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, indexFile IndexProcessor, - expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, bool, error) { +func markForDelete( + ctx context.Context, + timeout time.Duration, + tableName string, + marker MarkerStorageWriter, + indexFile IndexProcessor, + expiration ExpirationChecker, + chunkRewriter *chunkRewriter, + logger log.Logger, +) (bool, bool, error) { seriesMap := newUserSeriesMap() // tableInterval holds the interval for which the table is expected to have the chunks indexed tableInterval := ExtractIntervalFromTableName(tableName) @@ -156,7 +166,12 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr now := model.Now() chunksFound := false - err := indexFile.ForEachChunk(func(c ChunkEntry) (bool, error) { + // This is a fresh context so we know when deletes timeout vs something going + // wrong with the other context + iterCtx, cancel := ctxForTimeout(timeout) + defer cancel() + + err := indexFile.ForEachChunk(iterCtx, func(c ChunkEntry) (bool, error) { chunksFound = true seriesMap.Add(c.SeriesID, c.UserID, c.Labels) @@ -204,7 +219,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr return false, nil }) if err != nil { - return false, false, err + if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) { + // Deletes timed out. Don't return an error so compaction can continue and deletes can be retried + level.Warn(logger).Log("msg", "Timed out while running delete") + expiration.MarkPhaseTimedOut() + } else { + return false, false, err + } } if !chunksFound { @@ -226,6 +247,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr }) } +func ctxForTimeout(t time.Duration) (context.Context, context.CancelFunc) { + if t == 0 { + return context.Background(), func() {} + } + return context.WithTimeout(context.Background(), t) +} + type ChunkClient interface { DeleteChunk(ctx context.Context, userID, chunkID string) error IsChunkNotFoundErr(err error) bool diff --git a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go index 026506dafa..5b9ac8b4e5 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go @@ -145,7 +145,7 @@ func Test_Retention(t *testing.T) { sweep.Start() defer sweep.Stop() - marker, err := NewMarker(workDir, expiration, nil, prometheus.NewRegistry()) + marker, err := NewMarker(workDir, expiration, time.Hour, nil, prometheus.NewRegistry()) require.NoError(t, err) for _, table := range store.indexTables() { _, _, err := marker.MarkForDelete(context.Background(), table.name, "", table, util_log.Logger) @@ -192,11 +192,11 @@ func Test_EmptyTable(t *testing.T) { tables := store.indexTables() require.Len(t, tables, 1) - empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil) + empty, _, err := markForDelete(context.Background(), 0, tables[0].name, noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil, util_log.Logger) require.NoError(t, err) require.True(t, empty) - _, _, err = markforDelete(context.Background(), tables[0].name, noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil) + _, _, err = markForDelete(context.Background(), 0, tables[0].name, noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger) require.Equal(t, err, errNoChunksFound) } @@ -436,21 +436,31 @@ type chunkExpiry struct { type mockExpirationChecker struct { ExpirationChecker chunksExpiry map[string]chunkExpiry + delay time.Duration + calls int + timedOut bool } -func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpirationChecker { - return mockExpirationChecker{chunksExpiry: chunksExpiry} +func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) *mockExpirationChecker { + return &mockExpirationChecker{chunksExpiry: chunksExpiry} } -func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { +func (m *mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) { + time.Sleep(m.delay) + m.calls++ + ce := m.chunksExpiry[string(ref.ChunkID)] return ce.isExpired, ce.nonDeletedIntervalFilters } -func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { +func (m *mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool { return false } +func (m *mockExpirationChecker) MarkPhaseTimedOut() { + m.timedOut = true +} + func TestMarkForDelete_SeriesCleanup(t *testing.T) { now := model.Now() schema := allSchemas[2] @@ -658,8 +668,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { seriesCleanRecorder := newSeriesCleanRecorder(table) cr := newChunkRewriter(store.chunkClient, table.name, table) - empty, isModified, err := markforDelete(context.Background(), table.name, noopWriter{}, seriesCleanRecorder, - expirationChecker, cr) + empty, isModified, err := markForDelete(context.Background(), 0, table.name, noopWriter{}, seriesCleanRecorder, expirationChecker, cr, util_log.Logger) require.NoError(t, err) require.Equal(t, tc.expectedEmpty[i], empty) require.Equal(t, tc.expectedModified[i], isModified) @@ -670,6 +679,47 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { } } +func TestDeleteTimeout(t *testing.T) { + chunks := []chunk.Chunk{ + createChunk(t, "user", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, model.Now(), model.Now().Add(270*time.Hour)), + createChunk(t, "user", labels.Labels{labels.Label{Name: "foo", Value: "2"}}, model.Now(), model.Now().Add(270*time.Hour)), + } + + for _, tc := range []struct { + timeout time.Duration + calls int + timedOut bool + }{ + {timeout: 2 * time.Millisecond, calls: 1, timedOut: true}, + {timeout: 0, calls: 2, timedOut: false}, + } { + store := newTestStore(t) + require.NoError(t, store.Put(context.TODO(), chunks)) + store.Stop() + + expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{}) + expirationChecker.delay = 10 * time.Millisecond + + table := store.indexTables()[0] + empty, isModified, err := markForDelete( + context.Background(), + tc.timeout, + table.name, + noopWriter{}, + newSeriesCleanRecorder(table), + expirationChecker, + newChunkRewriter(store.chunkClient, table.name, table), + util_log.Logger, + ) + + require.NoError(t, err) + require.False(t, empty) + require.False(t, isModified) + require.Equal(t, tc.calls, expirationChecker.calls) + require.Equal(t, tc.timedOut, expirationChecker.timedOut) + } +} + func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { schema := allSchemas[2] store := newTestStore(t) @@ -696,8 +746,8 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { require.Len(t, tables, 8) for i, table := range tables { - empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, table, - NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil) + empty, _, err := markForDelete(context.Background(), 0, table.name, noopWriter{}, table, + NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil, util_log.Logger) require.NoError(t, err) if i == 7 { require.False(t, empty) diff --git a/pkg/storage/stores/indexshipper/compactor/retention/util_test.go b/pkg/storage/stores/indexshipper/compactor/retention/util_test.go index 9cf0a74d1e..060ac9aa9d 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/util_test.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/util_test.go @@ -121,10 +121,11 @@ type table struct { chunks map[string][]chunk.Chunk } -func (t *table) ForEachChunk(callback ChunkEntryCallback) error { +func (t *table) ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error { for userID, chks := range t.chunks { i := 0 - for _, chk := range chks { + for j := 0; j < len(chks) && ctx.Err() == nil; j++ { + chk := chks[j] deleteChunk, err := callback(entryFromChunk(chk)) if err != nil { return err @@ -139,7 +140,7 @@ func (t *table) ForEachChunk(callback ChunkEntryCallback) error { t.chunks[userID] = t.chunks[userID][:i] } - return nil + return ctx.Err() } func (t *table) IndexChunk(chunk chunk.Chunk) (bool, error) { diff --git a/pkg/storage/stores/indexshipper/compactor/testutil.go b/pkg/storage/stores/indexshipper/compactor/testutil.go index 487aecc2bf..4265fd94c8 100644 --- a/pkg/storage/stores/indexshipper/compactor/testutil.go +++ b/pkg/storage/stores/indexshipper/compactor/testutil.go @@ -160,7 +160,7 @@ func openCompactedIndex(path string) (*compactedIndex, error) { return &compactedIndex{indexFile: idxFile}, nil } -func (c compactedIndex) ForEachChunk(_ retention.ChunkEntryCallback) error { +func (c compactedIndex) ForEachChunk(_ context.Context, _ retention.ChunkEntryCallback) error { return nil } diff --git a/pkg/storage/stores/shipper/index/compactor/compacted_index.go b/pkg/storage/stores/shipper/index/compactor/compacted_index.go index 626a70eb6a..cb7765ec10 100644 --- a/pkg/storage/stores/shipper/index/compactor/compacted_index.go +++ b/pkg/storage/stores/shipper/index/compactor/compacted_index.go @@ -1,6 +1,7 @@ package compactor import ( + "context" "fmt" "os" "path/filepath" @@ -135,7 +136,7 @@ func (c *CompactedIndex) setupIndexProcessors() error { return nil } -func (c *CompactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) error { +func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error { if err := c.setupIndexProcessors(); err != nil { return err } @@ -145,7 +146,7 @@ func (c *CompactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err return fmt.Errorf("required boltdb bucket not found") } - return ForEachChunk(bucket, c.periodConfig, callback) + return ForEachChunk(ctx, bucket, c.periodConfig, callback) } func (c *CompactedIndex) IndexChunk(chunk chunk.Chunk) (bool, error) { diff --git a/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go b/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go index 2f4e93b63c..1ddf908718 100644 --- a/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go +++ b/pkg/storage/stores/shipper/index/compactor/compacted_index_test.go @@ -46,7 +46,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { // remove c1, c2 chunk and index c4 with same labels as c2 c4 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "fizz", Value: "buzz"}}, tt.from, tt.from.Add(30*time.Minute)) - err := compactedIndex.ForEachChunk(func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + err := compactedIndex.ForEachChunk(context.Background(), func(entry retention.ChunkEntry) (deleteChunk bool, err error) { if entry.Labels.Get("fizz") == "buzz" { chunkIndexed, err := compactedIndex.IndexChunk(c4) require.NoError(t, err) @@ -97,7 +97,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) { } chunkEntriesFound := []retention.ChunkEntry{} err = modifiedBoltDB.View(func(tx *bbolt.Tx) error { - return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { chunkEntriesFound = append(chunkEntriesFound, entry) return false, nil }) diff --git a/pkg/storage/stores/shipper/index/compactor/iterator.go b/pkg/storage/stores/shipper/index/compactor/iterator.go index 97f1807c6b..a09bf4b25f 100644 --- a/pkg/storage/stores/shipper/index/compactor/iterator.go +++ b/pkg/storage/stores/shipper/index/compactor/iterator.go @@ -1,6 +1,7 @@ package compactor import ( + "context" "fmt" "github.com/prometheus/common/model" @@ -21,7 +22,7 @@ var ( _ retention.SeriesCleaner = &seriesCleaner{} ) -func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error { +func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error { labelsMapper, err := newSeriesLabelsMapper(bucket, config) if err != nil { return err @@ -30,7 +31,7 @@ func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback ret cursor := bucket.Cursor() var current retention.ChunkEntry - for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() { + for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() { ref, ok, err := parseChunkRef(decodeKey(key)) if err != nil { return err @@ -53,7 +54,7 @@ func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback ret } } - return nil + return ctx.Err() } type seriesCleaner struct { diff --git a/pkg/storage/stores/shipper/index/compactor/iterator_test.go b/pkg/storage/stores/shipper/index/compactor/iterator_test.go index 6a0d47c719..09d2fb904d 100644 --- a/pkg/storage/stores/shipper/index/compactor/iterator_test.go +++ b/pkg/storage/stores/shipper/index/compactor/iterator_test.go @@ -42,7 +42,7 @@ func Test_ChunkIterator(t *testing.T) { require.Len(t, tables, 1) var actual []retention.ChunkEntry err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { actual = append(actual, entry) return len(actual) == 2, nil }) @@ -56,7 +56,7 @@ func Test_ChunkIterator(t *testing.T) { // second pass we delete c2 actual = actual[:0] err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { actual = append(actual, entry) return false, nil }) @@ -69,6 +69,37 @@ func Test_ChunkIterator(t *testing.T) { } } +func Test_ChunkIteratorContextCancelation(t *testing.T) { + cm := storage.NewClientMetrics() + defer cm.Unregister() + store := newTestStore(t, cm) + + from := schemaCfg.Configs[0].From.Time + c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, from, from.Add(1*time.Hour)) + c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, from, from.Add(1*time.Hour)) + + require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{c1, c2})) + store.Stop() + + tables := store.indexTables() + require.Len(t, tables, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var actual []retention.ChunkEntry + err := tables[0].DB.Update(func(tx *bbolt.Tx) error { + return ForEachChunk(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + actual = append(actual, entry) + cancel() + return len(actual) == 2, nil + }) + }) + + require.ErrorIs(t, err, context.Canceled) + require.Len(t, actual, 1) +} + func Test_SeriesCleaner(t *testing.T) { for _, tt := range allSchemas { tt := tt @@ -91,7 +122,7 @@ func Test_SeriesCleaner(t *testing.T) { require.Len(t, tables, 1) // remove c1, c2 chunk err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { return entry.Labels.Get("bar") == "foo", nil }) }) @@ -213,7 +244,7 @@ func Benchmark_ChunkIterator(b *testing.B) { _ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(local.IndexBucketName) for n := 0; n < b.N; n++ { - err := ForEachChunk(bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { + err := ForEachChunk(context.Background(), bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) { chunkEntry = entry total++ return true, nil diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index c67ba9a04b..c1cccd4895 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -269,7 +269,7 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string } // ForEachChunk iterates over all the chunks in the builder and calls the callback function. -func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) error { +func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error { schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{c.periodConfig}, } @@ -287,7 +287,8 @@ func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err chunkEntry.SeriesID = getUnsafeBytes(seriesID) chunkEntry.Labels = withoutTenantLabel(stream.labels) - for _, chk := range stream.chunks { + for i := 0; i < len(stream.chunks) && ctx.Err() == nil; i++ { + chk := stream.chunks[i] logprotoChunkRef.From = chk.From() logprotoChunkRef.Through = chk.Through() logprotoChunkRef.Checksum = chk.Checksum @@ -308,7 +309,7 @@ func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err } } - return nil + return ctx.Err() } // IndexChunk adds the chunk to the list of chunks to index. diff --git a/pkg/storage/stores/tsdb/compactor_test.go b/pkg/storage/stores/tsdb/compactor_test.go index a6a9334f29..8593e202f5 100644 --- a/pkg/storage/stores/tsdb/compactor_test.go +++ b/pkg/storage/stores/tsdb/compactor_test.go @@ -639,45 +639,7 @@ func chunkMetaToChunkRef(userID string, chunkMeta index.ChunkMeta, lbls labels.L } func TestCompactedIndex(t *testing.T) { - now := model.Now() - periodConfig := config.PeriodConfig{ - IndexTables: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}, - Schema: "v12", - } - schemaCfg := config.SchemaConfig{ - Configs: []config.PeriodConfig{periodConfig}, - } - indexBuckets, err := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) - require.NoError(t, err) - tableName := indexBuckets[0] - tableInterval := retention.ExtractIntervalFromTableName(tableName) - // shiftTableStart shift tableInterval.Start by the given amount of milliseconds. - // It is used for building chunkmetas relative to start time of the table. - shiftTableStart := func(ms int64) int64 { - return int64(tableInterval.Start) + ms - } - - lbls1 := mustParseLabels(`{foo="bar", a="b"}`) - lbls2 := mustParseLabels(`{fizz="buzz", a="b"}`) - userID := buildUserID(0) - - buildCompactedIndex := func() *compactedIndex { - builder := NewBuilder() - stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "") - builder.AddSeries(stream.labels, stream.fp, stream.chunks) - - stream = buildStream(lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), "") - builder.AddSeries(stream.labels, stream.fp, stream.chunks) - - builder.FinalizeChunks() - - return newCompactedIndex(context.Background(), tableName, buildUserID(0), t.TempDir(), periodConfig, builder) - } - - expectedChunkEntries := map[string][]retention.ChunkEntry{ - lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))), - lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))), - } + testCtx := setupCompactedIndex(t) for name, tc := range map[string]struct { deleteChunks map[string]index.ChunkMetas @@ -689,101 +651,101 @@ func TestCompactedIndex(t *testing.T) { }{ "no changes": { finalExpectedChunks: map[string]index.ChunkMetas{ - lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "delete some chunks from a stream": { deleteChunks: map[string]index.ChunkMetas{ - lbls1.String(): append(buildChunkMetas(shiftTableStart(3), shiftTableStart(5)), buildChunkMetas(shiftTableStart(7), shiftTableStart(8))...), + testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))...), }, finalExpectedChunks: map[string]index.ChunkMetas{ - lbls1.String(): append(buildChunkMetas(shiftTableStart(0), shiftTableStart(2)), append(buildChunkMetas(shiftTableStart(6), shiftTableStart(6)), buildChunkMetas(shiftTableStart(9), shiftTableStart(10))...)...), - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)), buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))...)...), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "delete all chunks from a stream": { deleteChunks: map[string]index.ChunkMetas{ - lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), + testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)), }, - deleteSeries: []labels.Labels{lbls1}, + deleteSeries: []labels.Labels{testCtx.lbls1}, finalExpectedChunks: map[string]index.ChunkMetas{ - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "add some chunks to a stream": { addChunks: []chunk.Chunk{ { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1), Data: dummyChunkData{}, }, { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1), Data: dummyChunkData{}, }, }, finalExpectedChunks: map[string]index.ChunkMetas{ - lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(12)), - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "add some chunks out of table interval to a stream": { addChunks: []chunk.Chunk{ { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1), Data: dummyChunkData{}, }, { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1), Data: dummyChunkData{}, }, // these chunks should not be added { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(int64(tableInterval.End+100), int64(tableInterval.End+100))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(int64(testCtx.tableInterval.End+100), int64(testCtx.tableInterval.End+100))[0], testCtx.lbls1), Data: dummyChunkData{}, }, { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(int64(tableInterval.End+200), int64(tableInterval.End+200))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(int64(testCtx.tableInterval.End+200), int64(testCtx.tableInterval.End+200))[0], testCtx.lbls1), Data: dummyChunkData{}, }, }, finalExpectedChunks: map[string]index.ChunkMetas{ - lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(12)), - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "add and delete some chunks in a stream": { addChunks: []chunk.Chunk{ { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1), Data: dummyChunkData{}, }, { - Metric: lbls1, - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1), + Metric: testCtx.lbls1, + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1), Data: dummyChunkData{}, }, }, deleteChunks: map[string]index.ChunkMetas{ - lbls1.String(): buildChunkMetas(shiftTableStart(3), shiftTableStart(5)), + testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), }, finalExpectedChunks: map[string]index.ChunkMetas{ - lbls1.String(): append(buildChunkMetas(shiftTableStart(0), shiftTableStart(2)), buildChunkMetas(shiftTableStart(6), shiftTableStart(12))...), - lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), + testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))...), + testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)), }, }, "adding chunk to non-existing stream should error": { addChunks: []chunk.Chunk{ { - Metric: labels.NewBuilder(lbls1).Set("new", "label").Labels(), - ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1), + Metric: labels.NewBuilder(testCtx.lbls1).Set("new", "label").Labels(), + ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1), Data: dummyChunkData{}, }, }, @@ -791,10 +753,10 @@ func TestCompactedIndex(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - compactedIndex := buildCompactedIndex() + compactedIndex := testCtx.buildCompactedIndex() foundChunkEntries := map[string][]retention.ChunkEntry{} - err := compactedIndex.ForEachChunk(func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) { + err := compactedIndex.ForEachChunk(context.Background(), func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) { seriesIDStr := string(chunkEntry.SeriesID) foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], chunkEntry) if chks, ok := tc.deleteChunks[string(chunkEntry.SeriesID)]; ok { @@ -809,7 +771,7 @@ func TestCompactedIndex(t *testing.T) { }) require.NoError(t, err) - require.Equal(t, expectedChunkEntries, foundChunkEntries) + require.Equal(t, testCtx.expectedChunkEntries, foundChunkEntries) for _, lbls := range tc.deleteSeries { require.NoError(t, compactedIndex.CleanupSeries(nil, lbls)) @@ -839,6 +801,79 @@ func TestCompactedIndex(t *testing.T) { } +func TestIteratorContextCancelation(t *testing.T) { + tc := setupCompactedIndex(t) + compactedIndex := tc.buildCompactedIndex() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var foundChunkEntries []retention.ChunkEntry + err := compactedIndex.ForEachChunk(ctx, func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) { + foundChunkEntries = append(foundChunkEntries, chunkEntry) + + return false, nil + }) + + require.ErrorIs(t, err, context.Canceled) +} + +type testContext struct { + lbls1 labels.Labels + lbls2 labels.Labels + userID string + tableInterval model.Interval + shiftTableStart func(ms int64) int64 + buildCompactedIndex func() *compactedIndex + expectedChunkEntries map[string][]retention.ChunkEntry +} + +func setupCompactedIndex(t *testing.T) *testContext { + t.Helper() + + now := model.Now() + periodConfig := config.PeriodConfig{ + IndexTables: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}, + Schema: "v12", + } + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{periodConfig}, + } + indexBuckets, err := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})}) + require.NoError(t, err) + tableName := indexBuckets[0] + tableInterval := retention.ExtractIntervalFromTableName(tableName) + // shiftTableStart shift tableInterval.Start by the given amount of milliseconds. + // It is used for building chunkmetas relative to start time of the table. + shiftTableStart := func(ms int64) int64 { + return int64(tableInterval.Start) + ms + } + + lbls1 := mustParseLabels(`{foo="bar", a="b"}`) + lbls2 := mustParseLabels(`{fizz="buzz", a="b"}`) + userID := buildUserID(0) + + buildCompactedIndex := func() *compactedIndex { + builder := NewBuilder() + stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "") + builder.AddSeries(stream.labels, stream.fp, stream.chunks) + + stream = buildStream(lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), "") + builder.AddSeries(stream.labels, stream.fp, stream.chunks) + + builder.FinalizeChunks() + + return newCompactedIndex(context.Background(), tableName, buildUserID(0), t.TempDir(), periodConfig, builder) + } + + expectedChunkEntries := map[string][]retention.ChunkEntry{ + lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))), + lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))), + } + + return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, expectedChunkEntries} +} + type dummyChunkData struct { chunk.Data } diff --git a/tools/tsdb/tsdb-map/main.go b/tools/tsdb/tsdb-map/main.go index bb5aabe670..fdb872f89a 100644 --- a/tools/tsdb/tsdb-map/main.go +++ b/tools/tsdb/tsdb-map/main.go @@ -73,7 +73,7 @@ func main() { // loads everything into memory. if err := db.View(func(t *bbolt.Tx) error { - return compactor.ForEachChunk(t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) { + return compactor.ForEachChunk(context.Background(), t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) { builder.AddSeries(entry.Labels, model.Fingerprint(entry.Labels.Hash()), []index.ChunkMeta{{ Checksum: extractChecksumFromChunkID(entry.ChunkID), MinTime: int64(entry.From),