Retention speedup (#4172)

* drop multi-table chunk entries from the index when the whole table is out of retention

* speed up cleaning of deleted series from the index

* mark a partially deleted chunk for deletion only when all of its tables are processed
Sandeep Sukhani authored 4 years ago, committed by GitHub
parent f17b189a3f
commit 29b07d190d
  1. pkg/storage/stores/shipper/compactor/compactor.go (18 changes)
  2. pkg/storage/stores/shipper/compactor/compactor_test.go (39 changes)
  3. pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go (4 changes)
  4. pkg/storage/stores/shipper/compactor/retention/expiration.go (10 changes)
  5. pkg/storage/stores/shipper/compactor/retention/iterator.go (97 changes)
  6. pkg/storage/stores/shipper/compactor/retention/iterator_test.go (6 changes)
  7. pkg/storage/stores/shipper/compactor/retention/retention.go (54 changes)
  8. pkg/storage/stores/shipper/compactor/retention/retention_test.go (293 changes)
  9. pkg/storage/stores/shipper/compactor/retention/series.go (32 changes)
  10. pkg/storage/stores/shipper/compactor/retention/series_test.go (16 changes)
  11. pkg/storage/stores/shipper/compactor/retention/util.go (29 changes)
  12. pkg/storage/stores/shipper/compactor/retention/util_test.go (38 changes)

@@ -6,7 +6,6 @@ import (
"flag"
"path/filepath"
"reflect"
"strconv"
"sync"
"time"
@@ -198,7 +197,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
return err
}
interval := extractIntervalFromTableName(tableName)
interval := retention.ExtractIntervalFromTableName(tableName)
intervalHasExpiredChunks := false
if c.cfg.RetentionEnabled {
intervalHasExpiredChunks = c.expirationChecker.IntervalHasExpiredChunks(interval)
@@ -338,17 +337,6 @@ func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bo
return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval)
}
func extractIntervalFromTableName(tableName string) model.Interval {
interval := model.Interval{
Start: 0,
End: model.Now(),
}
tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
if err != nil {
return interval
}
interval.Start = model.TimeFromUnix(tableNumber * 86400)
interval.End = interval.Start.Add(24 * time.Hour)
return interval
}
func (e *expirationChecker) DropFromIndex(ref retention.ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return e.retentionExpiryChecker.DropFromIndex(ref, tableEndTime, now) || e.deletionExpiryChecker.DropFromIndex(ref, tableEndTime, now)
}

@@ -12,11 +12,9 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
@@ -59,43 +57,6 @@ func TestIsDefaults(t *testing.T) {
}
}
func TestExtractIntervalFromTableName(t *testing.T) {
periodicTableConfig := chunk.PeriodicTableConfig{
Prefix: "dummy",
Period: 24 * time.Hour,
}
const millisecondsInDay = model.Time(24 * time.Hour / time.Millisecond)
calculateInterval := func(tm model.Time) (m model.Interval) {
m.Start = tm - tm%millisecondsInDay
m.End = m.Start + millisecondsInDay
return
}
for i, tc := range []struct {
tableName string
expectedInterval model.Interval
}{
{
tableName: periodicTableConfig.TableFor(model.Now()),
expectedInterval: calculateInterval(model.Now()),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour)),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tc.expectedInterval, extractIntervalFromTableName(tc.tableName))
})
}
}
func TestCompactor_RunCompaction(t *testing.T) {
tempDir, err := ioutil.TempDir("", "compactor-run-compaction")
require.NoError(t, err)

@@ -214,3 +214,7 @@ func (d *DeleteRequestsManager) IntervalHasExpiredChunks(interval model.Interval
return false
}
func (d *DeleteRequestsManager) DropFromIndex(_ retention.ChunkEntry, _ model.Time, _ model.Time) bool {
return false
}

@@ -19,6 +19,7 @@ type ExpirationChecker interface {
MarkPhaseStarted()
MarkPhaseFailed()
MarkPhaseFinished()
DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool
}
type expirationChecker struct {
@@ -46,6 +47,15 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod
return now.Sub(ref.Through) > period, nil
}
// DropFromIndex tells whether it is okay to drop the chunk entry from the index table.
// We check whether tableEndTime is out of the retention period, calculated using the labels from the chunk.
// If tableEndTime is out of retention, we can drop the chunk entry without removing the chunk from the store.
func (e *expirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
userID := unsafeGetString(ref.UserID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
return now.Sub(tableEndTime) > period
}
func (e *expirationChecker) MarkPhaseStarted() {
smallestRetentionPeriod := findSmallestRetentionPeriod(e.tenantsRetention.limits)
e.latestRetentionStartTime = model.Now().Add(-smallestRetentionPeriod)
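
To make the new check concrete, here is a minimal, self-contained sketch (not part of this diff; the 7-day retention value is an assumption for illustration) of the condition DropFromIndex evaluates:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// dropFromIndex mirrors the condition above: the entry can be dropped once
// the whole table interval has aged out of the tenant's retention period.
func dropFromIndex(tableEndTime, now model.Time, retention time.Duration) bool {
	return now.Sub(tableEndTime) > retention
}

func main() {
	now := model.Now()
	retention := 7 * 24 * time.Hour // assumed tenant retention

	// Table ended 8 days ago: out of retention, safe to drop the index entry.
	fmt.Println(dropFromIndex(now.Add(-8*24*time.Hour), now, retention)) // true
	// Table ended 6 days ago: still within retention, keep the entry.
	fmt.Println(dropFromIndex(now.Add(-6*24*time.Hour), now, retention)) // false
}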

@@ -1,11 +1,9 @@
package retention
import (
"bytes"
"encoding/binary"
"fmt"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"go.etcd.io/bbolt"
@@ -91,93 +89,60 @@ func (b *chunkIndexIterator) Next() bool {
}
type SeriesCleaner interface {
Cleanup(seriesID []byte, userID []byte) error
Cleanup(userID []byte, lbls labels.Labels) error
}
type seriesCleaner struct {
bucketTimestamps []string
shards map[uint32]string
cursor *bbolt.Cursor
config chunk.PeriodConfig
tableInterval model.Interval
shards map[uint32]string
bucket *bbolt.Bucket
config chunk.PeriodConfig
schema chunk.SeriesStoreSchema
buf []byte
}
func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig) *seriesCleaner {
var (
fromDay = config.From.Time.Unix() / int64(config.IndexTables.Period/time.Second)
throughDay = config.From.Add(config.IndexTables.Period).Unix() / int64(config.IndexTables.Period/time.Second)
bucketTimestamps = []string{}
)
for i := fromDay; i <= throughDay; i++ {
bucketTimestamps = append(bucketTimestamps, fmt.Sprintf("d%d", i))
}
func newSeriesCleaner(bucket *bbolt.Bucket, config chunk.PeriodConfig, tableName string) *seriesCleaner {
baseSchema, _ := config.CreateSchema()
schema := baseSchema.(chunk.SeriesStoreSchema)
var shards map[uint32]string
if config.RowShards != 0 {
shards = map[uint32]string{}
for s := uint32(0); s <= config.RowShards; s++ {
shards[s] = fmt.Sprintf("%02d", s)
}
}
return &seriesCleaner{
bucketTimestamps: bucketTimestamps,
cursor: bucket.Cursor(),
buf: make([]byte, 0, 1024),
config: config,
shards: shards,
tableInterval: ExtractIntervalFromTableName(tableName),
schema: schema,
bucket: bucket,
buf: make([]byte, 0, 1024),
config: config,
shards: shards,
}
}
func (s *seriesCleaner) Cleanup(seriesID []byte, userID []byte) error {
for _, timestamp := range s.bucketTimestamps {
// build the chunk ref prefix
s.buf = s.buf[:0]
if s.config.Schema != "v9" {
shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards
s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...)
s.buf = append(s.buf, ':')
}
s.buf = append(s.buf, userID...)
s.buf = append(s.buf, ':')
s.buf = append(s.buf, unsafeGetBytes(timestamp)...)
s.buf = append(s.buf, ':')
s.buf = append(s.buf, seriesID...)
if key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf) {
// this series still has chunk entries, so we can't clean it up
continue
}
// we don't have any chunk ref for that series, so let's delete all label index entries
s.buf = s.buf[:0]
if s.config.Schema != "v9" {
shard := binary.BigEndian.Uint32(seriesID) % s.config.RowShards
s.buf = append(s.buf, unsafeGetBytes(s.shards[shard])...)
s.buf = append(s.buf, ':')
}
s.buf = append(s.buf, userID...)
s.buf = append(s.buf, ':')
s.buf = append(s.buf, unsafeGetBytes(timestamp)...)
s.buf = append(s.buf, ':')
s.buf = append(s.buf, unsafeGetBytes(logMetricName)...)
func (s *seriesCleaner) Cleanup(userID []byte, lbls labels.Labels) error {
_, indexEntries, err := s.schema.GetCacheKeysAndLabelWriteEntries(s.tableInterval.Start, s.tableInterval.End, string(userID), logMetricName, lbls, "")
if err != nil {
return err
}
// delete all seriesRangeKeyV1 and labelSeriesRangeKeyV1 via prefix
// todo(cyriltovena) we might be able to encode index key instead of parsing all label entries for faster delete.
for key, _ := s.cursor.Seek(s.buf); key != nil && bytes.HasPrefix(key, s.buf); key, _ = s.cursor.Next() {
for i := range indexEntries {
for _, indexEntry := range indexEntries[i] {
key := make([]byte, 0, len(indexEntry.HashValue)+len(separator)+len(indexEntry.RangeValue))
key = append(key, []byte(indexEntry.HashValue)...)
key = append(key, []byte(separator)...)
key = append(key, indexEntry.RangeValue...)
parsedSeriesID, ok, err := parseLabelIndexSeriesID(decodeKey(key))
err := s.bucket.Delete(key)
if err != nil {
return err
}
if !ok {
continue
}
if !bytes.Equal(seriesID, parsedSeriesID) {
continue
}
if err := s.cursor.Delete(); err != nil {
return err
}
}
}
return nil
}
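
The speedup comes from computing the exact label-index keys for a series via the schema and deleting them directly, instead of cursor-scanning every bucket prefix and parsing each entry. A rough, self-contained sketch of the difference, using bbolt directly (the bucket and key names here are illustrative, not Loki's real layout):

package main

import (
	"log"

	"go.etcd.io/bbolt"
)

// deleteKnownKeys issues one Delete per precomputed key: no seeking,
// no prefix filtering, no parsing of unrelated entries.
func deleteKnownKeys(db *bbolt.DB, keys [][]byte) error {
	return db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte("index")) // illustrative bucket name
		if b == nil {
			return nil
		}
		for _, k := range keys {
			if err := b.Delete(k); err != nil {
				return err
			}
		}
		return nil
	})
}

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// In the real code the keys come from the schema's label write entries.
	if err := deleteKnownKeys(db, [][]byte{[]byte("user1:d18500:series1")}); err != nil {
		log.Fatal(err)
	}
}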

@@ -100,11 +100,11 @@ func Test_SeriesCleaner(t *testing.T) {
require.NoError(t, err)
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config)
if err := cleaner.Cleanup(entryFromChunk(c2).SeriesID, entryFromChunk(c2).UserID); err != nil {
cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config, tables[0].name)
if err := cleaner.Cleanup(entryFromChunk(c2).UserID, c2.Metric); err != nil {
return err
}
if err := cleaner.Cleanup(entryFromChunk(c1).SeriesID, entryFromChunk(c1).UserID); err != nil {
if err := cleaner.Cleanup(entryFromChunk(c1).UserID, c1.Metric); err != nil {
return err
}
return nil

@@ -104,7 +104,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
return err
}
empty, err = markforDelete(ctx, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg), t.expiration, chunkRewriter)
empty, err = markforDelete(ctx, tableName, markerWriter, chunkIt, newSeriesCleaner(bucket, schemaCfg, tableName), t.expiration, chunkRewriter)
if err != nil {
return err
}
@@ -129,38 +129,65 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
return empty, markerWriter.Count(), nil
}
func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) {
func markforDelete(ctx context.Context, tableName string, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker, chunkRewriter *chunkRewriter) (bool, error) {
seriesMap := newUserSeriesMap()
// tableInterval holds the interval for which the table is expected to have the chunks indexed
tableInterval := ExtractIntervalFromTableName(tableName)
empty := true
now := model.Now()
for chunkIt.Next() {
if chunkIt.Err() != nil {
return false, chunkIt.Err()
}
c := chunkIt.Entry()
seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
// see if the chunk is deleted completely or partially
if expired, nonDeletedIntervals := expiration.Expired(c, now); expired {
if len(nonDeletedIntervals) == 0 {
seriesMap.Add(c.SeriesID, c.UserID)
} else {
if len(nonDeletedIntervals) > 0 {
wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, nonDeletedIntervals)
if err != nil {
return false, err
}
if !wroteChunks {
seriesMap.Add(c.SeriesID, c.UserID)
if wroteChunks {
// we have re-written the chunk to storage, so the table won't be empty and the series is still being referenced.
empty = false
seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
}
}
if err := chunkIt.Delete(); err != nil {
return false, err
}
if err := marker.Put(c.ChunkID); err != nil {
return false, err
// Mark the chunk for deletion only if it is completely deleted, or if this is the last table that the chunk is indexed in.
// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed,
// then retention processing of those tables would fail because the source chunk would be missing from storage.
if len(nonDeletedIntervals) == 0 || c.Through <= tableInterval.End {
if err := marker.Put(c.ChunkID); err != nil {
return false, err
}
}
continue
}
// The chunk is not deleted; now see if we can drop its index entry based on the end time of tableInterval.
// If the chunk's end time is after the end time of tableInterval, it means the chunk is also indexed in the next table.
// We now check whether the end time of tableInterval is out of the retention period, so that
// we can drop the chunk entry from this table without removing the chunk from the store.
if c.Through.After(tableInterval.End) {
if expiration.DropFromIndex(c, tableInterval.End, now) {
if err := chunkIt.Delete(); err != nil {
return false, err
}
continue
}
}
empty = false
seriesMap.MarkSeriesNotDeleted(c.SeriesID, c.UserID)
}
if empty {
return true, nil
@@ -168,8 +195,13 @@ func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt Chun
if ctx.Err() != nil {
return false, ctx.Err()
}
return false, seriesMap.ForEach(func(seriesID, userID []byte) error {
return seriesCleaner.Cleanup(seriesID, userID)
return false, seriesMap.ForEach(func(info userSeriesInfo) error {
if !info.isDeleted {
return nil
}
return seriesCleaner.Cleanup(info.UserID(), info.lbls)
})
}
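
Putting the pieces together, markforDelete now has three possible outcomes per chunk entry. A condensed, runnable sketch of the control flow (the function name, parameters, and return strings are simplified stand-ins, not the actual Loki code):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// decide summarizes the branches markforDelete takes for a single entry.
func decide(expired bool, nonDeletedIntervals int, through, tableEnd model.Time, dropFromIndex bool) string {
	switch {
	case expired && nonDeletedIntervals == 0:
		return "delete index entry; mark chunk for deletion"
	case expired && through <= tableEnd:
		// Last table indexing this chunk: safe to delete the source chunk.
		return "rewrite retained data; delete index entry; mark chunk for deletion"
	case expired:
		// Chunk is also indexed in a later table; keep the source chunk for now.
		return "rewrite retained data; delete index entry; keep chunk"
	case through.After(tableEnd) && dropFromIndex:
		return "drop index entry only; chunk stays in storage"
	default:
		return "keep index entry; series marked as not deleted"
	}
}

func main() {
	now := model.Now()
	tableEnd := now.Add(-time.Hour)
	fmt.Println(decide(true, 0, now, tableEnd, false))
	fmt.Println(decide(true, 1, tableEnd.Add(-time.Minute), tableEnd, false))
	fmt.Println(decide(false, 0, now, tableEnd, true))
}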

@@ -187,7 +187,7 @@ func (noopWriter) Close() error { return nil }
type noopCleaner struct{}
func (noopCleaner) Cleanup(seriesID []byte, userID []byte) error { return nil }
func (noopCleaner) Cleanup(userID []byte, lbls labels.Labels) error { return nil }
func Test_EmptyTable(t *testing.T) {
schema := allSchemas[0]
@@ -207,7 +207,7 @@ func Test_EmptyTable(t *testing.T) {
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
require.NoError(t, err)
empty, err := markforDelete(context.Background(), noopWriter{}, it, noopCleaner{},
empty, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil)
require.NoError(t, err)
require.True(t, empty)
@@ -407,3 +407,292 @@ func TestChunkRewriter(t *testing.T) {
})
}
}
type seriesCleanedRecorder struct {
// map of userID -> map of labels hash -> struct{}
deletedSeries map[string]map[uint64]struct{}
}
func newSeriesCleanRecorder() *seriesCleanedRecorder {
return &seriesCleanedRecorder{map[string]map[uint64]struct{}{}}
}
func (s *seriesCleanedRecorder) Cleanup(userID []byte, lbls labels.Labels) error {
s.deletedSeries[string(userID)] = map[uint64]struct{}{lbls.Hash(): {}}
return nil
}
type chunkExpiry struct {
isExpired bool
nonDeletedIntervals []model.Interval
}
type mockExpirationChecker struct {
ExpirationChecker
chunksExpiry map[string]chunkExpiry
}
func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) mockExpirationChecker {
return mockExpirationChecker{chunksExpiry: chunksExpiry}
}
func (m mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) {
ce := m.chunksExpiry[string(ref.ChunkID)]
return ce.isExpired, ce.nonDeletedIntervals
}
func (m mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return false
}
func TestMarkForDelete_SeriesCleanup(t *testing.T) {
now := model.Now()
schema := allSchemas[2]
userID := "1"
todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now))
for _, tc := range []struct {
name string
chunks []chunk.Chunk
expiry []chunkExpiry
expectedDeletedSeries []map[uint64]struct{}
expectedEmpty []bool
}{
{
name: "no chunk and series deleted",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now),
},
expiry: []chunkExpiry{
{
isExpired: false,
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
false,
},
},
{
name: "only one chunk in store which gets deleted",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now),
},
expiry: []chunkExpiry{
{
isExpired: true,
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
true,
},
},
{
name: "only one chunk in store which gets partially deleted",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now),
},
expiry: []chunkExpiry{
{
isExpired: true,
nonDeletedIntervals: []model.Interval{{
Start: now.Add(-15 * time.Minute),
End: now,
}},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
false,
},
},
{
name: "one of two chunks deleted",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now),
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, now.Add(-30*time.Minute), now),
},
expiry: []chunkExpiry{
{
isExpired: false,
},
{
isExpired: true,
},
},
expectedDeletedSeries: []map[uint64]struct{}{
{labels.Labels{labels.Label{Name: "foo", Value: "2"}}.Hash(): struct{}{}},
},
expectedEmpty: []bool{
false,
},
},
{
name: "one of two chunks partially deleted",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, now.Add(-30*time.Minute), now),
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "2"}}, now.Add(-30*time.Minute), now),
},
expiry: []chunkExpiry{
{
isExpired: false,
},
{
isExpired: true,
nonDeletedIntervals: []model.Interval{{
Start: now.Add(-15 * time.Minute),
End: now,
}},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
false,
},
},
{
name: "one big chunk partially deleted for yesterdays table without rewrite",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start.Add(-time.Hour), now),
},
expiry: []chunkExpiry{
{
isExpired: true,
nonDeletedIntervals: []model.Interval{{
Start: todaysTableInterval.Start,
End: now,
}},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil, nil,
},
expectedEmpty: []bool{
true, false,
},
},
{
name: "one big chunk partially deleted for yesterdays table with rewrite",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start.Add(-time.Hour), now),
},
expiry: []chunkExpiry{
{
isExpired: true,
nonDeletedIntervals: []model.Interval{{
Start: todaysTableInterval.Start.Add(-30 * time.Minute),
End: now,
}},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil, nil,
},
expectedEmpty: []bool{
false, false,
},
},
} {
t.Run(tc.name, func(t *testing.T) {
store := newTestStore(t)
require.NoError(t, store.Put(context.TODO(), tc.chunks))
chunksExpiry := map[string]chunkExpiry{}
for i, chunk := range tc.chunks {
chunksExpiry[chunk.ExternalKey()] = tc.expiry[i]
}
expirationChecker := newMockExpirationChecker(chunksExpiry)
store.Stop()
tables := store.indexTables()
require.Len(t, tables, len(tc.expectedDeletedSeries))
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder)
for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
require.NoError(t, err)
cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(bucketName))
require.NoError(t, err)
empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder,
expirationChecker, cr)
require.NoError(t, err)
require.Equal(t, tc.expectedEmpty[i], empty)
return nil
})
require.NoError(t, err)
require.EqualValues(t, tc.expectedDeletedSeries[i], seriesCleanRecorder.deletedSeries[userID])
}
})
}
}
func TestMarkForDelete_DropChunkFromIndex(t *testing.T) {
schema := allSchemas[2]
store := newTestStore(t)
now := model.Now()
todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now))
retentionPeriod := now.Sub(todaysTableInterval.Start) / 2
// chunks in retention
c1 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, now)
c2 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "2"}}, todaysTableInterval.Start.Add(-7*24*time.Hour), now)
// chunks out of retention
c3 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, now.Add(-retentionPeriod))
c4 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "3"}}, todaysTableInterval.Start.Add(-12*time.Hour), todaysTableInterval.Start.Add(-10*time.Hour))
c5 := createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "4"}}, todaysTableInterval.Start, now.Add(-retentionPeriod))
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{
c1, c2, c3, c4, c5,
}))
store.Stop()
tables := store.indexTables()
require.Len(t, tables, 8)
for i, table := range tables {
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
require.NoError(t, err)
empty, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil)
require.NoError(t, err)
if i == 7 {
require.False(t, empty)
} else {
require.True(t, empty, "table %s must be empty", table.name)
}
return nil
})
require.NoError(t, err)
require.NoError(t, table.Close())
}
store.open()
// verify the chunks which were not supposed to be deleted are still there
require.True(t, store.HasChunk(c1))
require.True(t, store.HasChunk(c2))
// verify the chunks which were supposed to be deleted are gone
require.False(t, store.HasChunk(c3))
require.False(t, store.HasChunk(c4))
require.False(t, store.HasChunk(c5))
}

@@ -44,20 +44,42 @@ func (us *userSeries) Reset(seriesID []byte, userID []byte) {
us.seriesIDLen = len(seriesID)
}
type userSeriesMap map[string]userSeries
type userSeriesInfo struct {
userSeries
isDeleted bool
lbls labels.Labels
}
type userSeriesMap map[string]userSeriesInfo
func newUserSeriesMap() userSeriesMap {
return make(userSeriesMap)
}
func (u userSeriesMap) Add(seriesID []byte, userID []byte) {
func (u userSeriesMap) Add(seriesID []byte, userID []byte, lbls labels.Labels) {
us := newUserSeries(seriesID, userID)
if _, ok := u[us.Key()]; ok {
return
}
u[us.Key()] = userSeriesInfo{
userSeries: us,
isDeleted: true,
lbls: lbls,
}
}
// MarkSeriesNotDeleted marks a series as not deleted when it still has some chunks left in the store
func (u userSeriesMap) MarkSeriesNotDeleted(seriesID []byte, userID []byte) {
us := newUserSeries(seriesID, userID)
u[us.Key()] = us
usi := u[us.Key()]
usi.isDeleted = false
u[us.Key()] = usi
}
func (u userSeriesMap) ForEach(callback func(seriesID []byte, userID []byte) error) error {
func (u userSeriesMap) ForEach(callback func(info userSeriesInfo) error) error {
for _, v := range u {
if err := callback(v.SeriesID(), v.UserID()); err != nil {
if err := callback(v); err != nil {
return err
}
}
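
The bookkeeping here is simple: every series starts as deleted when first added, and a single MarkSeriesNotDeleted call pins it as live so cleanup skips it. A stripped-down sketch of that behavior (plain map and string keys instead of the real userSeries types):

package main

import "fmt"

type seriesInfo struct{ isDeleted bool }

func main() {
	m := map[string]seriesInfo{}

	add := func(key string) {
		// First Add wins; later Adds must not resurrect the deleted flag
		// once MarkSeriesNotDeleted has cleared it.
		if _, ok := m[key]; !ok {
			m[key] = seriesInfo{isDeleted: true}
		}
	}
	markNotDeleted := func(key string) {
		si := m[key]
		si.isDeleted = false
		m[key] = si
	}

	add("user1/series1")
	add("user1/series2")
	markNotDeleted("user1/series1") // series1 still has chunks in the store

	for key, si := range m {
		if si.isDeleted {
			fmt.Println("cleanup", key) // prints only user1/series2
		}
	}
}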

@@ -10,17 +10,17 @@ import (
func Test_UserSeries(t *testing.T) {
m := newUserSeriesMap()
m.Add([]byte(`series1`), []byte(`user1`))
m.Add([]byte(`series1`), []byte(`user1`))
m.Add([]byte(`series1`), []byte(`user2`))
m.Add([]byte(`series2`), []byte(`user1`))
m.Add([]byte(`series2`), []byte(`user1`))
m.Add([]byte(`series2`), []byte(`user2`))
m.Add([]byte(`series1`), []byte(`user1`), nil)
m.Add([]byte(`series1`), []byte(`user1`), nil)
m.Add([]byte(`series1`), []byte(`user2`), nil)
m.Add([]byte(`series2`), []byte(`user1`), nil)
m.Add([]byte(`series2`), []byte(`user1`), nil)
m.Add([]byte(`series2`), []byte(`user2`), nil)
keys := []string{}
err := m.ForEach(func(seriesID, userID []byte) error {
keys = append(keys, string(seriesID)+":"+string(userID))
err := m.ForEach(func(info userSeriesInfo) error {
keys = append(keys, string(info.SeriesID())+":"+string(info.UserID()))
return nil
})
require.NoError(t, err)

@@ -4,8 +4,11 @@ import (
"fmt"
"io"
"os"
"reflect"
"strconv"
"time"
"unsafe"
"github.com/prometheus/common/model"
)
// unsafeGetString is like yolostring but with a meaningful name
@@ -13,14 +16,6 @@ func unsafeGetString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
func unsafeGetBytes(s string) []byte {
var buf []byte
p := unsafe.Pointer(&buf)
*(*string)(p) = s
(*reflect.SliceHeader)(p).Cap = len(s)
return buf
}
func copyFile(src, dst string) (int64, error) {
sourceFileStat, err := os.Stat(src)
if err != nil {
@@ -45,3 +40,19 @@ func copyFile(src, dst string) (int64, error) {
nBytes, err := io.Copy(destination, source)
return nBytes, err
}
// ExtractIntervalFromTableName returns the time interval for which the table is expected to hold the chunk index.
func ExtractIntervalFromTableName(tableName string) model.Interval {
interval := model.Interval{
Start: 0,
End: model.Now(),
}
tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
if err != nil {
return interval
}
interval.Start = model.TimeFromUnix(tableNumber * 86400)
interval.End = interval.Start.Add(24 * time.Hour)
return interval
}
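
A quick worked example (the table name is hypothetical): the trailing five digits encode the day number since the Unix epoch, so a table like "index_18500" covers the 24-hour window starting at 18500 * 86400 seconds:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "index_18500" -> day 18500 since the epoch.
	start := time.Unix(18500*86400, 0).UTC()
	fmt.Println(start)                     // 2020-08-26 00:00:00 +0000 UTC
	fmt.Println(start.Add(24 * time.Hour)) // exclusive end of the table interval
}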

@@ -2,6 +2,7 @@ package retention
import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"testing"
@@ -262,3 +263,40 @@ func newTestStore(t testing.TB) *testStore {
limits: limits,
}
}
func TestExtractIntervalFromTableName(t *testing.T) {
periodicTableConfig := chunk.PeriodicTableConfig{
Prefix: "dummy",
Period: 24 * time.Hour,
}
const millisecondsInDay = model.Time(24 * time.Hour / time.Millisecond)
calculateInterval := func(tm model.Time) (m model.Interval) {
m.Start = tm - tm%millisecondsInDay
m.End = m.Start + millisecondsInDay
return
}
for i, tc := range []struct {
tableName string
expectedInterval model.Interval
}{
{
tableName: periodicTableConfig.TableFor(model.Now()),
expectedInterval: calculateInterval(model.Now()),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour)),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tc.expectedInterval, ExtractIntervalFromTableName(tc.tableName))
})
}
}
