Add timeouts to deletions (#6879)

* Add context to chunk iteration

* Configure timeouts

* Add expiration timeout

* Update test

* Update test

* Clarify timeout implications, log more specifically
pull/6882/head
Travis Patterson 3 years ago committed by GitHub
parent afedabf0e9
commit 3170edf3e3
  1. docs/sources/configuration/_index.md (13 lines changed)
  2. pkg/storage/stores/indexshipper/compactor/compactor.go (9 lines changed)
  3. pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go (9 lines changed)
  4. pkg/storage/stores/indexshipper/compactor/deletion/metrics.go (6 lines changed)
  5. pkg/storage/stores/indexshipper/compactor/index_set.go (4 lines changed)
  6. pkg/storage/stores/indexshipper/compactor/retention/expiration.go (3 lines changed)
  7. pkg/storage/stores/indexshipper/compactor/retention/retention.go (46 lines changed)
  8. pkg/storage/stores/indexshipper/compactor/retention/retention_test.go (72 lines changed)
  9. pkg/storage/stores/indexshipper/compactor/retention/util_test.go (7 lines changed)
  10. pkg/storage/stores/indexshipper/compactor/testutil.go (2 lines changed)
  11. pkg/storage/stores/shipper/index/compactor/compacted_index.go (5 lines changed)
  12. pkg/storage/stores/shipper/index/compactor/compacted_index_test.go (4 lines changed)
  13. pkg/storage/stores/shipper/index/compactor/iterator.go (7 lines changed)
  14. pkg/storage/stores/shipper/index/compactor/iterator_test.go (39 lines changed)
  15. pkg/storage/stores/tsdb/compactor.go (7 lines changed)
  16. pkg/storage/stores/tsdb/compactor_test.go (185 lines changed)
  17. tools/tsdb/tsdb-map/main.go (2 lines changed)

@ -2105,6 +2105,19 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]
# The max number of delete requests to run per compaction cycle.
# CLI flag: -boltdb.shipper.compactor.delete-batch-size
[delete_batch_size: <int> | default = 70]
# The maximum amount of time to spend running retention and deletion
# on any given table in the index. 0 is no timeout.
#
# NOTE: This timeout prioritizes runtime over completeness of retention/deletion.
# It may take several compaction runs to fully perform retention and process
# all outstanding delete requests.
# CLI flag: -boltdb.shipper.compactor.retention-table-timeout
[retention_table_timeout: <duration> | default = 0]
# Maximum number of tables to compact in parallel.
# While increasing this value, please make sure compactor has enough disk space
# allocated to be able to store and compact as many tables.

@ -78,6 +78,7 @@ type Config struct {
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
@ -97,6 +98,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.IntVar(&cfg.DeleteBatchSize, "boltdb.shipper.compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
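As an aside, not part of the diff: the new option is a plain Go duration flag, so the zero default leaves retention and deletion unbounded per table, while values such as 30m or 1h cap it. A minimal sketch under that assumption, keeping only the relevant field and flag registration (package and import paths are illustrative):

package main

import (
	"flag"
	"fmt"
	"time"
)

// Trimmed stand-in for the compactor Config shown above; only the new field is kept.
type Config struct {
	RetentionTableTimeout time.Duration
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	// A default of 0 means retention/deletion on a table is not time-bounded.
	f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0,
		"The maximum amount of time to spend running retention and deletion on any given table in the index.")
}

func main() {
	var cfg Config
	fs := flag.NewFlagSet("compactor", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	_ = fs.Parse([]string{"-boltdb.shipper.compactor.retention-table-timeout=30m"})
	fmt.Println(cfg.RetentionTableTimeout) // prints 30m0s
}

Per the yaml tag shown above, the same setting is retention_table_timeout in the compactor configuration block.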
@ -233,7 +235,7 @@ func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.S
return err
}
c.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, chunkClient, r)
c.tableMarker, err = retention.NewMarker(retentionWorkDir, c.expirationChecker, c.cfg.RetentionTableTimeout, chunkClient, r)
if err != nil {
return err
}
@ -635,6 +637,11 @@ func (e *expirationChecker) MarkPhaseFinished() {
e.deletionExpiryChecker.MarkPhaseFinished()
}
func (e *expirationChecker) MarkPhaseTimedOut() {
e.retentionExpiryChecker.MarkPhaseTimedOut()
e.deletionExpiryChecker.MarkPhaseTimedOut()
}
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
return e.retentionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID) || e.deletionExpiryChecker.IntervalMayHaveExpiredChunks(interval, userID)
}

@ -289,6 +289,15 @@ func (d *DeleteRequestsManager) MarkPhaseFailed() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.metrics.deletionFailures.WithLabelValues("error").Inc()
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
}
func (d *DeleteRequestsManager) MarkPhaseTimedOut() {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
d.metrics.deletionFailures.WithLabelValues("timeout").Inc()
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
}

@ -48,6 +48,7 @@ type deleteRequestsManagerMetrics struct {
deleteRequestsProcessedTotal *prometheus.CounterVec
deleteRequestsChunksSelectedTotal *prometheus.CounterVec
loadPendingRequestsAttemptsTotal *prometheus.CounterVec
deletionFailures *prometheus.CounterVec
oldestPendingDeleteRequestAgeSeconds prometheus.Gauge
pendingDeleteRequestsCount prometheus.Gauge
deletedLinesTotal *prometheus.CounterVec
@ -66,6 +67,11 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan
Name: "compactor_delete_requests_chunks_selected_total",
Help: "Number of chunks selected while building delete plans per user",
}, []string{"user"})
m.deletionFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "compactor_delete_processing_fails_total",
Help: "Number of times the delete phase of compaction has failed",
}, []string{"cause"})
m.loadPendingRequestsAttemptsTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "compactor_load_pending_requests_attempts_total",

@ -97,6 +97,10 @@ func newIndexSet(ctx context.Context, tableName, userID string, baseIndexSet sto
logger: logger,
}
if userID != "" {
ui.logger = log.With(logger, "user-id", userID)
}
var err error
ui.sourceObjects, err = ui.baseIndexSet.ListFiles(ui.ctx, ui.tableName, ui.userID, false)
if err != nil {

@ -26,6 +26,7 @@ type ExpirationChecker interface {
IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
MarkPhaseStarted()
MarkPhaseFailed()
MarkPhaseTimedOut()
MarkPhaseFinished()
DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool
}
@ -71,6 +72,7 @@ func (e *expirationChecker) MarkPhaseStarted() {
}
func (e *expirationChecker) MarkPhaseFailed() {}
func (e *expirationChecker) MarkPhaseTimedOut() {}
func (e *expirationChecker) MarkPhaseFinished() {}
func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
@ -105,6 +107,7 @@ func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval m
}
func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {}
func (e *neverExpiringExpirationChecker) MarkPhaseTimedOut() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {}
func (e *neverExpiringExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return false

@ -45,7 +45,7 @@ type ChunkEntry struct {
type ChunkEntryCallback func(ChunkEntry) (deleteChunk bool, err error)
type ChunkIterator interface {
ForEachChunk(callback ChunkEntryCallback) error
ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error
}
type SeriesCleaner interface {
@ -82,15 +82,17 @@ type Marker struct {
expiration ExpirationChecker
markerMetrics *markerMetrics
chunkClient client.Client
markTimeout time.Duration
}
func NewMarker(workingDirectory string, expiration ExpirationChecker, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) {
func NewMarker(workingDirectory string, expiration ExpirationChecker, markTimeout time.Duration, chunkClient client.Client, r prometheus.Registerer) (*Marker, error) {
metrics := newMarkerMetrics(r)
return &Marker{
workingDirectory: workingDirectory,
expiration: expiration,
markerMetrics: metrics,
chunkClient: chunkClient,
markTimeout: markTimeout,
}, nil
}
@ -104,7 +106,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, in
}()
level.Debug(logger).Log("msg", "starting to process table")
empty, modified, err := t.markTable(ctx, tableName, userID, indexProcessor)
empty, modified, err := t.markTable(ctx, tableName, userID, indexProcessor, logger)
if err != nil {
status = statusFailure
return false, false, err
@ -112,7 +114,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, in
return empty, modified, nil
}
func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexProcessor IndexProcessor) (bool, bool, error) {
func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexProcessor IndexProcessor, logger log.Logger) (bool, bool, error) {
markerWriter, err := NewMarkerStorageWriter(t.workingDirectory)
if err != nil {
return false, false, fmt.Errorf("failed to create marker writer: %w", err)
@ -124,7 +126,7 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexP
chunkRewriter := newChunkRewriter(t.chunkClient, tableName, indexProcessor)
empty, modified, err := markforDelete(ctx, tableName, markerWriter, indexProcessor, t.expiration, chunkRewriter)
empty, modified, err := markForDelete(ctx, t.markTimeout, tableName, markerWriter, indexProcessor, t.expiration, chunkRewriter, logger)
if err != nil {
return false, false, err
}
@ -146,8 +148,16 @@ func (t *Marker) markTable(ctx context.Context, tableName, userID string, indexP
return empty, modified, nil
}
func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, indexFile IndexProcessor,
expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, bool, error) {
func markForDelete(
ctx context.Context,
timeout time.Duration,
tableName string,
marker MarkerStorageWriter,
indexFile IndexProcessor,
expiration ExpirationChecker,
chunkRewriter *chunkRewriter,
logger log.Logger,
) (bool, bool, error) {
seriesMap := newUserSeriesMap()
// tableInterval holds the interval for which the table is expected to have the chunks indexed
tableInterval := ExtractIntervalFromTableName(tableName)
@ -156,7 +166,12 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr
now := model.Now()
chunksFound := false
err := indexFile.ForEachChunk(func(c ChunkEntry) (bool, error) {
// This is a fresh context so we know when deletes time out vs. something going
// wrong with the other context
iterCtx, cancel := ctxForTimeout(timeout)
defer cancel()
err := indexFile.ForEachChunk(iterCtx, func(c ChunkEntry) (bool, error) {
chunksFound = true
seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
@ -204,7 +219,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr
return false, nil
})
if err != nil {
return false, false, err
if errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded) {
// Deletes timed out. Don't return an error so compaction can continue and deletes can be retried
level.Warn(logger).Log("msg", "Timed out while running delete")
expiration.MarkPhaseTimedOut()
} else {
return false, false, err
}
}
if !chunksFound {
@ -226,6 +247,13 @@ func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWr
})
}
func ctxForTimeout(t time.Duration) (context.Context, context.CancelFunc) {
if t == 0 {
return context.Background(), func() {}
}
return context.WithTimeout(context.Background(), t)
}
type ChunkClient interface {
DeleteChunk(ctx context.Context, userID, chunkID string) error
IsChunkNotFoundErr(err error) bool

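To make the new control flow easier to follow outside the diff, here is a minimal, self-contained sketch (an illustration, not the project's code) of the pattern markForDelete now uses: a zero timeout yields a context that never expires, and hitting the deadline on the dedicated iteration context is treated as a soft stop so compaction can continue and the remaining deletes can be retried later.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ctxForTimeout mirrors the helper added above: zero means "no timeout",
// otherwise the iteration is bounded by the given duration.
func ctxForTimeout(t time.Duration) (context.Context, context.CancelFunc) {
	if t == 0 {
		return context.Background(), func() {}
	}
	return context.WithTimeout(context.Background(), t)
}

// iterate stands in for ForEachChunk: it stops as soon as the context is
// done and returns ctx.Err() so the caller can tell why it stopped.
func iterate(ctx context.Context, chunks int, perChunk time.Duration) error {
	for i := 0; i < chunks && ctx.Err() == nil; i++ {
		time.Sleep(perChunk) // simulated per-chunk expiry check and marking
	}
	return ctx.Err()
}

func main() {
	iterCtx, cancel := ctxForTimeout(5 * time.Millisecond)
	defer cancel()

	err := iterate(iterCtx, 10, 2*time.Millisecond)
	switch {
	case errors.Is(err, context.DeadlineExceeded) && errors.Is(iterCtx.Err(), context.DeadlineExceeded):
		// Soft stop: the per-table budget ran out; remaining chunks are
		// picked up again on a later compaction cycle.
		fmt.Println("timed out, remaining work deferred to the next run")
	case err != nil:
		fmt.Println("iteration failed:", err)
	default:
		fmt.Println("finished within the budget")
	}
}

The extra check on iterCtx.Err() matters because the same DeadlineExceeded error could, in principle, come from the caller's context; only a deadline on the dedicated iteration context is treated as a deletion timeout, as the comment in the diff notes.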
@ -145,7 +145,7 @@ func Test_Retention(t *testing.T) {
sweep.Start()
defer sweep.Stop()
marker, err := NewMarker(workDir, expiration, nil, prometheus.NewRegistry())
marker, err := NewMarker(workDir, expiration, time.Hour, nil, prometheus.NewRegistry())
require.NoError(t, err)
for _, table := range store.indexTables() {
_, _, err := marker.MarkForDelete(context.Background(), table.name, "", table, util_log.Logger)
@ -192,11 +192,11 @@ func Test_EmptyTable(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, 1)
empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil)
empty, _, err := markForDelete(context.Background(), 0, tables[0].name, noopWriter{}, tables[0], NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil, util_log.Logger)
require.NoError(t, err)
require.True(t, empty)
_, _, err = markforDelete(context.Background(), tables[0].name, noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil)
_, _, err = markForDelete(context.Background(), 0, tables[0].name, noopWriter{}, newTable("test"), NewExpirationChecker(&fakeLimits{}), nil, util_log.Logger)
require.Equal(t, err, errNoChunksFound)
}
@ -436,21 +436,31 @@ type chunkExpiry struct {
type mockExpirationChecker struct {
ExpirationChecker
chunksExpiry map[string]chunkExpiry
delay time.Duration
calls int
timedOut bool
}
func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpirationChecker {
return mockExpirationChecker{chunksExpiry: chunksExpiry}
func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) *mockExpirationChecker {
return &mockExpirationChecker{chunksExpiry: chunksExpiry}
}
func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) {
func (m *mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) {
time.Sleep(m.delay)
m.calls++
ce := m.chunksExpiry[string(ref.ChunkID)]
return ce.isExpired, ce.nonDeletedIntervalFilters
}
func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
func (m *mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return false
}
func (m *mockExpirationChecker) MarkPhaseTimedOut() {
m.timedOut = true
}
func TestMarkForDelete_SeriesCleanup(t *testing.T) {
now := model.Now()
schema := allSchemas[2]
@ -658,8 +668,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
seriesCleanRecorder := newSeriesCleanRecorder(table)
cr := newChunkRewriter(store.chunkClient, table.name, table)
empty, isModified, err := markforDelete(context.Background(), table.name, noopWriter{}, seriesCleanRecorder,
expirationChecker, cr)
empty, isModified, err := markForDelete(context.Background(), 0, table.name, noopWriter{}, seriesCleanRecorder, expirationChecker, cr, util_log.Logger)
require.NoError(t, err)
require.Equal(t, tc.expectedEmpty[i], empty)
require.Equal(t, tc.expectedModified[i], isModified)
@ -670,6 +679,47 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
}
}
func TestDeleteTimeout(t *testing.T) {
chunks := []chunk.Chunk{
createChunk(t, "user", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, model.Now(), model.Now().Add(270*time.Hour)),
createChunk(t, "user", labels.Labels{labels.Label{Name: "foo", Value: "2"}}, model.Now(), model.Now().Add(270*time.Hour)),
}
for _, tc := range []struct {
timeout time.Duration
calls int
timedOut bool
}{
{timeout: 2 * time.Millisecond, calls: 1, timedOut: true},
{timeout: 0, calls: 2, timedOut: false},
} {
store := newTestStore(t)
require.NoError(t, store.Put(context.TODO(), chunks))
store.Stop()
expirationChecker := newMockExpirationChecker(map[string]chunkExpiry{})
expirationChecker.delay = 10 * time.Millisecond
table := store.indexTables()[0]
empty, isModified, err := markForDelete(
context.Background(),
tc.timeout,
table.name,
noopWriter{},
newSeriesCleanRecorder(table),
expirationChecker,
newChunkRewriter(store.chunkClient, table.name, table),
util_log.Logger,
)
require.NoError(t, err)
require.False(t, empty)
require.False(t, isModified)
require.Equal(t, tc.calls, expirationChecker.calls)
require.Equal(t, tc.timedOut, expirationChecker.timedOut)
}
}
func TestMarkForDelete_DropChunkFromIndex(t *testing.T) {
schema := allSchemas[2]
store := newTestStore(t)
@ -696,8 +746,8 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) {
require.Len(t, tables, 8)
for i, table := range tables {
empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, table,
NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil)
empty, _, err := markForDelete(context.Background(), 0, table.name, noopWriter{}, table,
NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil, util_log.Logger)
require.NoError(t, err)
if i == 7 {
require.False(t, empty)

@ -121,10 +121,11 @@ type table struct {
chunks map[string][]chunk.Chunk
}
func (t *table) ForEachChunk(callback ChunkEntryCallback) error {
func (t *table) ForEachChunk(ctx context.Context, callback ChunkEntryCallback) error {
for userID, chks := range t.chunks {
i := 0
for _, chk := range chks {
for j := 0; j < len(chks) && ctx.Err() == nil; j++ {
chk := chks[j]
deleteChunk, err := callback(entryFromChunk(chk))
if err != nil {
return err
@ -139,7 +140,7 @@ func (t *table) ForEachChunk(callback ChunkEntryCallback) error {
t.chunks[userID] = t.chunks[userID][:i]
}
return nil
return ctx.Err()
}
func (t *table) IndexChunk(chunk chunk.Chunk) (bool, error) {

@ -160,7 +160,7 @@ func openCompactedIndex(path string) (*compactedIndex, error) {
return &compactedIndex{indexFile: idxFile}, nil
}
func (c compactedIndex) ForEachChunk(_ retention.ChunkEntryCallback) error {
func (c compactedIndex) ForEachChunk(_ context.Context, _ retention.ChunkEntryCallback) error {
return nil
}

@ -1,6 +1,7 @@
package compactor
import (
"context"
"fmt"
"os"
"path/filepath"
@ -135,7 +136,7 @@ func (c *CompactedIndex) setupIndexProcessors() error {
return nil
}
func (c *CompactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) error {
func (c *CompactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
if err := c.setupIndexProcessors(); err != nil {
return err
}
@ -145,7 +146,7 @@ func (c *CompactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err
return fmt.Errorf("required boltdb bucket not found")
}
return ForEachChunk(bucket, c.periodConfig, callback)
return ForEachChunk(ctx, bucket, c.periodConfig, callback)
}
func (c *CompactedIndex) IndexChunk(chunk chunk.Chunk) (bool, error) {

@ -46,7 +46,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) {
// remove c1, c2 chunk and index c4 with same labels as c2
c4 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "bar"}, labels.Label{Name: "fizz", Value: "buzz"}}, tt.from, tt.from.Add(30*time.Minute))
err := compactedIndex.ForEachChunk(func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
err := compactedIndex.ForEachChunk(context.Background(), func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
if entry.Labels.Get("fizz") == "buzz" {
chunkIndexed, err := compactedIndex.IndexChunk(c4)
require.NoError(t, err)
@ -97,7 +97,7 @@ func TestCompactedIndex_IndexProcessor(t *testing.T) {
}
chunkEntriesFound := []retention.ChunkEntry{}
err = modifiedBoltDB.View(func(tx *bbolt.Tx) error {
return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
chunkEntriesFound = append(chunkEntriesFound, entry)
return false, nil
})

@ -1,6 +1,7 @@
package compactor
import (
"context"
"fmt"
"github.com/prometheus/common/model"
@ -21,7 +22,7 @@ var (
_ retention.SeriesCleaner = &seriesCleaner{}
)
func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error {
func ForEachChunk(ctx context.Context, bucket *bbolt.Bucket, config config.PeriodConfig, callback retention.ChunkEntryCallback) error {
labelsMapper, err := newSeriesLabelsMapper(bucket, config)
if err != nil {
return err
@ -30,7 +31,7 @@ func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback ret
cursor := bucket.Cursor()
var current retention.ChunkEntry
for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
for key, _ := cursor.First(); key != nil && ctx.Err() == nil; key, _ = cursor.Next() {
ref, ok, err := parseChunkRef(decodeKey(key))
if err != nil {
return err
@ -53,7 +54,7 @@ func ForEachChunk(bucket *bbolt.Bucket, config config.PeriodConfig, callback ret
}
}
return nil
return ctx.Err()
}
type seriesCleaner struct {

@ -42,7 +42,7 @@ func Test_ChunkIterator(t *testing.T) {
require.Len(t, tables, 1)
var actual []retention.ChunkEntry
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
return len(actual) == 2, nil
})
@ -56,7 +56,7 @@ func Test_ChunkIterator(t *testing.T) {
// second pass we delete c2
actual = actual[:0]
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
return false, nil
})
@ -69,6 +69,37 @@ func Test_ChunkIterator(t *testing.T) {
}
}
func Test_ChunkIteratorContextCancelation(t *testing.T) {
cm := storage.NewClientMetrics()
defer cm.Unregister()
store := newTestStore(t, cm)
from := schemaCfg.Configs[0].From.Time
c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, from, from.Add(1*time.Hour))
c2 := createChunk(t, "2", labels.Labels{labels.Label{Name: "foo", Value: "buzz"}, labels.Label{Name: "bar", Value: "foo"}}, from, from.Add(1*time.Hour))
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{c1, c2}))
store.Stop()
tables := store.indexTables()
require.Len(t, tables, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var actual []retention.ChunkEntry
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(ctx, tx.Bucket(local.IndexBucketName), schemaCfg.Configs[0], func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
actual = append(actual, entry)
cancel()
return len(actual) == 2, nil
})
})
require.ErrorIs(t, err, context.Canceled)
require.Len(t, actual, 1)
}
func Test_SeriesCleaner(t *testing.T) {
for _, tt := range allSchemas {
tt := tt
@ -91,7 +122,7 @@ func Test_SeriesCleaner(t *testing.T) {
require.Len(t, tables, 1)
// remove c1, c2 chunk
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
return ForEachChunk(tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return ForEachChunk(context.Background(), tx.Bucket(local.IndexBucketName), tt.config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
return entry.Labels.Get("bar") == "foo", nil
})
})
@ -213,7 +244,7 @@ func Benchmark_ChunkIterator(b *testing.B) {
_ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(local.IndexBucketName)
for n := 0; n < b.N; n++ {
err := ForEachChunk(bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
err := ForEachChunk(context.Background(), bucket, allSchemas[0].config, func(entry retention.ChunkEntry) (deleteChunk bool, err error) {
chunkEntry = entry
total++
return true, nil

@ -269,7 +269,7 @@ func newCompactedIndex(ctx context.Context, tableName, userID, workingDir string
}
// ForEachChunk iterates over all the chunks in the builder and calls the callback function.
func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) error {
func (c *compactedIndex) ForEachChunk(ctx context.Context, callback retention.ChunkEntryCallback) error {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{c.periodConfig},
}
@ -287,7 +287,8 @@ func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err
chunkEntry.SeriesID = getUnsafeBytes(seriesID)
chunkEntry.Labels = withoutTenantLabel(stream.labels)
for _, chk := range stream.chunks {
for i := 0; i < len(stream.chunks) && ctx.Err() == nil; i++ {
chk := stream.chunks[i]
logprotoChunkRef.From = chk.From()
logprotoChunkRef.Through = chk.Through()
logprotoChunkRef.Checksum = chk.Checksum
@ -308,7 +309,7 @@ func (c *compactedIndex) ForEachChunk(callback retention.ChunkEntryCallback) err
}
}
return nil
return ctx.Err()
}
// IndexChunk adds the chunk to the list of chunks to index.

@ -639,45 +639,7 @@ func chunkMetaToChunkRef(userID string, chunkMeta index.ChunkMeta, lbls labels.L
}
func TestCompactedIndex(t *testing.T) {
now := model.Now()
periodConfig := config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod},
Schema: "v12",
}
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{periodConfig},
}
indexBuckets, err := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
require.NoError(t, err)
tableName := indexBuckets[0]
tableInterval := retention.ExtractIntervalFromTableName(tableName)
// shiftTableStart shift tableInterval.Start by the given amount of milliseconds.
// It is used for building chunkmetas relative to start time of the table.
shiftTableStart := func(ms int64) int64 {
return int64(tableInterval.Start) + ms
}
lbls1 := mustParseLabels(`{foo="bar", a="b"}`)
lbls2 := mustParseLabels(`{fizz="buzz", a="b"}`)
userID := buildUserID(0)
buildCompactedIndex := func() *compactedIndex {
builder := NewBuilder()
stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "")
builder.AddSeries(stream.labels, stream.fp, stream.chunks)
stream = buildStream(lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), "")
builder.AddSeries(stream.labels, stream.fp, stream.chunks)
builder.FinalizeChunks()
return newCompactedIndex(context.Background(), tableName, buildUserID(0), t.TempDir(), periodConfig, builder)
}
expectedChunkEntries := map[string][]retention.ChunkEntry{
lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))),
lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))),
}
testCtx := setupCompactedIndex(t)
for name, tc := range map[string]struct {
deleteChunks map[string]index.ChunkMetas
@ -689,101 +651,101 @@ func TestCompactedIndex(t *testing.T) {
}{
"no changes": {
finalExpectedChunks: map[string]index.ChunkMetas{
lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(10)),
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"delete some chunks from a stream": {
deleteChunks: map[string]index.ChunkMetas{
lbls1.String(): append(buildChunkMetas(shiftTableStart(3), shiftTableStart(5)), buildChunkMetas(shiftTableStart(7), shiftTableStart(8))...),
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)), buildChunkMetas(testCtx.shiftTableStart(7), testCtx.shiftTableStart(8))...),
},
finalExpectedChunks: map[string]index.ChunkMetas{
lbls1.String(): append(buildChunkMetas(shiftTableStart(0), shiftTableStart(2)), append(buildChunkMetas(shiftTableStart(6), shiftTableStart(6)), buildChunkMetas(shiftTableStart(9), shiftTableStart(10))...)...),
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), append(buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(6)), buildChunkMetas(testCtx.shiftTableStart(9), testCtx.shiftTableStart(10))...)...),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"delete all chunks from a stream": {
deleteChunks: map[string]index.ChunkMetas{
lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(10)),
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(10)),
},
deleteSeries: []labels.Labels{lbls1},
deleteSeries: []labels.Labels{testCtx.lbls1},
finalExpectedChunks: map[string]index.ChunkMetas{
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"add some chunks to a stream": {
addChunks: []chunk.Chunk{
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
},
finalExpectedChunks: map[string]index.ChunkMetas{
lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(12)),
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"add some chunks out of table interval to a stream": {
addChunks: []chunk.Chunk{
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
// these chunks should not be added
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(int64(tableInterval.End+100), int64(tableInterval.End+100))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(int64(testCtx.tableInterval.End+100), int64(testCtx.tableInterval.End+100))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(int64(tableInterval.End+200), int64(tableInterval.End+200))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(int64(testCtx.tableInterval.End+200), int64(testCtx.tableInterval.End+200))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
},
finalExpectedChunks: map[string]index.ChunkMetas{
lbls1.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(12)),
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(12)),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"add and delete some chunks in a stream": {
addChunks: []chunk.Chunk{
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
{
Metric: lbls1,
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(12), shiftTableStart(12))[0], lbls1),
Metric: testCtx.lbls1,
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(12), testCtx.shiftTableStart(12))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
},
deleteChunks: map[string]index.ChunkMetas{
lbls1.String(): buildChunkMetas(shiftTableStart(3), shiftTableStart(5)),
testCtx.lbls1.String(): buildChunkMetas(testCtx.shiftTableStart(3), testCtx.shiftTableStart(5)),
},
finalExpectedChunks: map[string]index.ChunkMetas{
lbls1.String(): append(buildChunkMetas(shiftTableStart(0), shiftTableStart(2)), buildChunkMetas(shiftTableStart(6), shiftTableStart(12))...),
lbls2.String(): buildChunkMetas(shiftTableStart(0), shiftTableStart(20)),
testCtx.lbls1.String(): append(buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(2)), buildChunkMetas(testCtx.shiftTableStart(6), testCtx.shiftTableStart(12))...),
testCtx.lbls2.String(): buildChunkMetas(testCtx.shiftTableStart(0), testCtx.shiftTableStart(20)),
},
},
"adding chunk to non-existing stream should error": {
addChunks: []chunk.Chunk{
{
Metric: labels.NewBuilder(lbls1).Set("new", "label").Labels(),
ChunkRef: chunkMetaToChunkRef(userID, buildChunkMetas(shiftTableStart(11), shiftTableStart(11))[0], lbls1),
Metric: labels.NewBuilder(testCtx.lbls1).Set("new", "label").Labels(),
ChunkRef: chunkMetaToChunkRef(testCtx.userID, buildChunkMetas(testCtx.shiftTableStart(11), testCtx.shiftTableStart(11))[0], testCtx.lbls1),
Data: dummyChunkData{},
},
},
@ -791,10 +753,10 @@ func TestCompactedIndex(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
compactedIndex := buildCompactedIndex()
compactedIndex := testCtx.buildCompactedIndex()
foundChunkEntries := map[string][]retention.ChunkEntry{}
err := compactedIndex.ForEachChunk(func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) {
err := compactedIndex.ForEachChunk(context.Background(), func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) {
seriesIDStr := string(chunkEntry.SeriesID)
foundChunkEntries[seriesIDStr] = append(foundChunkEntries[seriesIDStr], chunkEntry)
if chks, ok := tc.deleteChunks[string(chunkEntry.SeriesID)]; ok {
@ -809,7 +771,7 @@ func TestCompactedIndex(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, expectedChunkEntries, foundChunkEntries)
require.Equal(t, testCtx.expectedChunkEntries, foundChunkEntries)
for _, lbls := range tc.deleteSeries {
require.NoError(t, compactedIndex.CleanupSeries(nil, lbls))
@ -839,6 +801,79 @@ func TestCompactedIndex(t *testing.T) {
}
func TestIteratorContextCancelation(t *testing.T) {
tc := setupCompactedIndex(t)
compactedIndex := tc.buildCompactedIndex()
ctx, cancel := context.WithCancel(context.Background())
cancel()
var foundChunkEntries []retention.ChunkEntry
err := compactedIndex.ForEachChunk(ctx, func(chunkEntry retention.ChunkEntry) (deleteChunk bool, err error) {
foundChunkEntries = append(foundChunkEntries, chunkEntry)
return false, nil
})
require.ErrorIs(t, err, context.Canceled)
}
type testContext struct {
lbls1 labels.Labels
lbls2 labels.Labels
userID string
tableInterval model.Interval
shiftTableStart func(ms int64) int64
buildCompactedIndex func() *compactedIndex
expectedChunkEntries map[string][]retention.ChunkEntry
}
func setupCompactedIndex(t *testing.T) *testContext {
t.Helper()
now := model.Now()
periodConfig := config.PeriodConfig{
IndexTables: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod},
Schema: "v12",
}
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{periodConfig},
}
indexBuckets, err := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
require.NoError(t, err)
tableName := indexBuckets[0]
tableInterval := retention.ExtractIntervalFromTableName(tableName)
// shiftTableStart shift tableInterval.Start by the given amount of milliseconds.
// It is used for building chunkmetas relative to start time of the table.
shiftTableStart := func(ms int64) int64 {
return int64(tableInterval.Start) + ms
}
lbls1 := mustParseLabels(`{foo="bar", a="b"}`)
lbls2 := mustParseLabels(`{fizz="buzz", a="b"}`)
userID := buildUserID(0)
buildCompactedIndex := func() *compactedIndex {
builder := NewBuilder()
stream := buildStream(lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10)), "")
builder.AddSeries(stream.labels, stream.fp, stream.chunks)
stream = buildStream(lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20)), "")
builder.AddSeries(stream.labels, stream.fp, stream.chunks)
builder.FinalizeChunks()
return newCompactedIndex(context.Background(), tableName, buildUserID(0), t.TempDir(), periodConfig, builder)
}
expectedChunkEntries := map[string][]retention.ChunkEntry{
lbls1.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls1, buildChunkMetas(shiftTableStart(0), shiftTableStart(10))),
lbls2.String(): chunkMetasToChunkEntry(schemaCfg, userID, lbls2, buildChunkMetas(shiftTableStart(0), shiftTableStart(20))),
}
return &testContext{lbls1, lbls2, userID, tableInterval, shiftTableStart, buildCompactedIndex, expectedChunkEntries}
}
type dummyChunkData struct {
chunk.Data
}

@ -73,7 +73,7 @@ func main() {
// loads everything into memory.
if err := db.View(func(t *bbolt.Tx) error {
return compactor.ForEachChunk(t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) {
return compactor.ForEachChunk(context.Background(), t.Bucket([]byte("index")), periodConfig, func(entry retention.ChunkEntry) (bool, error) {
builder.AddSeries(entry.Labels, model.Fingerprint(entry.Labels.Hash()), []index.ChunkMeta{{
Checksum: extractChecksumFromChunkID(entry.ChunkID),
MinTime: int64(entry.From),
