Add a DeletionMode config variable (#5481)

* Add deletionEnabled setting and remove delete request manager

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rebase

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove deletion handling from delete requests manager

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* change store so it stores a logql statement

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add validation code for logql statement

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Run deleteRequestsManager when deletion is enabled

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove unused variables

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert "Remove deletion handling from delete requests manager"

This reverts commit ce4f774497aa590caff86b0745ec81588592a9e1.

* Re-add IsDeleted method

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Re-add tests for IsDeleted

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix delete request store test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issue

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert compactor changes

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add deletion mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add v1 mode

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename LogQLRequest to Query

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix linting issues

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use DeleteMode in compactor module

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename logql to query

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Put cancel under delete verb

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update documentation

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert only the API surface area while keeping everything else

* Use moved code in syntax package

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove duplicte import

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use renamed field in tests

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove duplicates and empty lines in changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update changelog description

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_request.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Update CHANGELOG.md

Co-authored-by: Christian Simon <simon@swine.de>

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go

Co-authored-by: Christian Simon <simon@swine.de>

* Make DeletionMode struct member non public

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert change to docs re cancellation

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use same variable names

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add parameter validation to changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename v1 to WholeStreamDeletion

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix default value of deletion mode config setting

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* reimplement new api

* Add delete request handler when delete mode is set

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove unused variable

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add comment to change the code when other deletion modes are available

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* create expirationChecker if deletionMode is set

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Address review comments

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update pkg/storage/stores/shipper/compactor/compactor.go

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* Rename AddQuery to SetQuery

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

Co-authored-by: Travis Patterson <travis.patterson@grafana.com>
Co-authored-by: Christian Simon <simon@swine.de>
Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
pull/5780/head
Michel Hollands 4 years ago committed by GitHub
parent ece1fb5a34
commit b865b81ba0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 13
      pkg/loki/modules.go
  3. 12
      pkg/querier/querier.go
  4. 20
      pkg/querier/querier_test.go
  5. 50
      pkg/storage/stores/shipper/compactor/compactor.go
  6. 39
      pkg/storage/stores/shipper/compactor/deletion/delete_request.go
  7. 23
      pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go
  8. 30
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager_test.go
  9. 30
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go
  10. 10
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go
  11. 50
      pkg/storage/stores/shipper/compactor/deletion/mode.go
  12. 33
      pkg/storage/stores/shipper/compactor/deletion/mode_test.go
  13. 2
      pkg/storage/stores/shipper/compactor/deletion/noop_delete_requests_store.go
  14. 21
      pkg/storage/stores/shipper/compactor/deletion/request_handler.go
  15. 28
      pkg/storage/stores/shipper/compactor/deletion/validation.go
  16. 39
      pkg/storage/stores/shipper/compactor/deletion/validation_test.go
  17. 20
      pkg/storage/stores/shipper/compactor/retention/expiration.go

@ -12,6 +12,7 @@
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5481](https://github.com/grafana/loki/pull/5481) **MichelHollands**: Add a DeletionMode config variable to specify the delete mode and validate match parameters.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.

@ -12,8 +12,6 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -52,6 +50,7 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
@ -749,10 +748,12 @@ func (t *Loki) initCompactor() (services.Service, error) {
}
t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)
if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/admin/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/admin/cancel_delete_request").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
// TODO: update this when the other deletion modes are available
if t.Cfg.CompactorConfig.RetentionEnabled && t.compactor.DeleteMode() == deletion.WholeStreamDeletion {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
}
return t.compactor, nil

@ -226,13 +226,11 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t
var deletes []*logproto.Delete
for _, del := range d {
if int64(del.StartTime) <= end && int64(del.EndTime) >= start {
for _, selector := range del.Selectors {
deletes = append(deletes, &logproto.Delete{
Selector: selector,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
deletes = append(deletes, &logproto.Delete{
Selector: del.Query,
Start: int64(del.StartTime),
End: int64(del.EndTime),
})
}
}

@ -740,11 +740,11 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {
delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}
@ -802,11 +802,11 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {
delGetter := &mockDeleteGettter{
results: []deletion.DeleteRequest{
{Selectors: []string{`0`}, StartTime: 0, EndTime: 100},
{Selectors: []string{`1`}, StartTime: 200, EndTime: 400},
{Selectors: []string{`2`}, StartTime: 400, EndTime: 500},
{Selectors: []string{`3`}, StartTime: 500, EndTime: 700},
{Selectors: []string{`4`}, StartTime: 700, EndTime: 900},
{Query: `0`, StartTime: 0, EndTime: 100},
{Query: `1`, StartTime: 200, EndTime: 400},
{Query: `2`, StartTime: 400, EndTime: 500},
{Query: `3`, StartTime: 500, EndTime: 700},
{Query: `4`, StartTime: 700, EndTime: 900},
},
}

@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"path/filepath"
"strings"
"sync"
"time"
@ -67,6 +68,7 @@ type Config struct {
RetentionEnabled bool `yaml:"retention_enabled"`
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeletionMode string `yaml:"deletion_mode"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
@ -84,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
}
@ -96,6 +99,10 @@ func (cfg *Config) Validate() error {
return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
}
if _, err := deletion.ParseMode(cfg.DeletionMode); err != nil {
return err
}
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
@ -113,6 +120,7 @@ type Compactor struct {
metrics *metrics
running bool
wg sync.WaitGroup
deleteMode deletion.Mode
// Ring used for running a single compactor
ringLifecycler *ring.BasicLifecycler
@ -180,6 +188,12 @@ func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_st
compactor.subservicesWatcher = services.NewFailureWatcher()
compactor.subservicesWatcher.WatchManager(compactor.subservices)
mode, err := deletion.ParseMode(cfg.DeletionMode)
if err != nil {
return nil, err
}
compactor.deleteMode = mode
if err := compactor.init(storageConfig, schemaConfig, limits, clientMetrics, r); err != nil {
return nil, err
}
@ -215,18 +229,24 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
return err
}
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
if c.deleteMode == deletion.WholeStreamDeletion {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
if err != nil {
return err
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
} else {
c.expirationChecker = newExpirationChecker(
retention.NewExpirationChecker(limits),
// This is a dummy deletion ExpirationChecker that never expires anything
retention.NeverExpiringExpirationChecker(limits),
)
}
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(c.deleteRequestsStore, time.Hour, r)
c.deleteRequestsManager = deletion.NewDeleteRequestsManager(c.deleteRequestsStore, c.cfg.DeleteRequestCancelPeriod, r)
c.expirationChecker = newExpirationChecker(retention.NewExpirationChecker(limits), c.deleteRequestsManager)
c.tableMarker, err = retention.NewMarker(retentionWorkDir, schemaConfig, c.expirationChecker, chunkClient, r)
if err != nil {
return err
@ -286,8 +306,12 @@ func (c *Compactor) starting(ctx context.Context) (err error) {
func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RetentionEnabled {
defer c.deleteRequestsStore.Stop()
defer c.deleteRequestsManager.Stop()
if c.deleteRequestsStore != nil {
defer c.deleteRequestsStore.Stop()
}
if c.deleteRequestsManager != nil {
defer c.deleteRequestsManager.Stop()
}
}
syncTicker := time.NewTicker(c.ringPollPeriod)
@ -530,6 +554,10 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
return firstErr
}
func (c *Compactor) DeleteMode() deletion.Mode {
return c.deleteMode
}
type expirationChecker struct {
retentionExpiryChecker retention.ExpirationChecker
deletionExpiryChecker retention.ExpirationChecker

@ -3,22 +3,30 @@ package deletion
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
)
// DeleteRequest holds all the details about a delete request.
type DeleteRequest struct {
RequestID string `json:"request_id"`
StartTime model.Time `json:"start_time"`
EndTime model.Time `json:"end_time"`
Selectors []string `json:"selectors"`
Query string `json:"query"`
Status DeleteRequestStatus `json:"status"`
CreatedAt model.Time `json:"created_at"`
UserID string `json:"-"`
Matchers [][]*labels.Matcher `json:"-"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
}
func (d *DeleteRequest) SetQuery(logQL string) error {
d.Query = logQL
matchers, err := parseDeletionQuery(logQL)
if err != nil {
return err
}
d.matchers = matchers
return nil
}
func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Interval) {
@ -36,26 +44,7 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, []model.Int
return false, nil
}
matchers := make([][]*labels.Matcher, len(d.Selectors))
var err error
for j, selector := range d.Selectors {
matchers[j], err = parser.ParseMetricSelector(selector)
if err != nil {
return false, nil
}
}
matches := false
for _, matchers := range matchers {
if labels.Selector(matchers).Matches(entry.Labels) {
matches = true
break
}
}
if !matches {
if !labels.Selector(d.matchers).Matches(entry.Labels) {
return false, nil
}

@ -16,7 +16,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
now := model.Now()
user1 := "user1"
lbls := `{foo="bar", fizz="buzz"}`
lbl := `{foo="bar", fizz="buzz"}`
chunkEntry := retention.ChunkEntry{
ChunkRef: retention.ChunkRef{
@ -24,7 +24,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
From: now.Add(-3 * time.Hour),
Through: now.Add(-time.Hour),
},
Labels: mustParseLabel(lbls),
Labels: mustParseLabel(lbl),
}
type resp struct {
@ -43,7 +43,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
@ -56,7 +56,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-2 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
@ -74,7 +74,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
@ -92,7 +92,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
@ -110,7 +110,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-(2*time.Hour + 30*time.Minute)),
EndTime: now.Add(-(time.Hour + 30*time.Minute)),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: true,
@ -132,7 +132,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: user1,
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-10 * time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
@ -141,10 +141,10 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
{
name: "request not matching due to matchers",
deleteRequest: DeleteRequest{
UserID: user1,
UserID: "user1",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{`{foo1="bar"}`, `{fizz1="buzz"}`},
Query: `{foo1="bar"}`,
},
expectedResp: resp{
isDeleted: false,
@ -156,7 +156,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
UserID: "user2",
StartTime: now.Add(-3 * time.Hour),
EndTime: now.Add(-time.Hour),
Selectors: []string{lbls},
Query: lbl,
},
expectedResp: resp{
isDeleted: false,
@ -164,6 +164,7 @@ func TestDeleteRequest_IsDeleted(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.deleteRequest.SetQuery(tc.deleteRequest.Query))
isDeleted, nonDeletedIntervals := tc.deleteRequest.IsDeleted(chunkEntry)
require.Equal(t, tc.expectedResp.isDeleted, isDeleted)
require.Equal(t, tc.expectedResp.nonDeletedIntervals, nonDeletedIntervals)

@ -26,7 +26,7 @@ func (m mockDeleteRequestsStore) UpdateStatus(ctx context.Context, userID, reque
return nil
}
func (m mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
func (m mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
panic("implement me")
}
@ -90,7 +90,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: "different-user",
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
@ -105,7 +105,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-24 * time.Hour),
EndTime: now,
},
@ -120,7 +120,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
},
@ -135,13 +135,13 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-48 * time.Hour),
EndTime: now.Add(-24 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now,
},
@ -156,25 +156,25 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
},
@ -202,13 +202,13 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-6 * time.Hour),
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-8 * time.Hour),
EndTime: now,
},
@ -223,19 +223,19 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-12 * time.Hour),
EndTime: now.Add(-6*time.Hour) - 1,
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-4*time.Hour) - 1,
},
{
UserID: testUserID,
Selectors: []string{lblFoo.String()},
Query: lblFoo.String(),
StartTime: now.Add(-4 * time.Hour),
EndTime: now,
},

@ -27,8 +27,6 @@ const (
StatusReceived DeleteRequestStatus = "received"
StatusProcessed DeleteRequestStatus = "processed"
separator = "\000" // separator for series selectors in delete requests
deleteRequestID indexType = "1"
deleteRequestDetails indexType = "2"
@ -39,7 +37,7 @@ const (
var ErrDeleteRequestNotFound = errors.New("could not find matching delete request")
type DeleteRequestsStore interface {
AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error
AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error
GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error)
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error
@ -72,14 +70,14 @@ func (ds *deleteRequestsStore) Stop() {
}
// AddDeleteRequest creates entries for a new delete request.
func (ds *deleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
_, err := ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, selectors)
func (ds *deleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
_, err := ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, query)
return err
}
// addDeleteRequest is also used for tests to create delete requests with different createdAt time.
func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, selectors []string) ([]byte, error) {
requestID := generateUniqueID(userID, selectors)
func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, query string) ([]byte, error) {
requestID := generateUniqueID(userID, query)
for {
_, err := ds.GetDeleteRequest(ctx, userID, string(requestID))
@ -92,7 +90,7 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
// we have a collision here, lets recreate a new requestID and check for collision
time.Sleep(time.Millisecond)
requestID = generateUniqueID(userID, selectors)
requestID = generateUniqueID(userID, query)
}
// userID, requestID
@ -103,10 +101,10 @@ func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID stri
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
// Add another entry with additional details like creation time, time range of delete request and selectors in value
// Add another entry with additional details like creation time, time range of delete request and the logQL requests in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue), []byte(strings.Join(selectors, separator)))
[]byte(rangeValue), []byte(query))
err := ds.indexClient.BatchWrite(ctx, writeBatch)
if err != nil {
@ -203,7 +201,11 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu
return false
}
deleteRequest.Selectors = strings.Split(string(itr.Value()), separator)
err = deleteRequest.SetQuery(string(itr.Value()))
if err != nil {
parseError = err
return false
}
deleteRequests[i] = deleteRequest
return true
@ -263,7 +265,7 @@ func parseDeleteRequestTimestamps(rangeValue []byte, deleteRequest DeleteRequest
}
// An id is useful in managing delete requests
func generateUniqueID(orgID string, selectors []string) []byte {
func generateUniqueID(orgID string, query string) []byte {
uniqueID := fnv.New32()
_, _ = uniqueID.Write([]byte(orgID))
@ -271,9 +273,7 @@ func generateUniqueID(orgID string, selectors []string) []byte {
binary.LittleEndian.PutUint64(timeNow, uint64(time.Now().UnixNano()))
_, _ = uniqueID.Write(timeNow)
for _, selector := range selectors {
_, _ = uniqueID.Write([]byte(selector))
}
_, _ = uniqueID.Write([]byte(query))
return encodeUniqueID(uniqueID.Sum32())
}

@ -30,7 +30,7 @@ func TestDeleteRequestsStore(t *testing.T) {
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-i * time.Hour).Add(30 * time.Minute),
CreatedAt: now.Add(-i * time.Hour).Add(30 * time.Minute),
Selectors: []string{fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1)},
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1),
Status: StatusReceived,
})
user2ExpectedRequests = append(user2ExpectedRequests, DeleteRequest{
@ -38,7 +38,7 @@ func TestDeleteRequestsStore(t *testing.T) {
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-(i + 1) * time.Hour),
CreatedAt: now.Add(-(i + 1) * time.Hour),
Selectors: []string{fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2)},
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2),
Status: StatusReceived,
})
}
@ -66,10 +66,11 @@ func TestDeleteRequestsStore(t *testing.T) {
user1ExpectedRequests[i].CreatedAt,
user1ExpectedRequests[i].StartTime,
user1ExpectedRequests[i].EndTime,
user1ExpectedRequests[i].Selectors,
user1ExpectedRequests[i].Query,
)
require.NoError(t, err)
user1ExpectedRequests[i].RequestID = string(requestID)
require.NoError(t, user1ExpectedRequests[i].SetQuery(user1ExpectedRequests[i].Query))
requestID, err = testDeleteRequestsStore.(*deleteRequestsStore).addDeleteRequest(
context.Background(),
@ -77,10 +78,11 @@ func TestDeleteRequestsStore(t *testing.T) {
user2ExpectedRequests[i].CreatedAt,
user2ExpectedRequests[i].StartTime,
user2ExpectedRequests[i].EndTime,
user2ExpectedRequests[i].Selectors,
user2ExpectedRequests[i].Query,
)
require.NoError(t, err)
user2ExpectedRequests[i].RequestID = string(requestID)
require.NoError(t, user2ExpectedRequests[i].SetQuery(user2ExpectedRequests[i].Query))
}
// get all requests with StatusReceived and see if they have expected values

@ -0,0 +1,50 @@
package deletion
import (
"errors"
)
type Mode int16
var (
errUnknownMode = errors.New("unknown deletion mode")
)
const (
Disabled Mode = iota
WholeStreamDeletion // The existing log deletion that removes whole streams.
FilterOnly
FilterAndDelete
)
func (m Mode) String() string {
switch m {
case Disabled:
return "disabled"
case WholeStreamDeletion:
return "whole-stream-deletion"
case FilterOnly:
return "filter-only"
case FilterAndDelete:
return "filter-and-delete"
}
return "unknown"
}
func AllModes() []string {
return []string{Disabled.String(), WholeStreamDeletion.String(), FilterOnly.String(), FilterAndDelete.String()}
}
func ParseMode(in string) (Mode, error) {
switch in {
case "disabled":
return Disabled, nil
case "whole-stream-deletion":
return WholeStreamDeletion, nil
case "filter-only":
return FilterOnly, nil
case "filter-and-delete":
return FilterAndDelete, nil
}
return 0, errUnknownMode
}

@ -0,0 +1,33 @@
package deletion
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAllModes(t *testing.T) {
modes := AllModes()
require.ElementsMatch(t, []string{"disabled", "whole-stream-deletion", "filter-only", "filter-and-delete"}, modes)
}
func TestParseMode(t *testing.T) {
mode, err := ParseMode("disabled")
require.NoError(t, err)
require.Equal(t, Disabled, mode)
mode, err = ParseMode("whole-stream-deletion")
require.NoError(t, err)
require.Equal(t, WholeStreamDeletion, mode)
mode, err = ParseMode("filter-only")
require.NoError(t, err)
require.Equal(t, FilterOnly, mode)
mode, err = ParseMode("filter-and-delete")
require.NoError(t, err)
require.Equal(t, FilterAndDelete, mode)
_, err = ParseMode("something-else")
require.ErrorIs(t, errUnknownMode, err)
}

@ -12,7 +12,7 @@ func NewNoOpDeleteRequestsStore() DeleteRequestsStore {
type noOpDeleteRequestsStore struct{}
func (d *noOpDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, selectors []string) error {
func (d *noOpDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
return nil
}

@ -9,7 +9,6 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/dskit/tenant"
@ -35,7 +34,7 @@ func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCance
return &deleteMgr
}
// AddDeleteRequestHandler handles addition of new delete request
// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
@ -45,18 +44,16 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
}
params := r.URL.Query()
match := params["match[]"]
if len(match) == 0 {
http.Error(w, "selectors not set", http.StatusBadRequest)
query := params.Get("query")
if len(query) == 0 {
http.Error(w, "query not set", http.StatusBadRequest)
return
}
for i := range match {
_, err := parser.ParseMetricSelector(match[i])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = parseDeletionQuery(query)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
startParam := params.Get("start")
@ -90,7 +87,7 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
return
}
if err := dm.deleteRequestsStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil {
if err := dm.deleteRequestsStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), query); err != nil {
level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return

@ -0,0 +1,28 @@
package deletion
import (
"errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logql/syntax"
)
var (
errInvalidQuery = errors.New("invalid query expression")
errUnsupportedQuery = errors.New("unsupported query expression")
)
// parseDeletionQuery checks if the given logQL is valid for deletions
func parseDeletionQuery(query string) ([]*labels.Matcher, error) {
expr, err := syntax.ParseExpr(query)
if err != nil {
return nil, errInvalidQuery
}
if matchersExpr, ok := expr.(*syntax.MatchersExpr); ok {
return matchersExpr.Matchers(), nil
}
return nil, errUnsupportedQuery
}

@ -0,0 +1,39 @@
package deletion
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestParseLogQLExpressionForDeletion(t *testing.T) {
t.Run("invalid logql", func(t *testing.T) {
matchers, err := parseDeletionQuery("gjgjg ggj")
require.Nil(t, matchers)
require.ErrorIs(t, err, errInvalidQuery)
})
t.Run("matcher expression", func(t *testing.T) {
matchers, err := parseDeletionQuery(`{env="dev", secret="true"}`)
require.NotNil(t, matchers)
require.NoError(t, err)
})
t.Run("pipeline expression with line filter", func(t *testing.T) {
matchers, err := parseDeletionQuery(`{env="dev", secret="true"} |= "social sec number"`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedQuery)
})
t.Run("pipeline expression with label filter ", func(t *testing.T) {
matchers, err := parseDeletionQuery(`{env="dev", secret="true"} | json bob="top.params[0]"`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedQuery)
})
t.Run("metrics query", func(t *testing.T) {
matchers, err := parseDeletionQuery(`count_over_time({job="mysql"}[5m])`)
require.Nil(t, matchers)
require.ErrorIs(t, err, errUnsupportedQuery)
})
}

@ -81,6 +81,26 @@ func (e *expirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval
return interval.Start.Before(latestRetentionStartTime)
}
// NeverExpiringExpirationChecker returns an expiration checker that never expires anything
func NeverExpiringExpirationChecker(limits Limits) ExpirationChecker {
return &neverExpiringExpirationChecker{}
}
type neverExpiringExpirationChecker struct{}
func (e *neverExpiringExpirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) {
return false, nil
}
func (e *neverExpiringExpirationChecker) IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool {
return false
}
func (e *neverExpiringExpirationChecker) MarkPhaseStarted() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFailed() {}
func (e *neverExpiringExpirationChecker) MarkPhaseFinished() {}
func (e *neverExpiringExpirationChecker) DropFromIndex(ref ChunkEntry, tableEndTime model.Time, now model.Time) bool {
return false
}
type TenantsRetention struct {
limits Limits
}

Loading…
Cancel
Save