non-indexed labels: support for deletion of logs with non-indexed labels (#10337)

**What this PR does / why we need it**:
We recently added support for storing and querying non-indexed labels
with each log line. This PR wires up the code for supporting deletion
using non-indexed labels.

**Checklist**
- [x] Tests updated
kavirajk/upgrade-prometheus-0.46
Sandeep Sukhani 2 years ago committed by GitHub
parent 210c937c41
commit 962e03932d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      integration/cluster/cluster.go
  2. 8
      integration/cluster/schema.go
  3. 169
      integration/loki_micro_services_delete_test.go
  4. 2
      pkg/chunkenc/memchunk.go
  5. 113
      pkg/chunkenc/memchunk_test.go
  6. 4
      pkg/logql/log/pipeline.go
  7. 22
      pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go
  8. 168
      pkg/storage/stores/indexshipper/compactor/deletion/delete_request_test.go
  9. 5
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go
  10. 221
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go
  11. 4
      pkg/storage/stores/indexshipper/compactor/retention/retention.go
  12. 50
      pkg/storage/stores/indexshipper/compactor/retention/retention_test.go
  13. 8
      pkg/util/filter/filter_function.go

@ -142,6 +142,7 @@ type Cluster struct {
initedAt model.Time initedAt model.Time
periodCfgs []string periodCfgs []string
overridesFile string overridesFile string
schemaVer string
} }
func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster { func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
@ -166,6 +167,7 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
sharedPath: sharedPath, sharedPath: sharedPath,
initedAt: model.Now(), initedAt: model.Now(),
overridesFile: overridesFile, overridesFile: overridesFile,
schemaVer: "v11",
} }
for _, opt := range opts { for _, opt := range opts {
@ -175,6 +177,11 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster {
return cluster return cluster
} }
// SetSchemaVer sets a schema version for all the schemas
func (c *Cluster) SetSchemaVer(schemaVer string) {
c.schemaVer = schemaVer
}
func (c *Cluster) Run() error { func (c *Cluster) Run() error {
for _, component := range c.components { for _, component := range c.components {
if component.running { if component.running {
@ -360,6 +367,7 @@ func (c *Component) MergedConfig() ([]byte, error) {
Execute(&buf, map[string]interface{}{ Execute(&buf, map[string]interface{}{
"curPeriodStart": periodStart.String(), "curPeriodStart": periodStart.String(),
"additionalPeriodStart": additionalPeriodStart.String(), "additionalPeriodStart": additionalPeriodStart.String(),
"schemaVer": c.cluster.schemaVer,
}); err != nil { }); err != nil {
return nil, errors.New("error building schema_config") return nil, errors.New("error building schema_config")
} }

@ -7,7 +7,7 @@ schema_config:
- from: {{.curPeriodStart}} - from: {{.curPeriodStart}}
store: boltdb-shipper store: boltdb-shipper
object_store: filesystem object_store: filesystem
schema: v11 schema: {{.schemaVer}}
index: index:
prefix: index_ prefix: index_
period: 24h period: 24h
@ -18,7 +18,7 @@ schema_config:
- from: {{.additionalPeriodStart}} - from: {{.additionalPeriodStart}}
store: boltdb-shipper store: boltdb-shipper
object_store: store-1 object_store: store-1
schema: v11 schema: {{.schemaVer}}
index: index:
prefix: index_ prefix: index_
period: 24h period: 24h
@ -30,7 +30,7 @@ schema_config:
- from: {{.curPeriodStart}} - from: {{.curPeriodStart}}
store: tsdb store: tsdb
object_store: filesystem object_store: filesystem
schema: v11 schema: {{.schemaVer}}
index: index:
prefix: index_ prefix: index_
period: 24h period: 24h
@ -41,7 +41,7 @@ schema_config:
- from: {{.additionalPeriodStart}} - from: {{.additionalPeriodStart}}
store: tsdb store: tsdb
object_store: store-1 object_store: store-1
schema: v11 schema: {{.schemaVer}}
index: index:
prefix: index_tsdb_ prefix: index_tsdb_
period: 24h period: 24h

@ -7,18 +7,28 @@ import (
"testing" "testing"
"time" "time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/grafana/loki/integration/client" "github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster" "github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage"
) )
type pushRequest struct {
stream map[string]string
entries []logproto.Entry
}
func TestMicroServicesDeleteRequest(t *testing.T) { func TestMicroServicesDeleteRequest(t *testing.T) {
storage.ResetBoltDBIndexClientsWithShipper() storage.ResetBoltDBIndexClientsWithShipper()
clu := cluster.New(nil, cluster.SchemaWithBoltDBAndBoltDB) clu := cluster.New(nil, cluster.SchemaWithBoltDBAndBoltDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() { defer func() {
assert.NoError(t, clu.Cleanup()) assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientsWithShipper() storage.ResetBoltDBIndexClientsWithShipper()
@ -89,34 +99,88 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
cliCompactor := client.New(tenantID, "", tCompactor.HTTPURL()) cliCompactor := client.New(tenantID, "", tCompactor.HTTPURL())
cliCompactor.Now = now cliCompactor.Now = now
var pushRequests []pushRequest
var expectedStreams []client.StreamValues var expectedStreams []client.StreamValues
for _, deletionType := range []string{"filter", "filter_no_match", "nothing", "partially_by_time", "whole"} { for _, deletionType := range []string{"filter", "filter_no_match", "nothing", "partially_by_time", "whole"} {
expectedStreams = append(expectedStreams, client.StreamValues{ pushRequests = append(pushRequests, pushRequest{
Stream: map[string]string{ stream: map[string]string{
"job": "fake", "job": "fake",
"deletion_type": deletionType, "deletion_type": deletionType,
}, },
Values: [][]string{ entries: []logproto.Entry{
{ {
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10), Timestamp: now.Add(-48 * time.Hour),
"lineA", Line: "lineA",
}, },
{ {
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10), Timestamp: now.Add(-48 * time.Hour),
"lineB", Line: "lineB",
}, },
{ {
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10), Timestamp: now.Add(-time.Minute),
"lineC", Line: "lineC",
}, },
{ {
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10), Timestamp: now.Add(-time.Minute),
"lineD", Line: "lineD",
}, },
}, },
}) })
} }
pushRequests = append(pushRequests, pushRequest{
stream: map[string]string{
"job": "fake",
"deletion_type": "with_non_indexed_labels",
},
entries: []logproto.Entry{
{
Timestamp: now.Add(-48 * time.Hour),
Line: "AlineA",
NonIndexedLabels: push.LabelsAdapter{
{
Name: "line",
Value: "A",
},
},
},
{
Timestamp: now.Add(-48 * time.Hour),
Line: "AlineB",
NonIndexedLabels: push.LabelsAdapter{
{
Name: "line",
Value: "B",
},
},
},
{
Timestamp: now.Add(-time.Minute),
Line: "AlineC",
NonIndexedLabels: push.LabelsAdapter{
{
Name: "line",
Value: "C",
},
},
},
{
Timestamp: now.Add(-time.Minute),
Line: "AlineD",
NonIndexedLabels: push.LabelsAdapter{
{
Name: "line",
Value: "D",
},
},
},
},
})
for _, pr := range pushRequests {
expectedStreams = append(expectedStreams, pushRequestToClientStreamValues(t, pr)...)
}
expectedDeleteRequests := []client.DeleteRequest{ expectedDeleteRequests := []client.DeleteRequest{
{ {
StartTime: now.Add(-48 * time.Hour).Unix(), StartTime: now.Add(-48 * time.Hour).Unix(),
@ -142,6 +206,12 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
Query: `{deletion_type="whole"}`, Query: `{deletion_type="whole"}`,
Status: "received", Status: "received",
}, },
{
StartTime: now.Add(-48 * time.Hour).Unix(),
EndTime: now.Unix(),
Query: `{deletion_type="with_non_indexed_labels"} | line="A"`,
Status: "received",
},
} }
validateQueryResponse := func(expectedStreams []client.StreamValues, resp *client.Response) { validateQueryResponse := func(expectedStreams []client.StreamValues, resp *client.Response) {
@ -150,7 +220,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
require.Len(t, resp.Data.Stream, len(expectedStreams)) require.Len(t, resp.Data.Stream, len(expectedStreams))
sort.Slice(resp.Data.Stream, func(i, j int) bool { sort.Slice(resp.Data.Stream, func(i, j int) bool {
return resp.Data.Stream[i].Stream["deletion_type"] < resp.Data.Stream[j].Stream["deletion_type"] return labels.FromMap(resp.Data.Stream[i].Stream).String() < labels.FromMap(resp.Data.Stream[j].Stream).String()
}) })
for _, stream := range resp.Data.Stream { for _, stream := range resp.Data.Stream {
sort.Slice(stream.Values, func(i, j int) bool { sort.Slice(stream.Values, func(i, j int) bool {
@ -162,11 +232,14 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
t.Run("ingest-logs", func(t *testing.T) { t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines // ingest some log lines
for _, stream := range expectedStreams { for _, pr := range pushRequests {
for _, val := range stream.Values { for _, entry := range pr.entries {
tsNs, err := strconv.ParseInt(val[0], 10, 64) require.NoError(t, cliDistributor.PushLogLineWithTimestampAndNonIndexedLabels(
require.NoError(t, err) entry.Line,
require.NoError(t, cliDistributor.PushLogLineWithTimestamp(val[1], time.Unix(0, tsNs), stream.Stream)) entry.Timestamp,
logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels).Map(),
pr.stream,
))
} }
} }
}) })
@ -178,8 +251,20 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// given default value of query_ingesters_within is 3h, older samples won't be present in the response // given default value of query_ingesters_within is 3h, older samples won't be present in the response
var es []client.StreamValues var es []client.StreamValues
for _, stream := range expectedStreams { for _, stream := range expectedStreams {
stream.Values = stream.Values[2:] s := client.StreamValues{
es = append(es, stream) Stream: stream.Stream,
Values: nil,
}
for _, sv := range stream.Values {
tsNs, err := strconv.ParseInt(sv[0], 10, 64)
require.NoError(t, err)
if !time.Unix(0, tsNs).Before(now.Add(-3 * time.Hour)) {
s.Values = append(s.Values, sv)
}
}
if len(s.Values) > 0 {
es = append(es, s)
}
} }
validateQueryResponse(es, resp) validateQueryResponse(es, resp)
}) })
@ -192,7 +277,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
cliIngester.Now = now cliIngester.Now = now
metrics, err := cliIngester.Metrics() metrics, err := cliIngester.Metrics()
require.NoError(t, err) require.NoError(t, err)
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 5) checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 6)
// reset boltdb-shipper client and restart querier // reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientsWithShipper() storage.ResetBoltDBIndexClientsWithShipper()
@ -232,7 +317,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// update expectedStreams as per the issued requests // update expectedStreams as per the issued requests
expectedStreams[0].Values = append(expectedStreams[0].Values[:1], expectedStreams[0].Values[2:]...) expectedStreams[0].Values = append(expectedStreams[0].Values[:1], expectedStreams[0].Values[2:]...)
expectedStreams[3].Values = expectedStreams[3].Values[2:] expectedStreams[3].Values = expectedStreams[3].Values[2:]
expectedStreams = expectedStreams[:4] expectedStreams = append(expectedStreams[:4], expectedStreams[6:]...)
// query and verify that we get the resp which matches expectedStreams // query and verify that we get the resp which matches expectedStreams
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
@ -273,9 +358,9 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests))) checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, float64(len(expectedDeleteRequests)))
// ideally this metric should be equal to 1 given that a single line matches the line filter // ideally this metric should be equal to 2 given that a single line matches the line filter and non-indexed labels filter
// but the same chunk is indexed in 3 tables // but the same chunks are indexed in 3 tables
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 3) checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 6)
}) })
// Query lines // Query lines
@ -321,3 +406,35 @@ func getMetricValue(t *testing.T, metricName, metrics string) float64 {
require.NoError(t, err) require.NoError(t, err)
return val return val
} }
func pushRequestToClientStreamValues(t *testing.T, p pushRequest) []client.StreamValues {
logsByStream := map[string][][]string{}
for _, entry := range p.entries {
lb := labels.NewBuilder(labels.FromMap(p.stream))
for _, l := range entry.NonIndexedLabels {
lb.Set(l.Name, l.Value)
}
stream := lb.Labels().String()
logsByStream[stream] = append(logsByStream[stream], []string{
strconv.FormatInt(entry.Timestamp.UnixNano(), 10),
entry.Line,
})
}
var svs []client.StreamValues
for stream, values := range logsByStream {
parsedLabels, err := syntax.ParseLabels(stream)
require.NoError(t, err)
svs = append(svs, client.StreamValues{
Stream: parsedLabels.Map(),
Values: values,
})
}
sort.Slice(svs, func(i, j int) bool {
return labels.FromMap(svs[i].Stream).String() < labels.FromMap(svs[j].Stream).String()
})
return svs
}

@ -1119,7 +1119,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
for itr.Next() { for itr.Next() {
entry := itr.Entry() entry := itr.Entry()
if filter != nil && filter(entry.Timestamp, entry.Line) { if filter != nil && filter(entry.Timestamp, entry.Line, logproto.FromLabelAdaptersToLabels(entry.NonIndexedLabels)...) {
continue continue
} }
if err := newChunk.Append(&entry); err != nil { if err := newChunk.Append(&entry); err != nil {

@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/push" "github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util/filter"
) )
var testEncoding = []Encoding{ var testEncoding = []Encoding{
@ -74,7 +75,11 @@ var (
} }
) )
const DefaultTestHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt const (
DefaultTestHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
lblPing = "ping"
lblPong = "pong"
)
func TestBlocksInclusive(t *testing.T) { func TestBlocksInclusive(t *testing.T) {
for _, enc := range testEncoding { for _, enc := range testEncoding {
@ -1416,39 +1421,81 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
chkThrough := chkFrom.Add(10 * time.Second) chkThrough := chkFrom.Add(10 * time.Second)
chkThroughPlus1 := chkThrough.Add(1 * time.Second) chkThroughPlus1 := chkThrough.Add(1 * time.Second)
filterFunc := func(_ time.Time, in string) bool {
return strings.HasPrefix(in, "matching")
}
for _, tc := range []struct { for _, tc := range []struct {
name string name string
matchingSliceFrom, matchingSliceTo *time.Time testMemChunk *MemChunk
err error filterFunc filter.Func
nrMatching int err error
nrNotMatching int nrMatching int
nrNotMatching int
}{ }{
{ {
name: "no matches", name: "no matches",
testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, nil, nil, false),
filterFunc: func(_ time.Time, in string, _ ...labels.Label) bool {
return strings.HasPrefix(in, "matching")
},
nrMatching: 0,
nrNotMatching: 10,
},
{
name: "some lines removed",
testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkFromPlus5, false),
filterFunc: func(_ time.Time, in string, _ ...labels.Label) bool {
return strings.HasPrefix(in, "matching")
},
nrMatching: 5,
nrNotMatching: 5,
},
{
name: "all lines match",
testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkThroughPlus1, false),
filterFunc: func(_ time.Time, in string, _ ...labels.Label) bool {
return strings.HasPrefix(in, "matching")
},
err: chunk.ErrSliceNoDataInRange,
},
// Test cases with non-indexed labels
{
name: "no matches - chunk without non-indexed labels",
testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkThroughPlus1, false),
filterFunc: func(_ time.Time, in string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
nrMatching: 0, nrMatching: 0,
nrNotMatching: 10, nrNotMatching: 10,
}, },
{ {
name: "some lines removed", name: "non-indexed labels not matching",
matchingSliceFrom: &chkFrom, testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkThroughPlus1, true),
matchingSliceTo: &chkFromPlus5, filterFunc: func(_ time.Time, in string, nonIndexedLabels ...labels.Label) bool {
nrMatching: 5, return labels.Labels(nonIndexedLabels).Get("ding") == "dong"
nrNotMatching: 5, },
nrMatching: 0,
nrNotMatching: 10,
}, },
{ {
name: "all lines match", name: "some lines removed - with non-indexed labels",
err: chunk.ErrSliceNoDataInRange, testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkFromPlus5, true),
matchingSliceFrom: &chkFrom, filterFunc: func(_ time.Time, in string, nonIndexedLabels ...labels.Label) bool {
matchingSliceTo: &chkThroughPlus1, return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
nrMatching: 5,
nrNotMatching: 5,
},
{
name: "all lines match - with non-indexed labels",
testMemChunk: buildFilterableTestMemChunk(t, chkFrom, chkThrough, &chkFrom, &chkThroughPlus1, true),
filterFunc: func(_ time.Time, in string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && strings.HasPrefix(in, "matching")
},
err: chunk.ErrSliceNoDataInRange,
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
originalChunk := buildFilterableTestMemChunk(t, chkFrom, chkThrough, tc.matchingSliceFrom, tc.matchingSliceTo) originalChunk := tc.testMemChunk
newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, filterFunc) newChunk, err := originalChunk.Rebound(chkFrom, chkThrough, tc.filterFunc)
if tc.err != nil { if tc.err != nil {
require.Equal(t, tc.err, err) require.Equal(t, tc.err, err)
return return
@ -1476,25 +1523,35 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) {
} }
} }
func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time) *MemChunk { func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withNonIndexedLabels bool) *MemChunk {
chk := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) chk := NewMemChunk(ChunkFormatV4, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0)
t.Logf("from : %v", from.String()) t.Logf("from : %v", from.String())
t.Logf("through: %v", through.String()) t.Logf("through: %v", through.String())
var nonIndexedLabels push.LabelsAdapter
if withNonIndexedLabels {
nonIndexedLabels = push.LabelsAdapter{{Name: lblPing, Value: lblPong}}
}
for from.Before(through) { for from.Before(through) {
// If a line is between matchingFrom and matchingTo add the prefix "matching" // If a line is between matchingFrom and matchingTo add the prefix "matching"
if matchingFrom != nil && matchingTo != nil && if matchingFrom != nil && matchingTo != nil &&
(from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) { (from.Equal(*matchingFrom) || (from.After(*matchingFrom) && (from.Before(*matchingTo)))) {
t.Logf("%v matching line", from.String()) t.Logf("%v matching line", from.String())
err := chk.Append(&logproto.Entry{ err := chk.Append(&logproto.Entry{
Line: fmt.Sprintf("matching %v", from.String()), Line: fmt.Sprintf("matching %v", from.String()),
Timestamp: from, Timestamp: from,
NonIndexedLabels: nonIndexedLabels,
}) })
require.NoError(t, err) require.NoError(t, err)
} else { } else {
t.Logf("%v non-match line", from.String()) t.Logf("%v non-match line", from.String())
var nonIndexedLabels push.LabelsAdapter
if withNonIndexedLabels {
nonIndexedLabels = push.LabelsAdapter{{Name: "ding", Value: "dong"}}
}
err := chk.Append(&logproto.Entry{ err := chk.Append(&logproto.Entry{
Line: from.String(), Line: from.String(),
Timestamp: from, Timestamp: from,
NonIndexedLabels: nonIndexedLabels,
}) })
require.NoError(t, err) require.NoError(t, err)
} }

@ -308,7 +308,7 @@ func (sp *filteringStreamPipeline) Process(ts int64, line []byte, nonIndexedLabe
} }
} }
return sp.pipeline.Process(ts, line) return sp.pipeline.Process(ts, line, nonIndexedLabels...)
} }
func (sp *filteringStreamPipeline) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, LabelsResult, bool) { func (sp *filteringStreamPipeline) ProcessString(ts int64, line string, nonIndexedLabels ...labels.Label) (string, LabelsResult, bool) {
@ -323,7 +323,7 @@ func (sp *filteringStreamPipeline) ProcessString(ts int64, line string, nonIndex
} }
} }
return sp.pipeline.ProcessString(ts, line) return sp.pipeline.ProcessString(ts, line, nonIndexedLabels...)
} }
// ReduceStages reduces multiple stages into one. // ReduceStages reduces multiple stages into one.

@ -47,7 +47,7 @@ func (d *DeleteRequest) SetQuery(logQL string) error {
} }
// FilterFunction returns a filter function that returns true if the given line should be deleted based on the DeleteRequest // FilterFunction returns a filter function that returns true if the given line should be deleted based on the DeleteRequest
func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error) { func (d *DeleteRequest) FilterFunction(lbls labels.Labels) (filter.Func, error) {
// init d.timeInterval used to efficiently check log ts is within the bounds of delete request below in filter func // init d.timeInterval used to efficiently check log ts is within the bounds of delete request below in filter func
// without having to do conversion of timestamps for each log line we check. // without having to do conversion of timestamps for each log line we check.
if d.timeInterval == nil { if d.timeInterval == nil {
@ -57,15 +57,15 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error
} }
} }
if !allMatch(d.matchers, labels) { if !allMatch(d.matchers, lbls) {
return func(_ time.Time, s string) bool { return func(_ time.Time, _ string, _ ...labels.Label) bool {
return false return false
}, nil }, nil
} }
// if delete request doesn't have a line filter, just do time based filtering // if delete request doesn't have a line filter, just do time based filtering
if !d.logSelectorExpr.HasFilter() { if !d.logSelectorExpr.HasFilter() {
return func(ts time.Time, s string) bool { return func(ts time.Time, _ string, _ ...labels.Label) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) { if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false return false
} }
@ -79,13 +79,13 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error
return nil, err return nil, err
} }
f := p.ForStream(labels).ProcessString f := p.ForStream(lbls).ProcessString
return func(ts time.Time, s string) bool { return func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) { if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false return false
} }
result, _, skip := f(0, s) result, _, skip := f(0, s, nonIndexedLabels...)
if len(result) != 0 || skip { if len(result) != 0 || skip {
d.Metrics.deletedLinesTotal.WithLabelValues(d.UserID).Inc() d.Metrics.deletedLinesTotal.WithLabelValues(d.UserID).Inc()
d.DeletedLines++ d.DeletedLines++
@ -122,10 +122,6 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func
return false, nil return false, nil
} }
if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}
if d.logSelectorExpr == nil { if d.logSelectorExpr == nil {
err := d.SetQuery(d.Query) err := d.SetQuery(d.Query)
if err != nil { if err != nil {
@ -139,6 +135,10 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func
} }
} }
if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}
if d.StartTime <= entry.From && d.EndTime >= entry.Through && !d.logSelectorExpr.HasFilter() { if d.StartTime <= entry.From && d.EndTime >= entry.Through && !d.logSelectorExpr.HasFilter() {
// Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted // Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted
return true, nil return true, nil

@ -17,12 +17,21 @@ import (
"github.com/grafana/loki/pkg/util/filter" "github.com/grafana/loki/pkg/util/filter"
) )
const (
lblFooBar = `{foo="bar"}`
lblPing = "ping"
lblPong = "pong"
)
func TestDeleteRequest_IsDeleted(t *testing.T) { func TestDeleteRequest_IsDeleted(t *testing.T) {
now := model.Now() now := model.Now()
user1 := "user1" user1 := "user1"
lbl := `{foo="bar", fizz="buzz"}` lbl := `{foo="bar", fizz="buzz"}`
lblWithFilter := `{foo="bar", fizz="buzz"} |= "filter"` lblWithLineFilter := `{foo="bar", fizz="buzz"} |= "filter"`
lblWithNonIndexedLabelsFilter := `{foo="bar", fizz="buzz"} | ping="pong"`
lblWithLineAndNonIndexedLabelsFilter := `{foo="bar", fizz="buzz"} | ping="pong" |= "filter"`
chunkEntry := retention.ChunkEntry{ chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{ ChunkRef: retention.ChunkRef{
@ -56,16 +65,16 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
}, },
{ {
name: "whole chunk deleted with filter present", name: "whole chunk deleted with line filter present",
deleteRequest: DeleteRequest{ deleteRequest: DeleteRequest{
UserID: user1, UserID: user1,
StartTime: now.Add(-3 * time.Hour), StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour), EndTime: now.Add(-time.Hour),
Query: lblWithFilter, Query: lblWithLineFilter,
}, },
expectedResp: resp{ expectedResp: resp{
isDeleted: true, isDeleted: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-time.Hour).UnixNano() { if strings.Contains(s, "filter") && now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-time.Hour).UnixNano() {
return true return true
@ -74,6 +83,44 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
}, },
}, },
{
name: "whole chunk deleted with non-indexed labels filter present",
deleteRequest: DeleteRequest{
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Query: lblWithNonIndexedLabelsFilter,
},
expectedResp: resp{
isDeleted: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
tsUnixNano := ts.UnixNano()
if labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-time.Hour).UnixNano() {
return true
}
return false
},
},
},
{
name: "whole chunk deleted with line and non-indexed labels filter present",
deleteRequest: DeleteRequest{
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Query: lblWithLineAndNonIndexedLabelsFilter,
},
expectedResp: resp{
isDeleted: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-time.Hour).UnixNano() {
return true
}
return false
},
},
},
{ {
name: "chunk deleted from beginning", name: "chunk deleted from beginning",
deleteRequest: DeleteRequest{ deleteRequest: DeleteRequest{
@ -84,7 +131,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isDeleted: true, isDeleted: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-2*time.Hour).UnixNano() { if now.Add(-3*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-2*time.Hour).UnixNano() {
return true return true
@ -103,7 +150,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isDeleted: true, isDeleted: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() { if now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true return true
@ -118,11 +165,11 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1, UserID: user1,
StartTime: now.Add(-2 * time.Hour), StartTime: now.Add(-2 * time.Hour),
EndTime: now, EndTime: now,
Query: lblWithFilter, Query: lblWithLineFilter,
}, },
expectedResp: resp{ expectedResp: resp{
isDeleted: true, isDeleted: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() { if strings.Contains(s, "filter") && now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true return true
@ -131,6 +178,44 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
}, },
}, },
{
name: "chunk deleted from end with non-indexed labels filter present",
deleteRequest: DeleteRequest{
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Query: lblWithNonIndexedLabelsFilter,
},
expectedResp: resp{
isDeleted: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
tsUnixNano := ts.UnixNano()
if labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true
}
return false
},
},
},
{
name: "chunk deleted from end with line and non-indexed labels filter present",
deleteRequest: DeleteRequest{
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Query: lblWithLineAndNonIndexedLabelsFilter,
},
expectedResp: resp{
isDeleted: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
tsUnixNano := ts.UnixNano()
if strings.Contains(s, "filter") && labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && now.Add(-2*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.UnixNano() {
return true
}
return false
},
},
},
{ {
name: "chunk deleted in the middle", name: "chunk deleted in the middle",
deleteRequest: DeleteRequest{ deleteRequest: DeleteRequest{
@ -141,7 +226,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isDeleted: true, isDeleted: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if now.Add(-(2*time.Hour+30*time.Minute)).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-(time.Hour+30*time.Minute)).UnixNano() { if now.Add(-(2*time.Hour+30*time.Minute)).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-(time.Hour+30*time.Minute)).UnixNano() {
return true return true
@ -203,7 +288,15 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
if start.Time().Minute()%2 == 1 { if start.Time().Minute()%2 == 1 {
line = "filter bar" line = "filter bar"
} }
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line), filterFunc(start.Time(), line), "line", line, "time", start.Time(), "now", now.Time())
// mix of empty, ding=dong and ping=pong as non-indexed labels
var nonIndexedLabels []labels.Label
if start.Time().Minute()%3 == 0 {
nonIndexedLabels = []labels.Label{{Name: lblPing, Value: lblPong}}
} else if start.Time().Minute()%2 == 0 {
nonIndexedLabels = []labels.Label{{Name: "ting", Value: "tong"}}
}
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, nonIndexedLabels...), filterFunc(start.Time(), line, nonIndexedLabels...), "line", line, "time", start.Time(), "now", now.Time())
} }
}) })
} }
@ -219,7 +312,7 @@ func mustParseLabel(input string) labels.Labels {
} }
func TestDeleteRequest_FilterFunction(t *testing.T) { func TestDeleteRequest_FilterFunction(t *testing.T) {
t.Run("one_line_matching", func(t *testing.T) { t.Run("one line matching with line filter", func(t *testing.T) {
dr := DeleteRequest{ dr := DeleteRequest{
Query: `{foo="bar"} |= "some"`, Query: `{foo="bar"} |= "some"`,
DeletedLines: 0, DeletedLines: 0,
@ -228,7 +321,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
EndTime: math.MaxInt64, EndTime: math.MaxInt64,
} }
lblStr := `{foo="bar"}` lblStr := lblFooBar
lbls := mustParseLabel(lblStr) lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query)) require.NoError(t, dr.SetQuery(dr.Query))
@ -242,7 +335,54 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal)) require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
}) })
t.Run("labels_not_matching", func(t *testing.T) { t.Run("one line matching with non-indexed labels filter", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} | ping="pong"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := lblFooBar
lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query))
f, err := dr.FilterFunction(lbls)
require.NoError(t, err)
require.True(t, f(time.Now(), `some line`, labels.Label{Name: lblPing, Value: lblPong}))
require.False(t, f(time.Now(), ""))
require.False(t, f(time.Now(), "some line"))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
})
t.Run("one line matching with line and non-indexed labels filter", func(t *testing.T) {
dr := DeleteRequest{
Query: `{foo="bar"} | ping="pong" |= "some"`,
DeletedLines: 0,
Metrics: newDeleteRequestsManagerMetrics(prometheus.NewPedanticRegistry()),
StartTime: 0,
EndTime: math.MaxInt64,
}
lblStr := lblFooBar
lbls := mustParseLabel(lblStr)
require.NoError(t, dr.SetQuery(dr.Query))
f, err := dr.FilterFunction(lbls)
require.NoError(t, err)
require.True(t, f(time.Now(), `some line`, labels.Label{Name: lblPing, Value: lblPong}))
require.False(t, f(time.Now(), ""))
require.False(t, f(time.Now(), "some line"))
require.False(t, f(time.Now(), "other line", labels.Label{Name: lblPing, Value: lblPong}))
require.Equal(t, int32(1), dr.DeletedLines)
require.Equal(t, float64(1), testutil.ToFloat64(dr.Metrics.deletedLinesTotal))
})
t.Run("labels not matching", func(t *testing.T) {
dr := DeleteRequest{ dr := DeleteRequest{
Query: `{foo="bar"} |= "some"`, Query: `{foo="bar"} |= "some"`,
DeletedLines: 0, DeletedLines: 0,
@ -265,7 +405,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) }) require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) })
}) })
t.Run("no_line_filter", func(t *testing.T) { t.Run("no line filter", func(t *testing.T) {
now := model.Now() now := model.Now()
dr := DeleteRequest{ dr := DeleteRequest{
Query: `{namespace="default"}`, Query: `{namespace="default"}`,

@ -10,6 +10,7 @@ import (
"github.com/go-kit/log/level" "github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
@ -268,9 +269,9 @@ func (d *DeleteRequestsManager) Expired(ref retention.ChunkEntry, _ model.Time)
} }
d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc() d.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(string(ref.UserID)).Inc()
return true, func(ts time.Time, s string) bool { return true, func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
for _, ff := range filterFuncs { for _, ff := range filterFuncs {
if ff(ts, s) { if ff(ts, s, nonIndexedLabels...) {
return true return true
} }
} }

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logql/syntax"
@ -27,6 +28,8 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
lblFoo, err := syntax.ParseLabels(`{foo="bar"}`) lblFoo, err := syntax.ParseLabels(`{foo="bar"}`)
require.NoError(t, err) require.NoError(t, err)
streamSelectorWithLineFilters := lblFoo.String() + `|="fizz"` streamSelectorWithLineFilters := lblFoo.String() + `|="fizz"`
streamSelectorWithNonIndexedLabelsFilters := lblFoo.String() + `| ping="pong"`
streamSelectorWithLineAndNonIndexedLabelsFilters := lblFoo.String() + `| ping="pong" |= "fizz"`
chunkEntry := retention.ChunkEntry{ chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{ ChunkRef: retention.ChunkRef{
@ -75,6 +78,50 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
}, },
}, },
{
name: "no relevant delete requests",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: "different-user",
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
"different-user": {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "delete request not matching labels",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: `{fizz="buzz"}`,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: false,
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{ {
name: "whole chunk deleted by single request", name: "whole chunk deleted by single request",
deletionMode: deletionmode.FilterAndDelete, deletionMode: deletionmode.FilterAndDelete,
@ -111,7 +158,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return strings.Contains(s, "fizz") return strings.Contains(s, "fizz")
}, },
}, },
@ -122,6 +169,56 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
}, },
}, },
{
name: "whole chunk deleted by single request with non-indexed labels filters",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{
name: "whole chunk deleted by single request with line and non-indexed labels filters",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithLineAndNonIndexedLabelsFilters,
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong && strings.Contains(s, "fizz")
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-24 * time.Hour),
End: now,
},
},
},
{ {
name: "deleted interval out of range", name: "deleted interval out of range",
deletionMode: deletionmode.FilterAndDelete, deletionMode: deletionmode.FilterAndDelete,
@ -224,7 +321,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return strings.Contains(s, "fizz") return strings.Contains(s, "fizz")
}, },
}, },
@ -235,6 +332,37 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
}, },
}, },
{
name: "multiple delete requests with non-indexed labels filters and one deleting the whole chunk",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
},
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-48 * time.Hour),
End: now,
},
},
},
{ {
name: "multiple delete requests causing multiple holes", name: "multiple delete requests causing multiple holes",
deletionMode: deletionmode.FilterAndDelete, deletionMode: deletionmode.FilterAndDelete,
@ -267,7 +395,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) || if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) ||
(now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) || (now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) ||
@ -305,7 +433,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return true return true
}, },
}, },
@ -336,7 +464,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return strings.Contains(s, "fizz") return strings.Contains(s, "fizz")
}, },
}, },
@ -347,6 +475,37 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
}, },
}, },
{
name: "multiple overlapping requests with non-indexed labels filters deleting the whole chunk",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
},
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-13 * time.Hour),
End: now,
},
},
},
{ {
name: "multiple non-overlapping requests deleting the whole chunk", name: "multiple non-overlapping requests deleting the whole chunk",
deletionMode: deletionmode.FilterAndDelete, deletionMode: deletionmode.FilterAndDelete,
@ -373,7 +532,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return true return true
}, },
}, },
@ -410,7 +569,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
return strings.Contains(s, "fizz") return strings.Contains(s, "fizz")
}, },
}, },
@ -421,6 +580,43 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
}, },
}, },
{
name: "multiple non-overlapping requests with non-indexed labels filter deleting the whole chunk",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 70,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
},
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
},
{
UserID: testUserID,
Query: streamSelectorWithNonIndexedLabelsFilters,
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
},
},
expectedResp: resp{
isExpired: true,
expectedFilter: func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
return labels.Labels(nonIndexedLabels).Get(lblPing) == lblPong
},
},
expectedDeletionRangeByUser: map[string]model.Interval{
testUserID: {
Start: now.Add(-12 * time.Hour),
End: now,
},
},
},
{ {
name: "deletes are disabled", name: "deletes are disabled",
deletionMode: deletionmode.Disabled, deletionMode: deletionmode.Disabled,
@ -521,7 +717,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
}, },
expectedResp: resp{ expectedResp: resp{
isExpired: true, isExpired: true,
expectedFilter: func(ts time.Time, s string) bool { expectedFilter: func(ts time.Time, s string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) || if (now.Add(-13*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-11*time.Hour).UnixNano()) ||
(now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) { (now.Add(-10*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= now.Add(-8*time.Hour).UnixNano()) {
@ -562,7 +758,14 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
if start.Time().Minute()%2 == 1 { if start.Time().Minute()%2 == 1 {
line = "fizz buzz" line = "fizz buzz"
} }
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line), filterFunc(start.Time(), line), "line", line, "time", start.Time(), "now", now.Time()) // mix of empty, ding=dong and ping=pong as non-indexed labels
var nonIndexedLabels []labels.Label
if start.Time().Minute()%3 == 0 {
nonIndexedLabels = []labels.Label{{Name: lblPing, Value: lblPong}}
} else if start.Time().Minute()%2 == 0 {
nonIndexedLabels = []labels.Label{{Name: "ting", Value: "tong"}}
}
require.Equal(t, tc.expectedResp.expectedFilter(start.Time(), line, nonIndexedLabels...), filterFunc(start.Time(), line, nonIndexedLabels...), "line", line, "time", start.Time(), "now", now.Time())
} }
require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess)) require.Equal(t, len(tc.expectedDeletionRangeByUser), len(mgr.deleteRequestsToProcess))

@ -366,8 +366,8 @@ func (c *chunkRewriter) rewriteChunk(ctx context.Context, ce ChunkEntry, tableIn
return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks)) return false, false, fmt.Errorf("expected 1 entry for chunk %s but found %d in storage", chunkID, len(chks))
} }
newChunkData, err := chks[0].Data.Rebound(ce.From, ce.Through, func(ts time.Time, s string) bool { newChunkData, err := chks[0].Data.Rebound(ce.From, ce.Through, func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool {
if filterFunc(ts, s) { if filterFunc(ts, s, nonIndexedLabels...) {
linesDeleted = true linesDeleted = true
return true return true
} }

@ -295,7 +295,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "no rewrites", name: "no rewrites",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(time.Hour)), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(time.Hour)),
filterFunc: func(ts time.Time, s string) bool { filterFunc: func(ts time.Time, s string, _ ...labels.Label) bool {
return false return false
}, },
expectedRespByTables: map[string]tableResp{ expectedRespByTables: map[string]tableResp{
@ -305,7 +305,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "no rewrites with chunk spanning multiple tables", name: "no rewrites with chunk spanning multiple tables",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-48*time.Hour), todaysTableInterval.End), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-48*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, s string) bool { filterFunc: func(ts time.Time, s string, _ ...labels.Label) bool {
return false return false
}, },
expectedRespByTables: map[string]tableResp{ expectedRespByTables: map[string]tableResp{
@ -317,7 +317,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "rewrite first half", name: "rewrite first half",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)),
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(time.Hour).UnixNano() { if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(time.Hour).UnixNano() {
return true return true
@ -338,10 +338,36 @@ func TestChunkRewriter(t *testing.T) {
}, },
}, },
}, },
{
name: "rewrite first half using non-indexed labels",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)),
filterFunc: func(ts time.Time, _ string, nonIndexedLabels ...labels.Label) bool {
tsUnixNano := ts.UnixNano()
if labels.Labels(nonIndexedLabels).Get("foo") == model.TimeFromUnixNano(ts.UnixNano()).String() &&
todaysTableInterval.Start.UnixNano() <= tsUnixNano &&
tsUnixNano <= todaysTableInterval.Start.Add(time.Hour).UnixNano() {
return true
}
return false
},
expectedRespByTables: map[string]tableResp{
schema.config.IndexTables.TableFor(todaysTableInterval.Start): {
mustDeleteLines: true,
mustRewriteChunk: true,
},
},
retainedChunkIntervals: []model.Interval{
{
Start: todaysTableInterval.Start.Add(time.Hour).Add(time.Minute),
End: todaysTableInterval.Start.Add(2 * time.Hour),
},
},
},
{ {
name: "rewrite second half", name: "rewrite second half",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(2*time.Hour)),
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.Add(time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano() { if todaysTableInterval.Start.Add(time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano() {
return true return true
@ -365,7 +391,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "rewrite multiple intervals", name: "rewrite multiple intervals",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(12*time.Hour)), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.Start, todaysTableInterval.Start.Add(12*time.Hour)),
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if (todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano()) || if (todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(2*time.Hour).UnixNano()) ||
(todaysTableInterval.Start.Add(5*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(9*time.Hour).UnixNano()) || (todaysTableInterval.Start.Add(5*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(9*time.Hour).UnixNano()) ||
@ -395,7 +421,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "rewrite chunk spanning multiple days with multiple intervals - delete partially for each day", name: "rewrite chunk spanning multiple days with multiple intervals - delete partially for each day",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if (todaysTableInterval.End.Add(-71*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-47*time.Hour).UnixNano()) || if (todaysTableInterval.End.Add(-71*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-47*time.Hour).UnixNano()) ||
(todaysTableInterval.End.Add(-40*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-30*time.Hour).UnixNano()) || (todaysTableInterval.End.Add(-40*time.Hour).UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.Add(-30*time.Hour).UnixNano()) ||
@ -441,7 +467,7 @@ func TestChunkRewriter(t *testing.T) {
{ {
name: "rewrite chunk spanning multiple days with multiple intervals - delete just one whole day", name: "rewrite chunk spanning multiple days with multiple intervals - delete just one whole day",
chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End), chunk: createChunk(t, "1", labels.Labels{labels.Label{Name: "foo", Value: "bar"}}, todaysTableInterval.End.Add(-72*time.Hour), todaysTableInterval.End),
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.UnixNano() { if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.End.UnixNano() {
return true return true
@ -636,7 +662,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{ expiry: []chunkExpiry{
{ {
isExpired: true, isExpired: true,
filterFunc: func(ts time.Time, s string) bool { filterFunc: func(ts time.Time, s string, _ ...labels.Label) bool {
return false return false
}, },
}, },
@ -679,7 +705,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{ expiry: []chunkExpiry{
{ {
isExpired: true, isExpired: true,
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() { if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() {
return true return true
@ -735,7 +761,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
}, },
{ {
isExpired: true, isExpired: true,
filterFunc: func(ts time.Time, _ string) bool { filterFunc: func(ts time.Time, _ string, _ ...labels.Label) bool {
tsUnixNano := ts.UnixNano() tsUnixNano := ts.UnixNano()
if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() { if todaysTableInterval.Start.UnixNano() <= tsUnixNano && tsUnixNano <= todaysTableInterval.Start.Add(15*time.Minute).UnixNano() {
return true return true
@ -763,7 +789,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{ expiry: []chunkExpiry{
{ {
isExpired: true, isExpired: true,
filterFunc: func(ts time.Time, s string) bool { filterFunc: func(ts time.Time, s string, _ ...labels.Label) bool {
return ts.UnixNano() < todaysTableInterval.Start.UnixNano() return ts.UnixNano() < todaysTableInterval.Start.UnixNano()
}, },
}, },
@ -786,7 +812,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
expiry: []chunkExpiry{ expiry: []chunkExpiry{
{ {
isExpired: true, isExpired: true,
filterFunc: func(ts time.Time, s string) bool { filterFunc: func(ts time.Time, s string, _ ...labels.Label) bool {
return ts.UnixNano() < todaysTableInterval.Start.Add(-30*time.Minute).UnixNano() return ts.UnixNano() < todaysTableInterval.Start.Add(-30*time.Minute).UnixNano()
}, },
}, },

@ -1,5 +1,9 @@
package filter package filter
import "time" import (
"time"
type Func func(ts time.Time, s string) bool "github.com/prometheus/prometheus/model/labels"
)
type Func func(ts time.Time, s string, nonIndexedLabels ...labels.Label) bool

Loading…
Cancel
Save