Add delete api validations (#6860)

* break out the middleware

* Add validation to the API

- Time must be in RFC3339 or a 10-digit-unix-seconds timestamp
- Start time must always exist

* lint

* docs

* improve tests

* cleanup

* cleanup

* access runtime config via function on validation
pull/6872/head
Travis Patterson 3 years ago committed by GitHub
parent b114dc93d6
commit 983ab80e7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/sources/api/_index.md
  2. 2
      integration/loki_micro_services_delete_test.go
  3. 15
      pkg/loki/modules.go
  4. 10
      pkg/storage/stores/indexshipper/compactor/compactor.go
  5. 4
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go
  6. 23
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go
  7. 176
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go
  8. 188
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go
  9. 12
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go
  10. 27
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go
  11. 31
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler.go
  12. 83
      pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler_test.go
  13. 13
      pkg/storage/stores/indexshipper/compactor/deletion/util.go

@ -1034,8 +1034,8 @@ Log entry deletion is supported _only_ when the BoltDB Shipper is configured for
Query parameters:
* `query=<series_selector>`: query argument that identifies the streams from which to delete with optional line filters.
* `start=<rfc3339 | unix_timestamp>`: A timestamp that identifies the start of the time window within which entries will be deleted. If not specified, defaults to 0, the Unix Epoch time.
* `end=<rfc3339 | unix_timestamp>`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time.
* `start=<rfc3339 | unix_seconds_timestamp>`: A timestamp that identifies the start of the time window within which entries will be deleted. This parameter is required.
* `end=<rfc3339 | unix_seconds_timestamp>`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time.
A 204 response indicates success.

@ -107,7 +107,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
})
t.Run("add-delete-request", func(t *testing.T) {
params := client.DeleteRequestParams{Query: `{job="fake"} |= "lineB"`}
params := client.DeleteRequestParams{Start: "0000000000", Query: `{job="fake"} |= "lineB"`}
require.NoError(t, cliCompactor.AddDeleteRequest(params))
})

@ -52,7 +52,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/series/index"
shipper_index "github.com/grafana/loki/pkg/storage/stores/shipper/index"
boltdb_shipper_compactor "github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor"
@ -937,15 +936,19 @@ func (t *Loki) initCompactor() (services.Service, error) {
t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor)
if t.Cfg.CompactorConfig.RetentionEnabled {
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler))
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler))
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler))
}
return t.compactor, nil
}
func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
return t.HTTPAuthMiddleware.Wrap(deletion.TenantMiddleware(t.overrides, h))
}
func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
@ -1038,7 +1041,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}
func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) {
func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil

@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/grafana/loki/pkg/validation"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
@ -139,7 +141,7 @@ type Compactor struct {
subservicesWatcher *services.FailureWatcher
}
func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) {
func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) (*Compactor, error) {
retentionEnabledStats.Set("false")
if cfg.RetentionEnabled {
retentionEnabledStats.Set("true")
@ -205,7 +207,7 @@ func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig con
return compactor, nil
}
func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error {
func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) error {
err := chunk_util.EnsureDirectory(c.cfg.WorkingDirectory)
if err != nil {
return err
@ -240,7 +242,7 @@ func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.S
return nil
}
func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits) error {
func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Overrides) error {
deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion")
store, err := deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient)
@ -251,7 +253,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
c.deleteRequestsStore,
limits,
c.cfg.DeleteRequestCancelPeriod,
r,
)

@ -40,10 +40,10 @@ type DeleteRequestsManager struct {
wg sync.WaitGroup
done chan struct{}
batchSize int
limits retention.Limits
limits Limits
}
func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestsManager {
func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) *DeleteRequestsManager {
dm := &DeleteRequestsManager{
deleteRequestsStore: store,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,

@ -433,7 +433,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, tc.batchSize, deleteLimits(tc.deletionMode), nil)
mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, tc.batchSize, &fakeLimits{mode: tc.deletionMode.String()}, nil)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
for _, deleteRequests := range mgr.deleteRequestsToProcess {
@ -475,7 +475,7 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
}
for _, tc := range tt {
mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, deleteLimits(deletionmode.FilterAndDelete), nil)
mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}, nil)
require.NoError(t, mgr.loadDeleteRequestsToProcess())
interval := model.Interval{Start: 300, End: 600}
@ -486,16 +486,21 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
type mockDeleteRequestsStore struct {
DeleteRequestsStore
deleteRequests []DeleteRequest
addedUser string
addedStartTime model.Time
addedEndTime model.Time
addedQuery string
addErr error
}
func (m mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) {
func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) {
return m.deleteRequests, nil
}
func deleteLimits(mode deletionmode.Mode) *fakeLimits {
return &fakeLimits{
defaultLimit: retentionLimit{
compactorDeletionEnabled: mode.String(),
},
}
func (m *mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
m.addedUser = userID
m.addedStartTime = startTime
m.addedEndTime = endTime
m.addedQuery = query
return m.addErr
}

@ -2,8 +2,13 @@ package deletion
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"time"
"github.com/grafana/loki/pkg/util"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -11,38 +16,29 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)
const deletionNotAvailableMsg = "deletion is not available for this tenant"
// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
limits retention.Limits
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
deleteRequestCancelPeriod time.Duration
}
// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteRequestsStore: deleteStore,
limits: limits,
metrics: newDeleteRequestHandlerMetrics(registerer),
deleteRequestsStore: deleteStore,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
metrics: newDeleteRequestHandlerMetrics(registerer),
}
return &deleteMgr
}
// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler))
}
// AddDeleteRequestHandler handles addition of a new delete request
func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -51,46 +47,21 @@ func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r
}
params := r.URL.Query()
query := params.Get("query")
if len(query) == 0 {
http.Error(w, "query not set", http.StatusBadRequest)
query, err := query(params)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = parseDeletionQuery(query)
startTime, err := startTime(params)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
startParam := params.Get("start")
startTime := int64(0)
if startParam != "" {
startTime, err = util.ParseTime(startParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
endParam := params.Get("end")
endTime := int64(model.Now())
if endParam != "" {
endTime, err = util.ParseTime(endParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if endTime > int64(model.Now()) {
http.Error(w, "deletes in the future are not allowed", http.StatusBadRequest)
return
}
}
if startTime > endTime {
http.Error(w, "start time can't be greater than end time", http.StatusBadRequest)
endTime, err := endTime(params, startTime)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@ -111,12 +82,7 @@ func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r
}
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler))
}
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -138,12 +104,7 @@ func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWrite
}
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler))
}
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -181,12 +142,7 @@ func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter
}
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler {
return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler))
}
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
@ -207,26 +163,68 @@ func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseW
}
}
func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
hasDelete, err := validDeletionLimit(dm.limits, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !hasDelete {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
next.ServeHTTP(w, r)
})
func query(params url.Values) (string, error) {
query := params.Get("query")
if len(query) == 0 {
return "", errors.New("query not set")
}
if _, err := parseDeletionQuery(query); err != nil {
return "", err
}
return query, nil
}
func startTime(params url.Values) (int64, error) {
startParam := params.Get("start")
if startParam == "" {
return 0, errors.New("start time not set")
}
st, err := parseTime(startParam)
if err != nil {
return 0, errors.New("invalid start time: require unix seconds or RFC3339 format")
}
return st, nil
}
func endTime(params url.Values, startTime int64) (int64, error) {
endParam := params.Get("end")
endTime, err := parseTime(endParam)
if err != nil {
return 0, errors.New("invalid end time: require unix seconds or RFC3339 format")
}
if endTime > int64(model.Now()) {
return 0, errors.New("deletes in the future are not allowed")
}
if startTime > endTime {
return 0, errors.New("start time can't be greater than end time")
}
return endTime, nil
}
func parseTime(in string) (int64, error) {
if in == "" {
return int64(model.Now()), nil
}
t, err := time.Parse(time.RFC3339, in)
if err != nil {
return timeFromInt(in)
}
return t.UnixMilli(), nil
}
func timeFromInt(in string) (int64, error) {
if len(in) != 10 {
return 0, errors.New("not unix seconds")
}
return util.ParseTime(in)
}

@ -1,132 +1,134 @@
package deletion
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
"github.com/grafana/loki/pkg/validation"
"github.com/grafana/loki/pkg/util"
"github.com/weaveworks/common/user"
)
func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) {
// build the store
tempDir := t.TempDir()
func TestAddDeleteRequestHandler(t *testing.T) {
t.Run("it adds the delete request to the store", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001")
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)
// limits
fl := &fakeLimits{
defaultLimit: retentionLimit{
compactorDeletionEnabled: "disabled",
},
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: "filter-only"},
"2": {compactorDeletionEnabled: "disabled"},
},
}
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
// Setup handler
drh := NewDeleteRequestHandler(testDeleteRequestsStore, fl, nil)
middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
require.Equal(t, w.Code, http.StatusNoContent)
// User that has deletion enabled
req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "1"))
res := httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusOK, res.Result().StatusCode)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("0000000000"), store.addedStartTime)
require.Equal(t, toTime("0000000001"), store.addedEndTime)
})
// User that does not have deletion enabled
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "2"))
t.Run("it works with RFC3339", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
req := buildRequest("org-id", `{foo="bar"}`, "2006-01-02T15:04:05Z", "2006-01-03T15:04:05Z")
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
// User without override, this should use the default value which is false
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
require.Equal(t, w.Code, http.StatusNoContent)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("1136214245"), store.addedStartTime)
require.Equal(t, toTime("1136300645"), store.addedEndTime)
})
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
t.Run("it fills in end time if blank", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
// User without override, after the default value is set to true
fl.defaultLimit.compactorDeletionEnabled = "filter-and-delete"
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "")
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, w.Code, http.StatusNoContent)
require.Equal(t, http.StatusOK, res.Result().StatusCode)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("0000000000"), store.addedStartTime)
require.InDelta(t, int64(model.Now()), int64(store.addedEndTime), 1000)
})
// User header is not given
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
t.Run("it returns 500 when the delete store errors", func(t *testing.T) {
store := &mockDeleteRequestsStore{addErr: errors.New("something bad")}
h := NewDeleteRequestHandler(store, time.Second, nil)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001")
require.Equal(t, http.StatusBadRequest, res.Result().StatusCode)
}
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusInternalServerError)
})
type retentionLimit struct {
compactorDeletionEnabled string
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
t.Run("Validation", func(t *testing.T) {
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Second, nil)
for _, tc := range []struct {
orgID, query, startTime, endTime, error string
}{
{"", `{foo="bar"}`, "0000000000", "0000000001", "no org id\n"},
{"org-id", "", "0000000000", "0000000001", "query not set\n"},
{"org-id", `not a query`, "0000000000", "0000000001", "invalid query expression\n"},
{"org-id", `{foo="bar"}`, "", "0000000001", "start time not set\n"},
{"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "invalid start time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "invalid end time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "deletes in the future are not allowed\n"},
{"org-id", `{foo="bar"}`, "0000000001", "0000000000", "start time can't be greater than end time\n"},
} {
t.Run(strings.TrimSpace(tc.error), func(t *testing.T) {
req := buildRequest(tc.orgID, tc.query, tc.startTime, tc.endTime)
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusBadRequest)
require.Equal(t, w.Body.String(), tc.error)
})
}
})
}
func (r retentionLimit) convertToValidationLimit() *validation.Limits {
return &validation.Limits{
DeletionMode: r.compactorDeletionEnabled,
RetentionPeriod: model.Duration(r.retentionPeriod),
StreamRetention: r.streamRetention,
func buildRequest(orgID, query, start, end string) *http.Request {
var req *http.Request
if orgID == "" {
req, _ = http.NewRequest(http.MethodGet, "", nil)
} else {
ctx := user.InjectOrgID(context.Background(), orgID)
req, _ = http.NewRequestWithContext(ctx, http.MethodGet, "", nil)
}
}
type fakeLimits struct {
defaultLimit retentionLimit
perTenant map[string]retentionLimit
}
func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
return f.perTenant[userID].retentionPeriod
}
q := req.URL.Query()
q.Set("query", query)
q.Set("start", start)
q.Set("end", end)
req.URL.RawQuery = q.Encode()
func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.perTenant[userID].streamRetention
return req
}
func (f fakeLimits) DefaultLimits() *validation.Limits {
return f.defaultLimit.convertToValidationLimit()
}
func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
res := make(map[string]*validation.Limits)
for userID, ret := range f.perTenant {
res[userID] = ret.convertToValidationLimit()
}
return res
func toTime(t string) model.Time {
modelTime, _ := util.ParseTime(t)
return model.Time(modelTime)
}

@ -2,16 +2,20 @@ package deletion
import (
"context"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
)
const deletionNotAvailableMsg = "deletion is not available for this tenant"
type Limits interface {
DeletionMode(userID string) string
}
type perTenantDeleteRequestsClient struct {
client DeleteRequestsClient
limits retention.Limits
limits Limits
}
func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient {
func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l Limits) DeleteRequestsClient {
return &perTenantDeleteRequestsClient{
client: c,
limits: l,

@ -26,27 +26,6 @@ func TestTenantDeleteRequestsClient(t *testing.T) {
require.Nil(t, err)
require.Empty(t, reqs)
})
t.Run("tenant disabled but default enabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = "filter-only"
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2")
require.Nil(t, err)
require.Empty(t, reqs)
})
t.Run("default is enabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = "filter-and-delete"
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs)
})
t.Run("default is disabled", func(t *testing.T) {
limits.defaultLimit.compactorDeletionEnabled = "disabled"
reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3")
require.Nil(t, err)
require.Empty(t, reqs)
})
}
type fakeRequestsClient struct {
@ -61,9 +40,9 @@ func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, user
var (
limits = &fakeLimits{
perTenant: map[string]retentionLimit{
"1": {compactorDeletionEnabled: "filter-only"},
"2": {compactorDeletionEnabled: "disabled"},
limits: map[string]string{
"1": "filter-only",
"2": "disabled",
},
}
)

@ -0,0 +1,31 @@
package deletion
import (
"net/http"
"github.com/grafana/dskit/tenant"
)
func TenantMiddleware(limits Limits, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
hasDelete, err := validDeletionLimit(limits, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if !hasDelete {
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
return
}
next.ServeHTTP(w, r)
})
}

@ -0,0 +1,83 @@
package deletion
import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/validation"
)
func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) {
fl := &fakeLimits{
limits: map[string]string{
"1": "filter-only",
"2": "disabled",
},
}
// Setup handler
middle := TenantMiddleware(fl, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
// User that has deletion enabled
req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "1"))
res := httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusOK, res.Result().StatusCode)
// User that does not have deletion enabled
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
req = req.WithContext(user.InjectOrgID(req.Context(), "2"))
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
// User header is not given
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
res = httptest.NewRecorder()
middle.ServeHTTP(res, req)
require.Equal(t, http.StatusBadRequest, res.Result().StatusCode)
}
type retentionLimit struct {
compactorDeletionEnabled string
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
}
func (r retentionLimit) convertToValidationLimit() *validation.Limits {
return &validation.Limits{
DeletionMode: r.compactorDeletionEnabled,
RetentionPeriod: model.Duration(r.retentionPeriod),
StreamRetention: r.streamRetention,
}
}
type fakeLimits struct {
retention.Limits
limits map[string]string
mode string
}
func (f *fakeLimits) DeletionMode(userID string) string {
if f.mode != "" {
return f.mode
}
return f.limits[userID]
}

@ -5,8 +5,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/logql/syntax"
)
@ -24,7 +22,7 @@ func parseDeletionQuery(query string) (syntax.LogSelectorExpr, error) {
return logSelectorExpr, nil
}
func validDeletionLimit(l retention.Limits, userID string) (bool, error) {
func validDeletionLimit(l Limits, userID string) (bool, error) {
mode, err := deleteModeFromLimits(l, userID)
if err != nil {
return false, err
@ -33,10 +31,7 @@ func validDeletionLimit(l retention.Limits, userID string) (bool, error) {
return mode.DeleteEnabled(), nil
}
func deleteModeFromLimits(l retention.Limits, userID string) (deletionmode.Mode, error) {
allLimits := l.AllByUserID()
if userLimits, ok := allLimits[userID]; ok {
return deletionmode.ParseMode(userLimits.DeletionMode)
}
return deletionmode.ParseMode(l.DefaultLimits().DeletionMode)
func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) {
mode := l.DeletionMode(userID)
return deletionmode.ParseMode(mode)
}

Loading…
Cancel
Save