Compactor: add per tenant compaction delete enabled flag (#6410)

* Add per tenant compaction delete enabled flag

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove changes in wrong place

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add compactor deletion enabled field

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use limit in compactor

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Use http middleware and add test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix lint issue

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add changelog

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Revert to default setting if no override

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add default value command line option

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update the docs

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Enable access to deletion API for integration test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename flag to allow_deletes

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Update per review comments

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
pull/6415/head
Michel Hollands 3 years ago committed by GitHub
parent 8ee0d62d9e
commit b4e6c59936
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 4
      docs/sources/configuration/_index.md
  3. 3
      docs/sources/operations/storage/logs-deletion.md
  4. 1
      integration/loki_micro_services_delete_test.go
  5. 8
      pkg/loki/modules.go
  6. 1
      pkg/storage/stores/shipper/compactor/compactor.go
  7. 62
      pkg/storage/stores/shipper/compactor/deletion/request_handler.go
  8. 133
      pkg/storage/stores/shipper/compactor/deletion/request_handler_test.go
  9. 8
      pkg/validation/limits.go

@ -1,5 +1,6 @@
## Main
* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling.
* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields.
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the Promtail journal target.
* [6099](https://github.com/grafana/loki/pull/6099) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage.

@ -2372,6 +2372,10 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# This also determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]
# When true, access to the deletion API is enabled.
# CLI flag: -compactor.allow_deletes
[allow_deletes: <boolean> | default = false]
```
## sigv4_config

@ -24,9 +24,10 @@ With `whole-stream-deletion`, all the log entries matching the query given in th
With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks.
With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks.
A delete request may be canceled within a configurable cancellation period. Set the `delete_request_cancel_period` in the Compactor's YAML configuration or on the command line when invoking Loki. Its default value is 24h.
Access to the deletion API can be enabled per tenant via the `allow_deletes` setting.
## Compactor endpoints
The Compactor exposes endpoints to allow for the deletion of log entries from specified streams.

@ -26,6 +26,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-boltdb.shipper.compactor.deletion-mode=filter-and-delete",
// By default a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.allow-deletes=true",
)
tIndexGateway = clu.AddComponent(
"index-gateway",

@ -873,10 +873,10 @@ func (t *Loki) initCompactor() (services.Service, error) {
if t.Cfg.CompactorConfig.RetentionEnabled {
switch t.compactor.DeleteMode() {
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
default:
break
}

@ -265,6 +265,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
c.deleteRequestsStore,
c.cfg.DeleteRequestCancelPeriod,
limits,
r,
)

@ -12,22 +12,27 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
const deletionNotAvailableMsg = "deletion is not available for this tenant"
// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
limits retention.Limits
deleteRequestCancelPeriod time.Duration
}
// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteRequestsStore: deleteStore,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
limits: limits,
metrics: newDeleteRequestHandlerMetrics(registerer),
}
@ -35,7 +40,12 @@ func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCance
}
// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler))
}
// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -98,7 +108,12 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
}
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler))
}
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -120,7 +135,12 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
}
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler))
}
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -163,7 +183,12 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
}
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler))
}
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -183,3 +208,30 @@ func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseW
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}
}
func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
allLimits := dm.limits.AllByUserID()
userLimits, ok := allLimits[userID]
if ok {
if !userLimits.CompactorDeletionEnabled {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
} else {
if !dm.limits.DefaultLimits().CompactorDeletionEnabled {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
}
next.ServeHTTP(w, r)
})
}

@ -0,0 +1,133 @@
package deletion
import (
"net/http"
"net/http/httptest"
"path/filepath"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/validation"
)
type retentionLimit struct {
compactorDeletionEnabled bool
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
}
func (r retentionLimit) convertToValidationLimit() *validation.Limits {
return &validation.Limits{
CompactorDeletionEnabled: r.compactorDeletionEnabled,
RetentionPeriod: model.Duration(r.retentionPeriod),
StreamRetention: r.streamRetention,
}
}
type fakeLimits struct {
defaultLimit retentionLimit
perTenant map[string]retentionLimit
}
func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
return f.perTenant[userID].retentionPeriod
}
func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.perTenant[userID].streamRetention
}
func (f fakeLimits) CompactorDeletionEnabled(userID string) bool {
return f.perTenant[userID].compactorDeletionEnabled
}
func (f fakeLimits) DefaultLimits() *validation.Limits {
return f.defaultLimit.convertToValidationLimit()
}
func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
res := make(map[string]*validation.Limits)
for userID, ret := range f.perTenant {
res[userID] = ret.convertToValidationLimit()
}
return res
}
func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) {
// build the store
tempDir := t.TempDir()
workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)
// limits
fl := &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: true},
"2": {compactorDeletionEnabled: false},
},
}
// Setup handler
drh := NewDeleteRequestHandler(testDeleteRequestsStore, 10*time.Second, fl, nil)
middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
// User that has deletion enabled
req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "1"))
res := httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusOK, res.Result().StatusCode)
// User that does not have deletion enabled
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "2"))
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
// User without override, this should use the default value which is false
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
// User without override, after the default value is set to true
fl.defaultLimit.compactorDeletionEnabled = true
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusOK, res.Result().StatusCode)
// User header is not given
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusBadRequest, res.Result().StatusCode)
}

@ -111,6 +111,8 @@ type Limits struct {
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
RulerRemoteWriteSigV4Config *sigv4.SigV4Config `yaml:"ruler_remote_write_sigv4_config" json:"ruler_remote_write_sigv4_config"`
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"`
// Global and per tenant retention
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"`
@ -193,6 +195,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.QuerySplitDuration.Set("30m")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&l.CompactorDeletionEnabled, "compactor.allow-deletes", false, "Enable access to the deletion API.")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -532,6 +536,10 @@ func (o *Overrides) UnorderedWrites(userID string) bool {
return o.getOverridesForUser(userID).UnorderedWrites
}
func (o *Overrides) CompactorDeletionEnabled(userID string) bool {
return o.getOverridesForUser(userID).CompactorDeletionEnabled
}
func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}

Loading…
Cancel
Save