Pass in the now value to the retention. (#3673)

This allows to call model.Time only once per table which is reasonable for accuracy and this will save some cpu cycle.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/3687/head
Cyril Tovena 4 years ago committed by GitHub
parent aa544a9dff
commit 1995eaa426
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/storage/stores/shipper/compactor/retention/expiration.go
  2. 2
      pkg/storage/stores/shipper/compactor/retention/expiration_test.go
  3. 4
      pkg/storage/stores/shipper/compactor/retention/retention.go

@ -9,7 +9,7 @@ import (
)
type ExpirationChecker interface {
Expired(ref ChunkEntry) bool
Expired(ref ChunkEntry, now model.Time) bool
}
type expirationChecker struct {
@ -28,7 +28,7 @@ func NewExpirationChecker(limits Limits) ExpirationChecker {
}
// Expired tells if a ref chunk is expired based on retention rules.
func (e *expirationChecker) Expired(ref ChunkEntry) bool {
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) bool {
userID := unsafeGetString(ref.UserID)
streamRetentions := e.limits.StreamRetention(userID)
globalRetention := e.limits.RetentionPeriod(userID)
@ -58,7 +58,7 @@ Outer:
matchedRule = streamRetention
}
if found {
return model.Now().Sub(ref.Through) > matchedRule.Period
return now.Sub(ref.Through) > matchedRule.Period
}
return model.Now().Sub(ref.Through) > globalRetention
return now.Sub(ref.Through) > globalRetention
}

@ -55,7 +55,7 @@ func Test_expirationChecker_Expired(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.want, e.Expired(tt.ref))
require.Equal(t, tt.want, e.Expired(tt.ref, model.Now()))
})
}
}

@ -11,6 +11,7 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.etcd.io/bbolt"
"github.com/grafana/loki/pkg/storage"
@ -123,12 +124,13 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
func markforDelete(ctx context.Context, marker MarkerStorageWriter, chunkIt ChunkEntryIterator, seriesCleaner SeriesCleaner, expiration ExpirationChecker) (bool, error) {
seriesMap := newUserSeriesMap()
empty := true
now := model.Now()
for chunkIt.Next() {
if chunkIt.Err() != nil {
return false, chunkIt.Err()
}
c := chunkIt.Entry()
if expiration.Expired(c) {
if expiration.Expired(c, now) {
seriesMap.Add(c.SeriesID, c.UserID)
if err := chunkIt.Delete(); err != nil {
return false, err

Loading…
Cancel
Save