skip applying retention when data in table is still within retention period and there are no overlapping delete requests (#3934)

pull/3936/head
Sandeep Sukhani committed 4 years ago (via GitHub)
parent 43c16ed8a1
commit 7a7eeb0d63
15 changed files (lines changed in parentheses):

  1. docs/sources/operations/storage/retention.md (2)
  2. pkg/distributor/validator_test.go (50)
  3. pkg/loki/modules.go (2)
  4. pkg/loki/runtime_config.go (30)
  5. pkg/loki/runtime_config_test.go (2)
  6. pkg/ruler/appender_test.go (8)
  7. pkg/storage/stores/shipper/compactor/compactor.go (49)
  8. pkg/storage/stores/shipper/compactor/compactor_test.go (41)
  9. pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go (20)
  10. pkg/storage/stores/shipper/compactor/retention/expiration.go (44)
  11. pkg/storage/stores/shipper/compactor/retention/expiration_test.go (173)
  12. pkg/storage/stores/shipper/compactor/retention/retention_test.go (43)
  13. pkg/storage/stores/shipper/compactor/table.go (6)
  14. pkg/storage/stores/shipper/compactor/table_test.go (8)
  15. pkg/validation/limits.go (23)
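
In short: before compacting a table, the compactor now derives the table's time range from its name and asks the combined expiration checker (retention rules plus pending delete requests) whether that range can contain expired chunks; only then does it run the mark/sweep retention pass. A minimal sketch of that decision, assuming it sits in the compactor package next to the helpers added in this commit (the name shouldApplyRetention is hypothetical):

```go
package compactor

import "github.com/prometheus/common/model"

// intervalChecker is the slice of retention.ExpirationChecker this sketch needs.
type intervalChecker interface {
	IntervalHasExpiredChunks(interval model.Interval) bool
}

// shouldApplyRetention mirrors the flow of CompactTable + table.compact in this
// commit: retention must be enabled, and the table's interval (derived from its
// name) must overlap either a tenant's retention cutoff or a delete request.
func shouldApplyRetention(retentionEnabled bool, tableName string, checker intervalChecker) bool {
	if !retentionEnabled {
		return false
	}
	interval := extractIntervalFromTableName(tableName) // helper added in this commit
	return checker.IntervalHasExpiredChunks(interval)
}
```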

@ -230,4 +230,4 @@ chunk_store_config:
table_manager:
retention_deletes_enabled: true
retention_period: 672h
```
```

@ -21,6 +21,16 @@ var (
testTime = time.Now()
)
type fakeLimits struct {
limits *validation.Limits
}
func (f fakeLimits) TenantLimits(userID string) *validation.Limits {
return f.limits
}
func (f fakeLimits) ForEachTenantLimit(validation.ForEachTenantLimitCallback) {}
func TestValidator_ValidateEntry(t *testing.T) {
tests := []struct {
name string
@ -39,11 +49,11 @@ func TestValidator_ValidateEntry(t *testing.T) {
{
"test too old",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
RejectOldSamples: true,
RejectOldSamplesMaxAge: model.Duration(1 * time.Hour),
}
},
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)),
@ -58,10 +68,10 @@ func TestValidator_ValidateEntry(t *testing.T) {
{
"line too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
MaxLineSize: 10,
}
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
@ -107,8 +117,8 @@ func TestValidator_ValidateLabels(t *testing.T) {
{
"test too many labels",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{MaxLabelNamesPerSeries: 2}
fakeLimits{
&validation.Limits{MaxLabelNamesPerSeries: 2},
},
"{foo=\"bar\",food=\"bars\",fed=\"bears\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg, "{foo=\"bar\",food=\"bars\",fed=\"bears\"}", 3, 2),
@ -116,11 +126,11 @@ func TestValidator_ValidateLabels(t *testing.T) {
{
"label name too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
}
},
},
"{fooooo=\"bar\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg, "{fooooo=\"bar\"}", "fooooo"),
@ -128,12 +138,12 @@ func TestValidator_ValidateLabels(t *testing.T) {
{
"label value too long",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
},
"{foo=\"barrrrrr\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg, "{foo=\"barrrrrr\"}", "barrrrrr"),
@ -141,12 +151,12 @@ func TestValidator_ValidateLabels(t *testing.T) {
{
"duplicate label",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
},
"{foo=\"bar\", foo=\"barf\"}",
httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg, "{foo=\"bar\", foo=\"barf\"}", "foo"),
@ -154,12 +164,12 @@ func TestValidator_ValidateLabels(t *testing.T) {
{
"label value contains %",
"test",
func(userID string) *validation.Limits {
return &validation.Limits{
fakeLimits{
&validation.Limits{
MaxLabelNamesPerSeries: 2,
MaxLabelNameLength: 5,
MaxLabelValueLength: 5,
}
},
},
"{foo=\"bar\", foo=\"barf%s\"}",
httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{

@ -140,7 +140,7 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
}
func (t *Loki) initOverrides() (_ services.Service, err error) {
t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig))
t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, newtenantLimitsFromRuntimeConfig(t.runtimeConfig))
// overrides are not a service, since they don't have any operational state.
return nil, err
}

@ -45,20 +45,34 @@ func loadRuntimeConfig(r io.Reader) (interface{}, error) {
return overrides, nil
}
func tenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLimits {
if c == nil {
type tenantLimitsFromRuntimeConfig struct {
c *runtimeconfig.Manager
}
func (t *tenantLimitsFromRuntimeConfig) TenantLimits(userID string) *validation.Limits {
cfg, ok := t.c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return nil
}
return func(userID string) *validation.Limits {
cfg, ok := c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return nil
}
return cfg.TenantLimits[userID]
return cfg.TenantLimits[userID]
}
func (t *tenantLimitsFromRuntimeConfig) ForEachTenantLimit(callback validation.ForEachTenantLimitCallback) {
cfg, ok := t.c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return
}
for userID, tenantLimit := range cfg.TenantLimits {
callback(userID, tenantLimit)
}
}
func newtenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLimits {
return &tenantLimitsFromRuntimeConfig{c: c}
}
func tenantConfigFromRuntimeConfig(c *runtimeconfig.Manager) runtime.TenantConfig {
if c == nil {
return nil

@ -112,7 +112,7 @@ func newTestOverrides(t *testing.T, yaml string) *validation.Overrides {
require.NoError(t, runtimeConfig.AwaitTerminated(context.Background()))
}()
overrides, err := validation.NewOverrides(defaults, tenantLimitsFromRuntimeConfig(runtimeConfig))
overrides, err := validation.NewOverrides(defaults, newtenantLimitsFromRuntimeConfig(runtimeConfig))
require.NoError(t, err)
return overrides
}

@ -102,13 +102,7 @@ func TestQueueCapacityTenantOverride(t *testing.T) {
appendable := createBasicAppendable(queueCapacity)
overriddenCapacity := 999
overrides, err := validation.NewOverrides(validation.Limits{}, func(userID string) *validation.Limits {
return &validation.Limits{
RulerRemoteWriteQueueCapacity: overriddenCapacity,
}
})
require.Nil(t, err)
appendable.overrides = overrides
appendable.overrides = fakeLimits(overriddenCapacity)
appender := appendable.Appender(ctx).(*RemoteWriteAppender)
require.Equal(t, appender.queue.Capacity(), overriddenCapacity)

@ -6,6 +6,7 @@ import (
"flag"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"
@ -73,6 +74,7 @@ type Compactor struct {
deleteRequestsStore deletion.DeleteRequestsStore
DeleteRequestsHandler *deletion.DeleteRequestHandler
deleteRequestsManager *deletion.DeleteRequestsManager
expirationChecker retention.ExpirationChecker
metrics *metrics
}
@ -130,9 +132,9 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
expirationChecker := newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, expirationChecker, chunkClient, r)
c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
if err != nil {
return err
}
@ -195,7 +197,8 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
return err
}
err = table.compact()
interval := extractIntervalFromTableName(tableName)
err = table.compact(c.expirationChecker.IntervalHasExpiredChunks(interval))
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to compact files", "table", tableName, "err", err)
return err
@ -208,14 +211,14 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
start := time.Now()
if c.cfg.RetentionEnabled {
c.deleteRequestsManager.MarkPhaseStarted()
c.expirationChecker.MarkPhaseStarted()
}
defer func() {
c.metrics.compactTablesOperationTotal.WithLabelValues(status).Inc()
dmCallback := c.deleteRequestsManager.MarkPhaseFailed
dmCallback := c.expirationChecker.MarkPhaseFailed
if status == statusSuccess {
dmCallback = c.deleteRequestsManager.MarkPhaseFinished
dmCallback = c.expirationChecker.MarkPhaseFinished
c.metrics.compactTablesOperationDurationSeconds.Set(time.Since(start).Seconds())
c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
}
@ -271,3 +274,37 @@ func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (b
return e.deletionExpiryChecker.Expired(ref, now)
}
func (e *expirationChecker) MarkPhaseStarted() {
e.retentionExpiryChecker.MarkPhaseStarted()
e.deletionExpiryChecker.MarkPhaseStarted()
}
func (e *expirationChecker) MarkPhaseFailed() {
e.retentionExpiryChecker.MarkPhaseFailed()
e.deletionExpiryChecker.MarkPhaseFailed()
}
func (e *expirationChecker) MarkPhaseFinished() {
e.retentionExpiryChecker.MarkPhaseFinished()
e.deletionExpiryChecker.MarkPhaseFinished()
}
func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool {
return e.retentionExpiryChecker.IntervalHasExpiredChunks(interval) || e.deletionExpiryChecker.IntervalHasExpiredChunks(interval)
}
func extractIntervalFromTableName(tableName string) model.Interval {
interval := model.Interval{
Start: 0,
End: model.Now(),
}
tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64)
if err != nil {
return interval
}
interval.Start = model.TimeFromUnix(tableNumber * 86400)
interval.End = interval.Start.Add(24 * time.Hour)
return interval
}
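
The interval derivation relies on the convention that periodic index tables are named with a numeric suffix counting days since the Unix epoch, so for a 24h period the suffix identifies one UTC day. A self-contained illustration of that mapping (the table name "index_18000" is made up for the example):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Suffix parsed from a hypothetical daily table name such as "index_18000".
	const dayNumber = 18000
	start := time.Unix(dayNumber*86400, 0).UTC()
	end := start.Add(24 * time.Hour)
	// Prints: 2019-04-14 00:00:00 +0000 UTC 2019-04-15 00:00:00 +0000 UTC
	fmt.Println(start, end)
}
```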

@ -5,6 +5,9 @@ import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
@ -30,3 +33,41 @@ func TestIsDefaults(t *testing.T) {
})
}
}
func TestExtractIntervalFromTableName(t *testing.T) {
periodicTableConfig := chunk.PeriodicTableConfig{
Prefix: "dummy",
Period: 24 * time.Hour,
}
const millisecondsInDay = model.Time(24 * time.Hour / time.Millisecond)
calculateInterval := func(tm model.Time) (m model.Interval) {
m.Start = tm - tm%millisecondsInDay
m.End = m.Start + millisecondsInDay
return
}
for i, tc := range []struct {
tableName string
expectedInterval model.Interval
}{
{
tableName: periodicTableConfig.TableFor(model.Now()),
expectedInterval: calculateInterval(model.Now()),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour)),
},
{
tableName: periodicTableConfig.TableFor(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
expectedInterval: calculateInterval(model.Now().Add(-24 * time.Hour).Add(time.Minute)),
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tc.expectedInterval, extractIntervalFromTableName(tc.tableName))
})
}
}

@ -194,3 +194,23 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID).Inc()
}
}
func (d *DeleteRequestsManager) IntervalHasExpiredChunks(interval model.Interval) bool {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
if len(d.deleteRequestsToProcess) == 0 {
return false
}
for _, deleteRequest := range d.deleteRequestsToProcess {
if intervalsOverlap(interval, model.Interval{
Start: deleteRequest.StartTime,
End: deleteRequest.EndTime,
}) {
return true
}
}
return false
}
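
IntervalHasExpiredChunks on the delete-requests manager answers "does any pending delete request touch this table's time range?". It reuses the package's existing intervalsOverlap helper; the check it is expected to perform is the usual closed-interval overlap test, sketched here for reference (not the committed helper):

```go
package deletion

import "github.com/prometheus/common/model"

// overlaps reports whether two closed intervals share at least one instant;
// they miss each other only when one ends before the other starts.
func overlaps(a, b model.Interval) bool {
	return !(a.End < b.Start || b.End < a.Start)
}
```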

@ -11,15 +11,22 @@ import (
type ExpirationChecker interface {
Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval)
IntervalHasExpiredChunks(interval model.Interval) bool
MarkPhaseStarted()
MarkPhaseFailed()
MarkPhaseFinished()
}
type expirationChecker struct {
tenantsRetention *TenantsRetention
tenantsRetention *TenantsRetention
earliestRetentionStartTime model.Time
}
type Limits interface {
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []validation.StreamRetention
ForEachTenantLimit(validation.ForEachTenantLimitCallback)
DefaultLimits() *validation.Limits
}
func NewExpirationChecker(limits Limits) ExpirationChecker {
@ -35,6 +42,17 @@ func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []mod
return now.Sub(ref.Through) > period, nil
}
func (e *expirationChecker) MarkPhaseStarted() {
e.earliestRetentionStartTime = model.Now().Add(-findHighestRetentionPeriod(e.tenantsRetention.limits))
}
func (e *expirationChecker) MarkPhaseFailed() {}
func (e *expirationChecker) MarkPhaseFinished() {}
func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool {
return e.earliestRetentionStartTime.Before(interval.Start) || e.earliestRetentionStartTime.Before(interval.End)
}
type TenantsRetention struct {
limits Limits
}
@ -78,3 +96,27 @@ Outer:
}
return globalRetention
}
func findHighestRetentionPeriod(limits Limits) time.Duration {
defaultLimits := limits.DefaultLimits()
highestRetentionPeriod := defaultLimits.RetentionPeriod
for _, streamRetention := range defaultLimits.StreamRetention {
if streamRetention.Period > highestRetentionPeriod {
highestRetentionPeriod = streamRetention.Period
}
}
limits.ForEachTenantLimit(func(userID string, limit *validation.Limits) {
if limit.RetentionPeriod > highestRetentionPeriod {
highestRetentionPeriod = limit.RetentionPeriod
}
for _, streamRetention := range limit.StreamRetention {
if streamRetention.Period > highestRetentionPeriod {
highestRetentionPeriod = streamRetention.Period
}
}
})
return time.Duration(highestRetentionPeriod)
}
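
MarkPhaseStarted records one timestamp per compaction cycle: now minus the highest retention period configured anywhere (default limits, per-tenant overrides, or stream rules), and IntervalHasExpiredChunks then compares table intervals against it. Worked numbers, purely illustrative: with a 7-day default, a 10-day default stream rule, and one tenant overriding to 30 days with a 45-day stream rule, the highest period is 45 days:

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	day := 24 * time.Hour
	// Hypothetical configuration: default retention, default stream rule,
	// tenant override, tenant stream rule.
	periods := []time.Duration{7 * day, 10 * day, 30 * day, 45 * day}

	highest := time.Duration(0)
	for _, p := range periods {
		if p > highest {
			highest = p
		}
	}
	earliestRetentionStartTime := model.Now().Add(-highest)
	fmt.Println(highest, earliestRetentionStartTime) // 1080h0m0s, now minus 45 days
}
```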

@ -11,33 +11,57 @@ import (
"github.com/grafana/loki/pkg/validation"
)
type retentionLimit struct {
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
}
func (r retentionLimit) convertToValidationLimit() *validation.Limits {
return &validation.Limits{
RetentionPeriod: model.Duration(r.retentionPeriod),
StreamRetention: r.streamRetention,
}
}
type fakeLimits struct {
perTenant map[string]time.Duration
perStream map[string][]validation.StreamRetention
defaultLimit retentionLimit
perTenant map[string]retentionLimit
}
func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
return f.perTenant[userID]
return f.perTenant[userID].retentionPeriod
}
func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.perStream[userID]
return f.perTenant[userID].streamRetention
}
func (f fakeLimits) ForEachTenantLimit(callback validation.ForEachTenantLimitCallback) {
for userID, limit := range f.perTenant {
callback(userID, limit.convertToValidationLimit())
}
}
func (f fakeLimits) DefaultLimits() *validation.Limits {
return f.defaultLimit.convertToValidationLimit()
}
func Test_expirationChecker_Expired(t *testing.T) {
e := NewExpirationChecker(&fakeLimits{
perTenant: map[string]time.Duration{
"1": time.Hour,
"2": 24 * time.Hour,
},
perStream: map[string][]validation.StreamRetention{
perTenant: map[string]retentionLimit{
"1": {
{Period: model.Duration(2 * time.Hour), Priority: 10, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
{Period: model.Duration(2 * time.Hour), Priority: 1, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.+")}},
retentionPeriod: time.Hour,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(2 * time.Hour), Priority: 10, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
{Period: model.Duration(2 * time.Hour), Priority: 1, Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.+")}},
},
},
"2": {
{Period: model.Duration(1 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
{Period: model.Duration(2 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.")}},
retentionPeriod: 24 * time.Hour,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(1 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}},
{Period: model.Duration(2 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "ba.")}},
},
},
},
})
@ -61,3 +85,126 @@ func Test_expirationChecker_Expired(t *testing.T) {
})
}
}
func TestFindEarliestRetentionStartTime(t *testing.T) {
const dayDuration = 24 * time.Hour
for _, tc := range []struct {
name string
limit fakeLimits
expectedEarliestRetentionStartTime time.Duration
}{
{
name: "only default retention set",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
},
},
expectedEarliestRetentionStartTime: 7 * dayDuration,
},
{
name: "default retention period highest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 2 * dayDuration},
"1": {retentionPeriod: 5 * dayDuration},
},
},
expectedEarliestRetentionStartTime: 7 * dayDuration,
},
{
name: "default stream retention period highest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {retentionPeriod: 2 * dayDuration},
"1": {retentionPeriod: 5 * dayDuration},
},
},
expectedEarliestRetentionStartTime: 10 * dayDuration,
},
{
name: "user retention retention period highest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {
retentionPeriod: 20 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
},
},
},
"1": {
retentionPeriod: 5 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(15 * dayDuration),
},
},
},
},
},
expectedEarliestRetentionStartTime: 20 * dayDuration,
},
{
name: "user stream retention period highest",
limit: fakeLimits{
defaultLimit: retentionLimit{
retentionPeriod: 7 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
},
},
},
perTenant: map[string]retentionLimit{
"0": {
retentionPeriod: 20 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(10 * dayDuration),
},
},
},
"1": {
retentionPeriod: 5 * dayDuration,
streamRetention: []validation.StreamRetention{
{
Period: model.Duration(25 * dayDuration),
},
},
},
},
},
expectedEarliestRetentionStartTime: 25 * dayDuration,
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedEarliestRetentionStartTime, findHighestRetentionPeriod(tc.limit))
})
}
}

@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"path/filepath"
"sort"
"strconv"
@ -29,14 +30,14 @@ import (
type mockChunkClient struct {
mtx sync.Mutex
deletedChunks []string
deletedChunks map[string]struct{}
}
func (m *mockChunkClient) DeleteChunk(_ context.Context, _, chunkID string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.deletedChunks = append(m.deletedChunks, string([]byte(chunkID))) // forces a copy, because this string is only valid within the delete fn.
m.deletedChunks[string([]byte(chunkID))] = struct{}{} // forces a copy, because this string is only valid within the delete fn.
return nil
}
@ -44,7 +45,12 @@ func (m *mockChunkClient) getDeletedChunkIds() []string {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.deletedChunks
chunkIDs := make([]string, 0, len(m.deletedChunks))
for chunkID := range m.deletedChunks {
chunkIDs = append(chunkIDs, chunkID)
}
return chunkIDs
}
func Test_Retention(t *testing.T) {
@ -58,11 +64,10 @@ func Test_Retention(t *testing.T) {
{
"nothing is expiring",
fakeLimits{
perTenant: map[string]time.Duration{
"1": 1000 * time.Hour,
"2": 1000 * time.Hour,
perTenant: map[string]retentionLimit{
"1": {retentionPeriod: 1000 * time.Hour},
"2": {retentionPeriod: 1000 * time.Hour},
},
perStream: map[string][]validation.StreamRetention{},
},
[]chunk.Chunk{
createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, start, start.Add(1*time.Hour)),
@ -76,11 +81,10 @@ func Test_Retention(t *testing.T) {
{
"one global expiration",
fakeLimits{
perTenant: map[string]time.Duration{
"1": 10 * time.Hour,
"2": 1000 * time.Hour,
perTenant: map[string]retentionLimit{
"1": {retentionPeriod: 10 * time.Hour},
"2": {retentionPeriod: 1000 * time.Hour},
},
perStream: map[string][]validation.StreamRetention{},
},
[]chunk.Chunk{
createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, start, start.Add(1*time.Hour)),
@ -94,14 +98,14 @@ func Test_Retention(t *testing.T) {
{
"one global expiration and stream",
fakeLimits{
perTenant: map[string]time.Duration{
"1": 10 * time.Hour,
"2": 1000 * time.Hour,
},
perStream: map[string][]validation.StreamRetention{
perTenant: map[string]retentionLimit{
"1": {
{Period: model.Duration(5 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "buzz")}},
retentionPeriod: 10 * time.Hour,
streamRetention: []validation.StreamRetention{
{Period: model.Duration(5 * time.Hour), Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "buzz")}},
},
},
"2": {retentionPeriod: 1000 * time.Hour},
},
},
[]chunk.Chunk{
@ -133,7 +137,7 @@ func Test_Retention(t *testing.T) {
// marks and sweep
expiration := NewExpirationChecker(tt.limits)
workDir := filepath.Join(t.TempDir(), "retention")
chunkClient := &mockChunkClient{}
chunkClient := &mockChunkClient{deletedChunks: map[string]struct{}{}}
sweep, err := NewSweeper(workDir, chunkClient, 10, 0, nil)
require.NoError(t, err)
sweep.Start()
@ -163,6 +167,7 @@ func Test_Retention(t *testing.T) {
require.Eventually(t, func() bool {
actual := chunkClient.getDeletedChunkIds()
sort.Strings(actual)
fmt.Println(expectDeleted, actual)
return assert.ObjectsAreEqual(expectDeleted, actual)
}, 10*time.Second, 1*time.Second)
}
@ -199,7 +204,7 @@ func Test_EmptyTable(t *testing.T) {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
require.NoError(t, err)
empty, err := markforDelete(context.Background(), noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{perTenant: map[string]time.Duration{"1": 0, "2": 0}}), nil)
NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil)
require.NoError(t, err)
require.True(t, empty)
return nil

@ -66,7 +66,7 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O
return &table, nil
}
func (t *table) compact() error {
func (t *table) compact(tableHasExpiredStreams bool) error {
objects, err := util.ListDirectory(t.ctx, t.name, t.storageClient)
if err != nil {
return err
@ -81,7 +81,9 @@ func (t *table) compact() error {
}
}()
if !t.applyRetention {
applyRetention := t.applyRetention && tableHasExpiredStreams
if !applyRetention {
if len(objects) < compactMinDBs {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
return nil

@ -59,7 +59,7 @@ func TestTable_Compaction(t *testing.T) {
table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.compact(false))
// verify that we have only 1 file left in storage after compaction.
files, err := ioutil.ReadDir(tablePathInStorage)
@ -164,7 +164,7 @@ func TestTable_CompactionRetention(t *testing.T) {
table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, true, tt.tableMarker)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.compact(true))
tt.assert(t, objectStoragePath, tableName)
})
}
@ -208,7 +208,7 @@ func TestTable_CompactionFailure(t *testing.T) {
require.NoError(t, err)
// compaction should fail due to a non-boltdb file.
require.Error(t, table.compact())
require.Error(t, table.compact(true))
// ensure that files in storage are intact.
files, err := ioutil.ReadDir(tablePathInStorage)
@ -223,7 +223,7 @@ func TestTable_CompactionFailure(t *testing.T) {
table, err = newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil)
require.NoError(t, err)
require.NoError(t, table.compact())
require.NoError(t, table.compact(true))
// ensure that we have cleanup the local working directory after successful compaction.
require.NoFileExists(t, tableWorkingDirectory)

@ -184,9 +184,14 @@ func SetDefaultLimitsForYAMLUnmarshalling(defaults Limits) {
defaultLimits = &defaults
}
// TenantLimits is a function that returns limits for given tenant, or
// nil, if there are no tenant-specific limits.
type TenantLimits func(userID string) *Limits
type ForEachTenantLimitCallback func(userID string, limit *Limits)
type TenantLimits interface {
// TenantLimits is a function that returns limits for given tenant, or
// nil, if there are no tenant-specific limits.
TenantLimits(userID string) *Limits
ForEachTenantLimit(ForEachTenantLimitCallback)
}
// Overrides periodically fetch a set of per-user overrides, and provides convenience
// functions for fetching the correct value.
@ -366,14 +371,22 @@ func (o *Overrides) RetentionPeriod(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).RetentionPeriod)
}
// RetentionPeriod returns the retention period for a given user.
// StreamRetention returns the retention period for a given user.
func (o *Overrides) StreamRetention(userID string) []StreamRetention {
return o.getOverridesForUser(userID).StreamRetention
}
func (o *Overrides) ForEachTenantLimit(callback ForEachTenantLimitCallback) {
o.tenantLimits.ForEachTenantLimit(callback)
}
func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}
func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits(userID)
l := o.tenantLimits.TenantLimits(userID)
if l != nil {
return l
}
