deletion: fix log deletion with line filters (#8151)

Sandeep Sukhani committed b2d0481cbe (parent ab9ef39cc6) via GitHub, 3 years ago
Changed files (16), with changed-line counts:
 1. CHANGELOG.md (1)
 2. integration/client/client.go (10)
 3. integration/cluster/cluster.go (71)
 4. integration/loki_micro_services_delete_test.go (186)
 5. pkg/chunkenc/facade.go (5)
 6. pkg/chunkenc/memchunk.go (2)
 7. pkg/chunkenc/memchunk_test.go (2)
 8. pkg/storage/stores/indexshipper/compactor/compactor.go (6)
 9. pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go (135)
10. pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go (171)
11. pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go (59)
12. pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go (175)
13. pkg/storage/stores/indexshipper/compactor/retention/expiration.go (6)
14. pkg/storage/stores/indexshipper/compactor/retention/retention.go (136)
15. pkg/storage/stores/indexshipper/compactor/retention/retention_test.go (345)
16. pkg/util/filter/filter_function.go (4)

@@ -41,6 +41,7 @@
* [8120](https://github.com/grafana/loki/pull/8120) **ashwanthgoli** fix panic on hitting /scheduler/ring when ring is disabled.
* [8251](https://github.com/grafana/loki/pull/8251) **sandeepsukhani** index-store: fix indexing of chunks overlapping multiple schemas.
* [8232](https://github.com/grafana/loki/pull/8232) **TaehyunHwang** Fix version info issue that shows wrong version.
* [8151](https://github.com/grafana/loki/pull/8151) **sandeepsukhani** fix log deletion with line filters.
##### Changes

@@ -252,12 +252,10 @@ func (c *Client) AddDeleteRequest(params DeleteRequestParams) error {
type DeleteRequests []DeleteRequest
type DeleteRequest struct {
RequestID string `json:"request_id"`
StartTime float64 `json:"start_time"`
EndTime float64 `json:"end_time"`
Query string `json:"query"`
Status string `json:"status"`
CreatedAt float64 `json:"created_at"`
StartTime int64 `json:"start_time"`
EndTime int64 `json:"end_time"`
Query string `json:"query"`
Status string `json:"status"`
}
// GetDeleteRequest gets a delete request using the request ID

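For reference, the updated DeleteRequest shape round-trips like this; a minimal sketch with an illustrative payload (start_time and end_time are now plain unix seconds as int64 instead of float64, and created_at is gone):

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the updated integration client struct above.
type DeleteRequest struct {
	RequestID string `json:"request_id"`
	StartTime int64  `json:"start_time"`
	EndTime   int64  `json:"end_time"`
	Query     string `json:"query"`
	Status    string `json:"status"`
}

func main() {
	payload := []byte(`{"request_id":"abc","start_time":1676000000,"end_time":1676003600,"query":"{job=\"fake\"} |= \"lineB\"","status":"received"}`)
	var dr DeleteRequest
	if err := json.Unmarshal(payload, &dr); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", dr)
}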
@@ -9,6 +9,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"text/template"
@@ -16,9 +17,11 @@ import (
"github.com/grafana/dskit/multierror"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/util/cfg"
"github.com/grafana/loki/pkg/validation"
)
var (
@@ -152,9 +155,10 @@ func (w *wrappedRegisterer) MustRegister(collectors ...prometheus.Collector) {
}
type Cluster struct {
sharedPath string
components []*Component
waitGroup sync.WaitGroup
sharedPath string
overridesFile string
components []*Component
waitGroup sync.WaitGroup
}
func New() *Cluster {
@@ -164,8 +168,16 @@ func New() *Cluster {
panic(err.Error())
}
overridesFile := filepath.Join(sharedPath, "loki-overrides.yaml")
err = os.WriteFile(filepath.Join(sharedPath, "loki-overrides.yaml"), []byte(`overrides:`), 0777)
if err != nil {
panic(fmt.Errorf("error creating overrides file: %w", err))
}
return &Cluster{
sharedPath: sharedPath,
sharedPath: sharedPath,
overridesFile: overridesFile,
}
}
@@ -235,10 +247,11 @@ func (c *Cluster) stop(cleanupFiles bool) error {
func (c *Cluster) AddComponent(name string, flags ...string) *Component {
component := &Component{
name: name,
cluster: c,
flags: flags,
running: false,
name: name,
cluster: c,
flags: flags,
running: false,
overridesFile: c.overridesFile,
}
c.components = append(c.components, component)
@@ -251,11 +264,12 @@ type Component struct {
cluster *Cluster
flags []string
configFile string
dataPath string
rulerWALPath string
rulesPath string
RulesTenant string
configFile string
overridesFile string
dataPath string
rulerWALPath string
rulesPath string
RulesTenant string
running bool
wg sync.WaitGroup
@@ -352,6 +366,8 @@ func (c *Component) run() error {
c.flags,
"-config.file",
c.configFile,
"-limits.per-user-override-config",
c.overridesFile,
), flagset); err != nil {
return err
}
@@ -440,6 +456,35 @@ func (c *Component) Restart() error {
return c.run()
}
type runtimeConfigValues struct {
TenantLimits map[string]*validation.Limits `yaml:"overrides"`
}
func (c *Component) SetTenantLimits(tenant string, limits validation.Limits) error {
rcv := runtimeConfigValues{}
rcv.TenantLimits = c.loki.TenantLimits.AllByUserID()
if rcv.TenantLimits == nil {
rcv.TenantLimits = map[string]*validation.Limits{}
}
rcv.TenantLimits[tenant] = &limits
config, err := yaml.Marshal(rcv)
if err != nil {
return err
}
return os.WriteFile(c.overridesFile, config, 0777)
}
func (c *Component) GetTenantLimits(tenant string) validation.Limits {
limits := c.loki.TenantLimits.TenantLimits(tenant)
if limits == nil {
return c.loki.Cfg.LimitsConfig
}
return *limits
}
func NewRemoteWriteServer(handler *http.HandlerFunc) *httptest.Server {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {

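A minimal sketch of how a test can flip a per-tenant limit at runtime with the new helpers, assuming a started Component comp and a hypothetical tenant ID; the change takes effect because every component is started with -limits.per-user-override-config pointing at the shared overrides file and the test sets a short -limits.per-user-override-period:

package integration

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/integration/cluster"
)

func enableFilterAndDelete(t *testing.T, comp *cluster.Component, tenantID string) {
	// GetTenantLimits falls back to the global limits config when no
	// per-tenant override exists yet.
	limits := comp.GetTenantLimits(tenantID)

	// Change only the field under test; SetTenantLimits rewrites the
	// shared overrides file that all components watch.
	limits.DeletionMode = "filter-and-delete"
	require.NoError(t, comp.SetTenantLimits(tenantID, limits))
}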
@@ -2,6 +2,8 @@ package integration
import (
"context"
"sort"
"strconv"
"testing"
"time"
@@ -30,7 +32,8 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-boltdb.shipper.compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
"-compactor.deletion-mode=filter-only",
"-limits.per-user-override-period=1s",
)
tDistributor = clu.AddComponent(
"distributor",
@@ -45,6 +48,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-ingester.wal-enabled=false",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
@@ -84,30 +88,92 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
cliCompactor := client.New(tenantID, "", tCompactor.HTTPURL())
cliCompactor.Now = now
t.Run("ingest-logs-store", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
})
var expectedStreams []client.StreamValues
for _, deletionType := range []string{"filter", "filter_no_match", "nothing", "partially_by_time", "whole"} {
expectedStreams = append(expectedStreams, client.StreamValues{
Stream: map[string]string{
"job": "fake",
"deletion_type": deletionType,
},
Values: [][]string{
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
"lineA",
},
{
strconv.FormatInt(now.Add(-45*time.Minute).UnixNano(), 10),
"lineB",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineC",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineD",
},
},
})
}
t.Run("ingest-logs-ingester", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})
expectedDeleteRequests := []client.DeleteRequest{
{
StartTime: now.Add(-time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter"} |= "lineB"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="filter_no_match"} |= "foo"`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
EndTime: now.Add(-10 * time.Minute).Unix(),
Query: `{deletion_type="partially_by_time"}`,
Status: "received",
},
{
StartTime: now.Add(-time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="whole"}`,
Status: "received",
},
}
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse := func(resp *client.Response) {
t.Helper()
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
require.Len(t, resp.Data.Stream, len(expectedStreams))
sort.Slice(resp.Data.Stream, func(i, j int) bool {
return resp.Data.Stream[i].Stream["deletion_type"] < resp.Data.Stream[j].Stream["deletion_type"]
})
for _, stream := range resp.Data.Stream {
sort.Slice(stream.Values, func(i, j int) bool {
return stream.Values[i][1] < stream.Values[j][1]
})
}
require.Equal(t, expectedStreams, resp.Data.Stream)
}
t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
for _, stream := range expectedStreams {
for _, val := range stream.Values {
lines = append(lines, val[1])
tsNs, err := strconv.ParseInt(val[0], 10, 64)
require.NoError(t, err)
require.NoError(t, cliDistributor.PushLogLineWithTimestamp(val[1], time.Unix(0, tsNs), stream.Stream))
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
@@ -118,7 +184,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
cliIngester.Now = now
metrics, err := cliIngester.Metrics()
require.NoError(t, err)
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 1)
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 5)
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
@@ -129,66 +195,92 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
validateQueryResponse(resp)
})
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
t.Run("add-delete-requests", func(t *testing.T) {
for _, deleteRequest := range expectedDeleteRequests {
params := client.DeleteRequestParams{
Start: strconv.FormatInt(deleteRequest.StartTime, 10),
End: strconv.FormatInt(deleteRequest.EndTime, 10),
Query: deleteRequest.Query,
}
require.NoError(t, cliCompactor.AddDeleteRequest(params))
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("add-delete-request", func(t *testing.T) {
params := client.DeleteRequestParams{Start: "0000000000", Query: `{job="fake"} |= "lineB"`}
require.NoError(t, cliCompactor.AddDeleteRequest(params))
})
t.Run("read-delete-request", func(t *testing.T) {
deleteRequests, err := cliCompactor.GetDeleteRequests()
require.NoError(t, err)
require.NotEmpty(t, deleteRequests)
require.Len(t, deleteRequests, 1)
require.Equal(t, `{job="fake"} |= "lineB"`, deleteRequests[0].Query)
require.Equal(t, "received", deleteRequests[0].Status)
require.Equal(t, client.DeleteRequests(expectedDeleteRequests), deleteRequests)
})
// Query lines
t.Run("verify query time filtering", func(t *testing.T) {
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
require.NoError(t, tQuerier.Restart())
// update expectedStreams as per the issued requests
expectedStreams[0].Values = append(expectedStreams[0].Values[:1], expectedStreams[0].Values[2:]...)
expectedStreams[3].Values = expectedStreams[3].Values[2:]
expectedStreams = expectedStreams[:4]
// query and verify that we get the resp which matches expectedStreams
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
validateQueryResponse(resp)
})
// Wait until delete request is finished
t.Run("wait-until-delete-request-processed", func(t *testing.T) {
tenantLimits := tCompactor.GetTenantLimits(tenantID)
tenantLimits.DeletionMode = "filter-and-delete"
require.NoError(t, tCompactor.SetTenantLimits(tenantID, tenantLimits))
// all the delete requests should have been processed
for i := range expectedDeleteRequests {
expectedDeleteRequests[i].Status = "processed"
}
require.Eventually(t, func() bool {
deleteRequests, err := cliCompactor.GetDeleteRequests()
require.NoError(t, err)
require.Len(t, deleteRequests, 1)
return deleteRequests[0].Status == "processed"
require.Len(t, deleteRequests, len(expectedDeleteRequests))
for i := range deleteRequests {
if deleteRequests[i] != expectedDeleteRequests[i] {
return false
}
}
return true
}, 10*time.Second, 1*time.Second)
// Check metrics
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, 1)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests)))
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)
})
// Query lines
t.Run("query", func(t *testing.T) {
t.Run("query-without-query-time-filtering", func(t *testing.T) {
// disable deletion for tenant to stop query time filtering of data requested for deletion
tenantLimits := tQuerier.GetTenantLimits(tenantID)
tenantLimits.DeletionMode = "disabled"
require.NoError(t, tQuerier.SetTenantLimits(tenantID, tenantLimits))
// restart querier to make it sync the index
storage.ResetBoltDBIndexClientWithShipper()
require.NoError(t, tQuerier.Restart())
// ensure the deletion-mode limit is updated
require.Equal(t, "disabled", tQuerier.GetTenantLimits(tenantID).DeletionMode)
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineC", "lineD"}, lines, "lineB should not be there")
validateQueryResponse(resp)
})
}

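For context, the AddDeleteRequest helper used throughout this test boils down to one HTTP call against the compactor's delete API; a hand-rolled sketch (host, port, tenant, and timestamps are illustrative):

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Set("query", `{deletion_type="filter"} |= "lineB"`) // LogQL selector with a line filter
	params.Set("start", "1676000000")                          // unix seconds
	params.Set("end", "1676003600")

	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:3100/loki/api/v1/delete?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "test-tenant") // tenant issuing the delete

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status) // 204 No Content on success
}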
@@ -2,6 +2,7 @@ package chunkenc
import (
"io"
"time"
"github.com/prometheus/common/model"
@@ -42,6 +43,10 @@ func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data {
}
}
func (f Facade) Bounds() (time.Time, time.Time) {
return f.c.Bounds()
}
// Marshal implements chunk.Chunk.
func (f Facade) Marshal(w io.Writer) error {
if f.c == nil {

@@ -931,7 +931,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
for itr.Next() {
entry := itr.Entry()
if filter != nil && filter(entry.Line) {
if filter != nil && filter(entry.Timestamp, entry.Line) {
continue
}
if err := newChunk.Append(&entry); err != nil {

@@ -1253,7 +1253,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
chkThrough := chkFrom.Add(10 * time.Second)
chkThroughPlus1 := chkThrough.Add(1 * time.Second)
filterFunc := func(in string) bool {
filterFunc := func(_ time.Time, in string) bool {
return strings.HasPrefix(in, "matching")
}

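The signature change driving the two memchunk edits above comes from pkg/util/filter/filter_function.go, whose 4-line hunk is not captured here; reconstructed from the call sites, the type gained a timestamp parameter:

package filter

import "time"

// Before this change a filter only saw the log line:
//
//	type Func func(line string) bool
//
// Passing the timestamp too lets a single closure enforce a delete
// request's time bounds and its line filter in one per-line check.
type Func func(ts time.Time, s string) bool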
@@ -10,8 +10,6 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/validation"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
@@ -30,7 +28,9 @@ import (
shipper_storage "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
// Here is how the generic compactor works:
@@ -652,7 +652,7 @@ func newExpirationChecker(retentionExpiryChecker, deletionExpiryChecker retentio
return &expirationChecker{retentionExpiryChecker, deletionExpiryChecker}
}
func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, []retention.IntervalFilter) {
func (e *expirationChecker) Expired(ref retention.ChunkEntry, now model.Time) (bool, filter.Func) {
if expired, nonDeletedIntervals := e.retentionExpiryChecker.Expired(ref, now); expired {
return expired, nonDeletedIntervals
}

@@ -1,6 +1,8 @@
package deletion
import (
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -11,6 +13,10 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)
type timeInterval struct {
start, end time.Time
}
type DeleteRequest struct {
RequestID string `json:"request_id"`
StartTime model.Time `json:"start_time"`
@@ -23,6 +29,7 @@ type DeleteRequest struct {
SequenceNum int64 `json:"-"`
matchers []*labels.Matcher `json:"-"`
logSelectorExpr syntax.LogSelectorExpr `json:"-"`
timeInterval *timeInterval `json:"-"`
Metrics *deleteRequestsManagerMetrics `json:"-"`
DeletedLines int32 `json:"-"`
@@ -39,19 +46,32 @@ func (d *DeleteRequest) SetQuery(logQL string) error {
return nil
}
// FilterFunction returns a filter function that returns true if the given line matches
// Note: FilterFunction can be nil when the delete request does not have a line filter.
// FilterFunction returns a filter function that returns true if the given line should be deleted based on the DeleteRequest
func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error) {
if d.logSelectorExpr == nil {
err := d.SetQuery(d.Query)
if err != nil {
return nil, err
// init d.timeInterval used to efficiently check log ts is within the bounds of delete request below in filter func
// without having to do conversion of timestamps for each log line we check.
if d.timeInterval == nil {
d.timeInterval = &timeInterval{
start: d.StartTime.Time(),
end: d.EndTime.Time(),
}
}
// return a filter func if the delete request has a line filter
if !allMatch(d.matchers, labels) {
return func(_ time.Time, s string) bool {
return false
}, nil
}
// if delete request doesn't have a line filter, just do time based filtering
if !d.logSelectorExpr.HasFilter() {
return nil, nil
return func(ts time.Time, s string) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false
}
return true
}, nil
}
p, err := d.logSelectorExpr.Pipeline()
@@ -59,14 +79,12 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error
return nil, err
}
if !allMatch(d.matchers, labels) {
return func(s string) bool {
f := p.ForStream(labels).ProcessString
return func(ts time.Time, s string) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false
}, nil
}
}
f := p.ForStream(labels).ProcessString
return func(s string) bool {
result, _, skip := f(0, s)
if len(result) != 0 || skip {
d.Metrics.deletedLinesTotal.WithLabelValues(d.UserID).Inc()
@@ -87,8 +105,9 @@ func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool {
}
// IsDeleted checks if the given ChunkEntry will be deleted by this DeleteRequest.
// It also returns the intervals of the ChunkEntry that will remain before filtering.
func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention.IntervalFilter) {
// It returns a filter.Func if the chunk is supposed to be deleted partially or the delete request contains line filters.
// If the filter.Func is nil, the whole chunk is supposed to be deleted.
func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func) {
if d.UserID != unsafeGetString(entry.UserID) {
return false, nil
}
@@ -107,6 +126,24 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention
return false, nil
}
if d.logSelectorExpr == nil {
err := d.SetQuery(d.Query)
if err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to init log selector expr",
"delete_request_id", d.RequestID,
"user", d.UserID,
"err", err,
)
return false, nil
}
}
if d.StartTime <= entry.From && d.EndTime >= entry.Through && !d.logSelectorExpr.HasFilter() {
// Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted
return true, nil
}
ff, err := d.FilterFunction(entry.Labels)
if err != nil {
// The query in the delete request is checked when added to the table.
@@ -120,71 +157,7 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []retention
return false, nil
}
if d.StartTime <= entry.From && d.EndTime >= entry.Through {
// if the logSelectorExpr has a filter part return the chunk boundaries as intervals
if d.logSelectorExpr.HasFilter() {
return true, []retention.IntervalFilter{
{
Interval: model.Interval{
Start: entry.From,
End: entry.Through,
},
Filter: ff,
},
}
}
// No filter in the logSelectorExpr so the whole chunk will be deleted
return true, nil
}
intervals := make([]retention.IntervalFilter, 0, 2)
// chunk partially deleted from the end
if d.StartTime > entry.From {
// Add the deleted part with Filter func
if ff != nil {
intervals = append(intervals, retention.IntervalFilter{
Interval: model.Interval{
Start: d.StartTime,
End: entry.Through,
},
Filter: ff,
})
}
// Add non-deleted part without Filter func
intervals = append(intervals, retention.IntervalFilter{
Interval: model.Interval{
Start: entry.From,
End: d.StartTime - 1,
},
})
}
// chunk partially deleted from the beginning
if d.EndTime < entry.Through {
// Add the deleted part with Filter func
if ff != nil {
intervals = append(intervals, retention.IntervalFilter{
Interval: model.Interval{
Start: entry.From,
End: d.EndTime,
},
Filter: ff,
})
}
// Add non-deleted part without Filter func
intervals = append(intervals, retention.IntervalFilter{
Interval: model.Interval{
Start: d.EndTime + 1,
End: entry.Through,
},
})
}
return true, intervals
return true, ff
}
func intervalsOverlap(interval1, interval2 model.Interval) bool {

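Putting the pieces of the new FilterFunction together, it always returns a closure shaped like the sketch below; this is a simplified model, not the exact internals (the real version evaluates the LogQL pipeline rather than a substring match):

package main

import (
	"fmt"
	"strings"
	"time"
)

type filterFunc func(ts time.Time, line string) bool

func buildFilter(start, end time.Time, lineFilter string) filterFunc {
	return func(ts time.Time, line string) bool {
		// Outside the delete request's [start, end] interval: never delete.
		if ts.Before(start) || ts.After(end) {
			return false
		}
		// No line filter: purely time-based deletion.
		if lineFilter == "" {
			return true
		}
		return strings.Contains(line, lineFilter)
	}
}

func main() {
	now := time.Now()
	f := buildFilter(now.Add(-time.Hour), now, "lineB")
	fmt.Println(f(now.Add(-30*time.Minute), "lineB here")) // true: in range and matching
	fmt.Println(f(now.Add(-30*time.Minute), "lineA"))      // false: no match
	fmt.Println(f(now.Add(-2*time.Hour), "lineB"))         // false: out of range
}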
@@ -1,6 +1,8 @@
package deletion
import (
"math"
"strings"
"testing"
"time"
@@ -21,9 +23,6 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
lbl := `{foo="bar", fizz="buzz"}`
lblWithFilter := `{foo="bar", fizz="buzz"} |= "filter"`
var dummyFilterFunc filter.Func = func(s string) bool {
return false
}
chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
@@ -35,8 +34,8 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}
type resp struct {
isDeleted bool
nonDeletedIntervals []retention.IntervalFilter
isDeleted bool
expectedFilter filter.Func
}
for _, tc := range []struct {
@@ -53,8 +52,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: nil,
isDeleted: true,
},
},
{
@@ -67,14 +65,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-3 * time.Hour),
End: now.Add(-time.Hour),
},
Filter: dummyFilterFunc,
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-time.Hour).UnixNano() {
return true
}
return false
},
},
},
@@ -88,33 +84,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-2*time.Hour) + 1,
End: now.Add(-time.Hour),
},
},
},
},
},
{
name: "chunk deleted from end",
deleteRequest: DeleteRequest{
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-3 * time.Hour),
End: now.Add(-2*time.Hour) - 1,
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-2*time.Hour).UnixNano() {
return true
}
return false
},
},
},
@@ -128,13 +103,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-3 * time.Hour),
End: now.Add(-2*time.Hour) - 1,
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true
}
return false
},
},
},
@@ -148,20 +122,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-2 * time.Hour),
End: now.Add(-time.Hour),
},
Filter: dummyFilterFunc,
},
{
Interval: model.Interval{
Start: now.Add(-3 * time.Hour),
End: now.Add(-2*time.Hour) - 1,
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true
}
return false
},
},
},
@@ -175,19 +141,12 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
expectedResp: resp{
isDeleted: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-3 * time.Hour),
End: now.Add(-(2*time.Hour + 30*time.Minute)) - 1,
},
},
{
Interval: model.Interval{
Start: now.Add(-(time.Hour + 30*time.Minute)) + 1,
End: now.Add(-time.Hour),
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if now.Add(-(2*time.Hour+30*time.Minute)).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-(time.Hour+30*time.Minute)).UnixNano() {
return true
}
return false
},
},
},
@@ -230,23 +189,21 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isDeleted)
require.Len(t, nonDeletedIntervals, len(tc.expectedResp.nonDeletedIntervals))
for idx := range tc.expectedResp.nonDeletedIntervals {
require.Equal(t,
tc.expectedResp.nonDeletedIntervals[idx].Interval.Start,
nonDeletedIntervals[idx].Interval.Start,
)
require.Equal(t,
tc.expectedResp.nonDeletedIntervals[idx].Interval.End,
nonDeletedIntervals[idx].Interval.End,
)
if tc.expectedResp.nonDeletedIntervals[idx].Filter != nil {
require.NotNil(t, nonDeletedIntervals[idx].Filter)
} else {
require.Nil(t, nonDeletedIntervals[idx].Filter)
tc.deleteRequest.Metrics = newDeleteRequestsManagerMetrics(nil)
isExpired, filterFunc := tc.deleteRequest.IsDeleted(chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isExpired)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)
return
}
require.NotNil(t, filterFunc)
for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) {
line := "foo bar"
if start.Time().Minute()%2 == 1 {
line = "filter bar"
}
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line), filterFunc(start.Time(), line), "line", line, "time", start.Time(), "now", now.Time())
}
})
}
@@ -267,17 +224,20 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
Query: `{foo="bar"} |= "some"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := `{foo="bar"}`
lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query))
f, err := dr.FilterFunction(lbls)
require.NoError(t, err)
require.True(t, f(`some line`))
require.False(t, f(""))
require.False(t, f("other line"))
require.True(t, f(time.Now(), `some line`))
require.False(t, f(time.Now(), ""))
require.False(t, f(time.Now(), "other line"))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
})
@@ -293,29 +253,42 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
lblStr := `{foo2="buzz"}`
lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query))
f, err := dr.FilterFunction(lbls)
require.NoError(t, err)
require.False(t, f(""))
require.False(t, f("other line"))
require.False(t, f("some line"))
require.False(t, f(time.Time{}, ""))
require.False(t, f(time.Time{}, "other line"))
require.False(t, f(time.Time{}, "some line"))
require.Equal(t, int32(0), dr.DeletedLines)
// testutil.ToFloat64 panics when there are 0 metrics
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
})
t.Run("no_line_filter", func(t *testing.T) {
now := model.Now()
dr := DeleteRequest{
Query: `{namespace="default"}`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: now.Add(-time.Hour),
EndTime: now,
}
lblStr := `{namespace="default"}`
lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query))
f, err := dr.FilterFunction(lbls)
require.NoError(t, err)
require.Nil(t, f)
require.NotNil(t, f)
require.True(t, f(now.Time(), `some line`))
require.False(t, f(now.Time().Add(-2*time.Hour), `some line`))
require.True(t, f(now.Time(), "other line"))
require.Equal(t, int32(0), dr.DeletedLines)
// testutil.ToFloat64 panics when there are 0 metrics
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
})
}

@@ -7,13 +7,13 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
)
@@ -32,10 +32,7 @@ type DeleteRequestsManager struct {
deleteRequestsStore DeleteRequestsStore
deleteRequestCancelPeriod time.Duration
deleteRequestsToProcess map[string]*userDeleteRequests
chunkIntervalsToRetain []retention.IntervalFilter
// WARN: If by any chance we change deleteRequestsToProcessMtx to sync.RWMutex to be able to check multiple chunks at a time,
// please take care of chunkIntervalsToRetain which should be unique per chunk.
deleteRequestsToProcess map[string]*userDeleteRequests
deleteRequestsToProcessMtx sync.Mutex
metrics *deleteRequestsManagerMetrics
wg sync.WaitGroup
@@ -232,7 +229,7 @@ func (d *DeleteRequestsManager) shouldProcessRequest(dr DeleteRequest) (bool, er
return mode == deletionmode.FilterAndDelete, nil
}
func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, []retention.IntervalFilter) {
func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time) (bool, filter.Func) {
d.deleteRequestsToProcessMtx.Lock()
defer d.deleteRequestsToProcessMtx.Unlock()
@@ -244,40 +241,15 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
return false, nil
}
isExpired := false
d.chunkIntervalsToRetain = d.chunkIntervalsToRetain[:0]
d.chunkIntervalsToRetain = append(d.chunkIntervalsToRetain, retention.IntervalFilter{
Interval: model.Interval{
Start: ref.From,
End: ref.Through,
},
})
var filterFuncs []filter.Func
for _, deleteRequest := range d.deleteRequestsToProcess[userIDStr].requests {
rebuiltIntervals := make([]retention.IntervalFilter, 0, len(d.chunkIntervalsToRetain))
for _, ivf := range d.chunkIntervalsToRetain {
if ivf.Filter != nil {
// This can happen when there are multiple delete requests touching the same chunk.
// It likely can have different line filters or no line filters at all.
// To keep things simple, let us not consider chunks which are already being considered for deletion with line filter.
// ToDo(Sandeep): See if we can efficiently consider multiple delete requests touching same chunk.
rebuiltIntervals = append(rebuiltIntervals, ivf)
continue
}
entry := ref
entry.From = ivf.Interval.Start
entry.Through = ivf.Interval.End
isDeleted, newIntervalsToRetain := deleteRequest.IsDeleted(entry)
if !isDeleted {
rebuiltIntervals = append(rebuiltIntervals, ivf)
} else {
isExpired = true
rebuiltIntervals = append(rebuiltIntervals, newIntervalsToRetain...)
}
isDeleted, ff := deleteRequest.IsDeleted(ref)
if !isDeleted {
continue
}
d.chunkIntervalsToRetain = rebuiltIntervals
if isExpired && len(d.chunkIntervalsToRetain) == 0 {
if ff == nil {
level.Info(util_log.Logger).Log(
"msg", "no chunks to retain: the whole chunk is deleted",
"delete_request_id", deleteRequest.RequestID,
@@ -288,14 +260,23 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc()
return true, nil
}
filterFuncs = append(filterFuncs, ff)
}
if !isExpired {
if len(filterFuncs) == 0 {
return false, nil
}
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc()
return true, d.chunkIntervalsToRetain
return true, func(ts time.Time, s string) bool {
for _, ff := range filterFuncs {
if ff(ts, s) {
return true
}
}
return false
}
}
func (d *DeleteRequestsManager) MarkPhaseStarted() {

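The manager side of the change in one self-contained sketch: when several delete requests touch the same chunk, Expired OR-combines their per-request filters, so a line is dropped as soon as any request matches it (types here are simplified stand-ins):

package main

import (
	"fmt"
	"strings"
	"time"
)

type filterFunc func(ts time.Time, line string) bool

// combine returns a filter that deletes a line if any input filter does.
func combine(filters ...filterFunc) filterFunc {
	return func(ts time.Time, line string) bool {
		for _, ff := range filters {
			if ff(ts, line) {
				return true
			}
		}
		return false
	}
}

func main() {
	f := combine(
		func(_ time.Time, l string) bool { return strings.Contains(l, "fizz") },
		func(_ time.Time, l string) bool { return strings.Contains(l, "buzz") },
	)
	fmt.Println(f(time.Now(), "fizz bar")) // true
	fmt.Println(f(time.Now(), "foo bar"))  // false
}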
@@ -2,6 +2,7 @@ package deletion
import (
"context"
"strings"
"testing"
"time"
@@ -18,11 +19,8 @@ const testUserID = "test-user"
func TestDeleteRequestsManager_Expired(t *testing.T) {
type resp struct {
isExpired bool
nonDeletedIntervals []retention.IntervalFilter
}
var dummyFilterFunc filter.Func = func(s string) bool {
return false
isExpired bool
expectedFilter filter.Func
}
now := model.Now()
@@ -52,8 +50,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
},
{
@@ -69,8 +66,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
"different-user": {
@@ -92,8 +88,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: nil,
isExpired: true,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -116,14 +111,8 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: chunkEntry.ChunkRef.From,
End: chunkEntry.ChunkRef.Through,
},
Filter: dummyFilterFunc,
},
expectedFilter: func(ts time.Time, s string) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -146,8 +135,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -175,8 +163,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -208,8 +195,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: nil,
isExpired: true,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -238,14 +224,8 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: chunkEntry.ChunkRef.From,
End: chunkEntry.ChunkRef.Through,
},
Filter: dummyFilterFunc,
},
expectedFilter: func(ts time.Time, s string) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -287,25 +267,15 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-11*time.Hour) + 1,
End: now.Add(-10*time.Hour) - 1,
},
},
{
Interval: model.Interval{
Start: now.Add(-8*time.Hour) + 1,
End: now.Add(-6*time.Hour) - 1,
},
},
{
Interval: model.Interval{
Start: now.Add(-5*time.Hour) + 1,
End: now.Add(-2*time.Hour) - 1,
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) ||
(now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) ||
(now.Add(-6*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-5*time.Hour).UnixNano()) ||
(now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano()) {
return true
}
return false
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -334,8 +304,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: nil,
isExpired: true,
expectedFilter: func(ts time.Time, s string) bool {
return true
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -364,21 +336,8 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: chunkEntry.ChunkRef.From,
End: now.Add(-6 * time.Hour),
},
Filter: dummyFilterFunc,
},
{
Interval: model.Interval{
Start: now.Add(-6*time.Hour) + 1,
End: chunkEntry.ChunkRef.Through,
},
Filter: dummyFilterFunc,
},
expectedFilter: func(ts time.Time, s string) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -413,8 +372,10 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: nil,
isExpired: true,
expectedFilter: func(ts time.Time, s string) bool {
return true
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
@@ -449,28 +410,8 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: chunkEntry.ChunkRef.From,
End: now.Add(-6*time.Hour) - 1,
},
Filter: dummyFilterFunc,
},
{
Interval: model.Interval{
Start: now.Add(-6 * time.Hour),
End: now.Add(-4*time.Hour) - 1,
},
Filter: dummyFilterFunc,
},
{
Interval: model.Interval{
Start: now.Add(-4 * time.Hour),
End: chunkEntry.ChunkRef.Through,
},
Filter: dummyFilterFunc,
},
expectedFilter: func(ts time.Time, s string) bool {
return strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -511,8 +452,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
},
{
@@ -546,8 +486,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
expectedResp: resp{
isExpired: false,
nonDeletedIntervals: nil,
isExpired: false,
},
},
{
@@ -582,19 +521,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
expectedResp: resp{
isExpired: true,
nonDeletedIntervals: []retention.IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-11*time.Hour) + 1,
End: now.Add(-10*time.Hour) - 1,
},
},
{
Interval: model.Interval{
Start: now.Add(-8*time.Hour) + 1,
End: now.Add(-time.Hour),
},
},
expectedFilter: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) ||
(now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) {
return true
}
return false
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
@@ -615,17 +549,20 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}
}
isExpired, nonDeletedIntervals := mgr.Expired(chunkEntry, model.Now())
isExpired, filterFunc := mgr.Expired(chunkEntry, model.Now())
require.Equal(t, tc.expectedResp.isExpired, isExpired)
require.Len(t, nonDeletedIntervals, len(tc.expectedResp.nonDeletedIntervals))
for idx, interval := range nonDeletedIntervals {
require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.Start, interval.Interval.Start)
require.Equal(t, tc.expectedResp.nonDeletedIntervals[idx].Interval.End, interval.Interval.End)
if tc.expectedResp.nonDeletedIntervals[idx].Filter != nil {
require.NotNil(t, interval.Filter)
} else {
require.Nil(t, interval.Filter)
if tc.expectedResp.expectedFilter == nil {
require.Nil(t, filterFunc)
return
}
require.NotNil(t, filterFunc)
for start := chunkEntry.From; start <= chunkEntry.Through; start = start.Add(time.Minute) {
line := "foo bar"
if start.Time().Minute()%2 == 1 {
line = "fizz buzz"
}
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line), filterFunc(start.Time(), line), "line", line, "time", start.Time(), "now", now.Time())
}
require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess))

@@ -22,7 +22,7 @@ type IntervalFilter struct {
}
type ExpirationChecker interface {
Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter)
Expired(ref ChunkEntry, now model.Time) (bool, filter.Func)
IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
MarkPhaseStarted()
MarkPhaseFailed()
@@ -50,7 +50,7 @@ func NewExpirationChecker(limits Limits) ExpirationChecker {
}
// Expired tells if a ref chunk is expired based on retention rules.
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) {
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) {
userID := unsafeGetString(ref.UserID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
return now.Sub(ref.Through) > period, nil
@@ -99,7 +99,7 @@ func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker {
type neverExpiringExpirationChecker struct{}
func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []IntervalFilter) {
func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) {
return false, nil
}
func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {

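Restating the new retention contract as a sketch (ChunkEntry is elided and FilterFunc stands in for pkg/util/filter.Func): Expired now hands back at most one filter, where nil means the whole chunk is deleted and a non-nil filter decides line by line during the rewrite.

package retention

import (
	"time"

	"github.com/prometheus/common/model"
)

// Stand-ins for this sketch only.
type ChunkEntry struct{}
type FilterFunc func(ts time.Time, s string) bool

type ExpirationChecker interface {
	// Expired reports whether ref should be processed for deletion.
	// A nil FilterFunc deletes the whole chunk; a non-nil one is
	// applied per line while the chunk is rewritten.
	Expired(ref ChunkEntry, now model.Time) (bool, FilterFunc)
}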
@@ -16,6 +16,8 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
)
@@ -176,11 +178,14 @@ func markForDelete(
seriesMap.Add(c.SeriesID, c.UserID, c.Labels)
// see if the chunk is deleted completely or partially
if expired, nonDeletedIntervalFilters := expiration.Expired(c, now); expired {
if len(nonDeletedIntervalFilters) > 0 {
wroteChunks, err := chunkRewriter.rewriteChunk(ctx, c, tableInterval, nonDeletedIntervalFilters)
if expired, filterFunc := expiration.Expired(c, now); expired {
linesDeleted := true // tracks whether we deleted at least some data from the chunk
if filterFunc != nil {
wroteChunks := false
var err error
wroteChunks, linesDeleted, err = chunkRewriter.rewriteChunk(ctx, c, tableInterval, filterFunc)
if err != nil {
return false, fmt.Errorf("failed to rewrite chunk %s for intervals %+v with error %s", c.ChunkID, nonDeletedIntervalFilters, err)
return false, fmt.Errorf("failed to rewrite chunk %s with error %s", c.ChunkID, err)
}
if wroteChunks {
@@ -190,17 +195,19 @@ func markForDelete(
}
}
modified = true
if linesDeleted {
modified = true
// Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in.
// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then
// the retention would fail because it would fail to find it in the storage.
if len(nonDeletedIntervalFilters) == 0 || c.Through <= tableInterval.End {
if err := marker.Put(c.ChunkID); err != nil {
return false, err
// Mark the chunk for deletion only if it is completely deleted, or this is the last table that the chunk is index in.
// For a partially deleted chunk, if we delete the source chunk before all the tables which index it are processed then
// the retention would fail because it would fail to find it in the storage.
if filterFunc == nil || c.Through <= tableInterval.End {
if err := marker.Put(c.ChunkID); err != nil {
return false, err
}
}
return true, nil
}
return true, nil
}
// The chunk is not deleted, now see if we can drop its index entry based on end time from tableInterval.
@@ -332,75 +339,88 @@ func newChunkRewriter(chunkClient client.Client, tableName string, chunkIndexer
}
}
func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableInterval model.Interval, intervalFilters []IntervalFilter) (bool, error) {
// rewriteChunk rewrites a chunk after filtering out logs using filterFunc.
// It first builds a newChunk using filterFunc.
// If the newChunk is same as the original chunk then there is nothing to do here, wroteChunks and linesDeleted both would be false.
// If the newChunk is different, linesDeleted would be true.
// The newChunk is indexed and uploaded only if it belongs to the current index table being processed,
// the status of which is set to wroteChunks.
func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableInterval model.Interval, filterFunc filter.Func) (wroteChunks bool, linesDeleted bool, err error) {
userID := unsafeGetString(ce.UserID)
chunkID := unsafeGetString(ce.ChunkID)
chk, err := chunk.ParseExternalKey(userID, chunkID)
if err != nil {
return false, err
return false, false, err
}
chks, err := c.chunkClient.GetChunks(ctx, []chunk.Chunk{chk})
if err != nil {
return false, err
return false, false, err
}
if len(chks) != 1 {
return false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks))
return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks))
}
wroteChunks := false
for _, ivf := range intervalFilters {
start := ivf.Interval.Start
end := ivf.Interval.End
newChunkData, err := chks[0].Data.Rebound(start, end, ivf.Filter)
if err != nil {
if errors.Is(err, chunk.ErrSliceNoDataInRange) {
level.Info(util_log.Logger).Log("msg", "Rebound leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID))
// skip empty chunks
continue
}
return false, err
newChunkData, err := chks[0].Data.Rebound(ce.From, ce.Through, func(ts time.Time, s string) bool {
if filterFunc(ts, s) {
linesDeleted = true
return true
}
if start > tableInterval.End || end < tableInterval.Start {
continue
return false
})
if err != nil {
if errors.Is(err, chunk.ErrSliceNoDataInRange) {
level.Info(util_log.Logger).Log("msg", "Delete request filterFunc leaves an empty chunk", "chunk ref", string(ce.ChunkRef.ChunkID))
return false, true, nil
}
return false, false, err
}
facade, ok := newChunkData.(*chunkenc.Facade)
if !ok {
return false, errors.New("invalid chunk type")
}
// if no lines were deleted then there is nothing to do
if !linesDeleted {
return false, false, nil
}
newChunk := chunk.NewChunk(
userID, chks[0].FingerprintModel(), chks[0].Metric,
facade,
start,
end,
)
facade, ok := newChunkData.(*chunkenc.Facade)
if !ok {
return false, false, errors.New("invalid chunk type")
}
err = newChunk.Encode()
if err != nil {
return false, err
}
newChunkStart, newChunkEnd := util.RoundToMilliseconds(facade.Bounds())
uploadChunk, err := c.chunkIndexer.IndexChunk(newChunk)
if err != nil {
return false, err
}
// new chunk is out of range for this table so don't upload and index it
if newChunkStart > tableInterval.End || newChunkEnd < tableInterval.Start {
return false, linesDeleted, nil
}
// upload chunk only if an entry was written
if uploadChunk {
err = c.chunkClient.PutChunks(ctx, []chunk.Chunk{newChunk})
if err != nil {
return false, err
}
wroteChunks = true
newChunk := chunk.NewChunk(
userID, chks[0].FingerprintModel(), chks[0].Metric,
facade,
newChunkStart,
newChunkEnd,
)
err = newChunk.Encode()
if err != nil {
return false, false, err
}
uploadChunk, err := c.chunkIndexer.IndexChunk(newChunk)
if err != nil {
return false, false, err
}
// upload chunk only if an entry was written
if uploadChunk {
err = c.chunkClient.PutChunks(ctx, []chunk.Chunk{newChunk})
if err != nil {
return false, false, err
}
wroteChunks = true
}
return wroteChunks, nil
return wroteChunks, linesDeleted, nil
}

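The linesDeleted bookkeeping above, reduced to its essence: the caller's filter is wrapped so the rewriter learns whether anything was actually filtered out without a second pass over the chunk (rebound below stands in for Data.Rebound):

package main

import (
	"fmt"
	"time"
)

type entry struct {
	ts   time.Time
	line string
}

type filterFunc func(ts time.Time, line string) bool

// rebound stands in for Data.Rebound: it keeps entries the filter
// does not delete.
func rebound(entries []entry, filter filterFunc) []entry {
	var kept []entry
	for _, e := range entries {
		if filter(e.ts, e.line) {
			continue
		}
		kept = append(kept, e)
	}
	return kept
}

func main() {
	now := time.Now()
	entries := []entry{{now, "keep"}, {now, "drop me"}}

	linesDeleted := false
	kept := rebound(entries, func(ts time.Time, line string) bool {
		if line == "drop me" { // the delete request's filter
			linesDeleted = true
			return true
		}
		return false
	})

	// If nothing was deleted, the rewriter skips re-encoding and uploading.
	fmt.Println(linesDeleted, len(kept)) // true 1
}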
@@ -21,7 +21,9 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
ingesterclient "github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
@@ -271,103 +273,199 @@ func labelsString(ls labels.Labels) string {
func TestChunkRewriter(t *testing.T) {
minListMarkDelay = 1 * time.Second
now := model.Now()
schema := allSchemas[3]
todaysTableInterval := ExtractIntervalFromTableName(schema.config.IndexTables.TableFor(now))
type tableResp struct {
mustDeleteLines bool
mustRewriteChunk bool
}
for _, tt := range []struct {
name string
chunk chunk.Chunk
rewriteIntervalFilters []IntervalFilter
filterFunc filter.Func
expectedRespByTables map[string]tableResp
retainedChunkIntervals []model.Interval
}{
{
name: "no rewrites",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-time.Hour), now),
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(time.Hour)),
filterFunc: func(ts time.Time, s string) bool {
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.Start): {},
},
},
{
name: "no rewrites with chunk spanning multiple tables",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-48*time.Hour), now),
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-48*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, s string) bool {
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.End): {},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-24 * time.Hour)): {},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-48 * time.Hour)): {},
},
},
{
name: "rewrite first half",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now),
rewriteIntervalFilters: []IntervalFilter{
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)),
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(time.Hour).UnixNano() {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.Start): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
{
Interval: model.Interval{
Start: now.Add(-2 * time.Hour),
End: now.Add(-1 * time.Hour),
},
Start: todaysTableInterval.Start.Add(time.Hour).Add(time.Minute),
End: todaysTableInterval.Start.Add(2 * time.Hour),
},
},
},
{
name: "rewrite second half",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now),
rewriteIntervalFilters: []IntervalFilter{
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)),
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.Add(time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano() {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.Start): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
{
Interval: model.Interval{
Start: now.Add(-time.Hour),
End: now,
},
Start: todaysTableInterval.Start,
End: todaysTableInterval.Start.Add(time.Hour).Add(-time.Minute),
},
},
},
{
name: "rewrite multiple intervals",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-12*time.Hour), now),
rewriteIntervalFilters: []IntervalFilter{
{
Interval: model.Interval{
Start: now.Add(-12 * time.Hour),
End: now.Add(-10 * time.Hour),
},
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(12*time.Hour)),
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if (todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano()) ||
(todaysTableInterval.Start.Add(5*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(9*time.Hour).UnixNano()) ||
(todaysTableInterval.Start.Add(10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(12*time.Hour).UnixNano()) {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.Start): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
{
Interval: model.Interval{
Start: now.Add(-9 * time.Hour),
End: now.Add(-5 * time.Hour),
},
Start: todaysTableInterval.Start.Add(2 * time.Hour).Add(time.Minute),
End: todaysTableInterval.Start.Add(5 * time.Hour).Add(-time.Minute),
},
{
Interval: model.Interval{
Start: now.Add(-2 * time.Hour),
End: now,
},
Start: todaysTableInterval.Start.Add(9 * time.Hour).Add(time.Minute),
End: todaysTableInterval.Start.Add(10 * time.Hour).Add(-time.Minute),
},
},
},
{
name: "rewrite chunk spanning multiple days with multiple intervals",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-72*time.Hour), now),
rewriteIntervalFilters: []IntervalFilter{
name: "rewrite chunk spanning multiple days with multiple intervals - delete partially for each day",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if (todaysTableInterval.End.Add(-71*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-47*time.Hour).UnixNano()) ||
(todaysTableInterval.End.Add(-40*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-30*time.Hour).UnixNano()) ||
(todaysTableInterval.End.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.UnixNano()) {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.End): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-24 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-48 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-72 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
{
Interval: model.Interval{
Start: now.Add(-71 * time.Hour),
End: now.Add(-47 * time.Hour),
},
Start: todaysTableInterval.End.Add(-72 * time.Hour),
End: todaysTableInterval.End.Add(-71 * time.Hour).Add(-time.Minute),
},
{
Interval: model.Interval{
Start: now.Add(-40 * time.Hour),
End: now.Add(-30 * time.Hour),
},
Start: todaysTableInterval.End.Add(-47 * time.Hour).Add(time.Minute),
End: todaysTableInterval.End.Add(-40 * time.Hour).Add(-time.Minute),
},
{
Interval: model.Interval{
Start: now.Add(-2 * time.Hour),
End: now,
},
Start: todaysTableInterval.End.Add(-30 * time.Hour).Add(time.Minute),
End: todaysTableInterval.End.Add(-2 * time.Hour).Add(-time.Minute),
},
},
},
{
name: "remove no lines using a filter function",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, now.Add(-2*time.Hour), now),
rewriteIntervalFilters: []IntervalFilter{
name: "rewrite chunk spanning multiple days with multiple intervals - delete just one whole day",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.UnixNano() {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.End): {
mustDeleteLines: true,
mustRewriteChunk: false,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-24 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-48 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
schema.config.IndexTables.TableFor(todaysTableInterval.End.Add(-72 * time.Hour)): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
	{
		Start: todaysTableInterval.End.Add(-72 * time.Hour),
		End:   todaysTableInterval.End.Add(-24 * time.Hour),
	},
},
},
@ -378,33 +476,59 @@ func TestChunkRewriter(t *testing.T) {
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk}))
store.Stop()
indexTables := store.indexTables()
require.Len(t, indexTables, len(tt.expectedRespByTables))
for _, indexTable := range indexTables {
	cr := newChunkRewriter(store.chunkClient, indexTable.name, indexTable)

	wroteChunks, linesDeleted, err := cr.rewriteChunk(context.Background(), entryFromChunk(tt.chunk), ExtractIntervalFromTableName(indexTable.name), tt.filterFunc)
	require.NoError(t, err)
	require.Equal(t, tt.expectedRespByTables[indexTable.name].mustDeleteLines, linesDeleted)
	require.Equal(t, tt.expectedRespByTables[indexTable.name].mustRewriteChunk, wroteChunks)
}

// we should have original chunk in the store
expectedChunks := [][]model.Interval{
	{
		{
			Start: tt.chunk.From,
			End:   tt.chunk.Through,
		},
	},
}

chunks := store.GetChunks(tt.chunk.UserID, tt.chunk.From, tt.chunk.Through, tt.chunk.Metric)

// if we rewrote the chunk, we should have that too
if len(tt.retainedChunkIntervals) > 0 {
	expectedChunks = append(expectedChunks, tt.retainedChunkIntervals)
	if chunks[1].Checksum == tt.chunk.Checksum {
		chunks[0], chunks[1] = chunks[1], chunks[0]
	}
}
require.Len(t, chunks, len(expectedChunks))

// now verify the contents of the chunks
for i := 0; i < len(expectedChunks); i++ {
	require.Equal(t, expectedChunks[i][0].Start, chunks[i].From)
	require.Equal(t, expectedChunks[i][len(expectedChunks[i])-1].End, chunks[i].Through)
	lokiChunk := chunks[i].Data.(*chunkenc.Facade).LokiChunk()
	newChunkItr, err := lokiChunk.Iterator(context.Background(), chunks[i].From.Time(), chunks[i].Through.Add(time.Minute).Time(), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
	require.NoError(t, err)

	for _, interval := range expectedChunks[i] {
		for curr := interval.Start; curr <= interval.End; curr = curr.Add(time.Minute) {
			require.True(t, newChunkItr.Next())
			require.Equal(t, logproto.Entry{
				Timestamp: curr.Time(),
				Line:      curr.String(),
			}, newChunkItr.Entry())
		}
	}

	// the iterator should not have any more entries left to iterate
	require.False(t, newChunkItr.Next())
}
store.Stop()
})
}
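Taken together, the assertions above pin down the new `rewriteChunk` contract: it takes a single `filter.Func` instead of a slice of `IntervalFilter`s and returns both whether a rewritten chunk was written and whether any lines were deleted. A minimal sketch of what the two booleans mean for a given index table — semantics inferred from the test assertions, not code from this PR:

```go
package main

import "fmt"

// summarize reads the pair returned by rewriteChunk for one index table.
// The semantics are inferred from the assertions in TestChunkRewriter.
func summarize(wroteChunks, linesDeleted bool) string {
	switch {
	case !linesDeleted:
		return "no lines deleted; chunk left untouched in this table"
	case wroteChunks:
		return "some lines deleted; a rewritten chunk was indexed in this table"
	default:
		return "all of the chunk's lines in this table were deleted; nothing rewritten"
	}
}

func main() {
	fmt.Println(summarize(true, true))
}
```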
@ -429,8 +553,8 @@ func (s *seriesCleanedRecorder) CleanupSeries(userID []byte, lbls labels.Labels)
}
type chunkExpiry struct {
isExpired  bool
filterFunc filter.Func
}
type mockExpirationChecker struct {
@ -445,12 +569,12 @@ func newMockExpirationChecker(chunksExpiry map[string]chunkExpiry) *mockExpirati
return &mockExpirationChecker{chunksExpiry: chunksExpiry}
}
func (m *mockExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, filter.Func) {
time.Sleep(m.delay)
m.calls++
ce := m.chunksExpiry[string(ref.ChunkID)]
return ce.isExpired, ce.filterFunc
}
func (m *mockExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
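The mock mirrors the updated `ExpirationChecker` interface: `Expired` now hands back a `filter.Func` instead of a slice of non-deleted intervals. A sketch of how a caller might interpret the pair — the nil-vs-non-nil semantics here are an assumption drawn from the test cases below, not code from this PR:

```go
package sketch

import "time"

// Func mirrors pkg/util/filter.Func.
type Func func(ts time.Time, s string) bool

// action interprets the (bool, Func) pair returned by Expired: not expired
// means retain; expired with a nil filter means the whole chunk goes;
// expired with a filter means rewrite the chunk, deleting every line for
// which f returns true.
func action(expired bool, f Func) string {
	switch {
	case !expired:
		return "retain chunk"
	case f == nil:
		return "delete whole chunk"
	default:
		return "rewrite chunk, dropping lines where f(ts, line) is true"
	}
}
```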
@ -495,6 +619,29 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
false,
},
},
{
name: "chunk deleted with filter but no lines matching",
chunks: []chunk.Chunk{
createChunk(t, userID, labels.Labels{labels.Label{Name: "foo", Value: "1"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(30*time.Minute)),
},
expiry: []chunkExpiry{
{
isExpired: true,
filterFunc: func(ts time.Time, s string) bool {
return false
},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
nil,
},
expectedEmpty: []bool{
false,
},
expectedModified: []bool{
false,
},
},
{
name: "only one chunk in store which gets deleted",
chunks: []chunk.Chunk{
@ -523,12 +670,14 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{
{
isExpired: true,
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() {
return true
}
return false
},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
@ -577,12 +726,14 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
},
{
isExpired: true,
filterFunc: func(ts time.Time, s string) bool {
tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() {
return true
}
return false
},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
@ -603,12 +754,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{
{
isExpired: true,
filterFunc: func(ts time.Time, s string) bool {
return ts.UnixNano() < todaysTableInterval.Start.UnixNano()
},
},
},
expectedDeletedSeries: []map[uint64]struct{}{
@ -629,12 +777,9 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{
{
isExpired: true,
filterFunc: func(ts time.Time, s string) bool {
return ts.UnixNano() < todaysTableInterval.Start.Add(-30*time.Minute).UnixNano()
},
},
},
expectedDeletedSeries: []map[uint64]struct{}{

@ -1,3 +1,5 @@
package filter

import "time"

type Func func(ts time.Time, s string) bool
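This signature change is the crux of the fix: a filter now sees each line's timestamp as well as its content, so a delete request with a line filter can restrict deletion to its time range line by line instead of dropping whole sub-intervals. A sketch of adapting a content-only predicate to the new shape — `wrap` and `contentOnly` are illustrative names, not part of this PR:

```go
package sketch

import (
	"strings"
	"time"
)

// Func matches the new pkg/util/filter.Func signature.
type Func func(ts time.Time, s string) bool

// wrap lifts a content-only predicate into the new Func shape, bounding it
// to the delete request's [start, end] window so only matching lines inside
// the window are deleted.
func wrap(contentOnly func(string) bool, start, end time.Time) Func {
	return func(ts time.Time, s string) bool {
		return !ts.Before(start) && !ts.After(end) && contentOnly(s)
	}
}

// example: delete only lines containing "error" within the request window
var deleteErrors = wrap(
	func(s string) bool { return strings.Contains(s, "error") },
	time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC),
	time.Date(2023, time.January, 2, 0, 0, 0, 0, time.UTC),
)
```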
