From 983ab80e7cc6b99de5cf2fb62c71f0c24d6d7328 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Wed, 10 Aug 2022 11:09:02 -0600 Subject: [PATCH] Add delete api validations (#6860) * break out the middleware * Add validation to the API - Time must be in RFC3339 or a 10-digit-unix-seconds timestamp - Start time must always exist * lint * docs * improve tests * cleanup * cleanup * access runtime config via function on validation --- docs/sources/api/_index.md | 4 +- .../loki_micro_services_delete_test.go | 2 +- pkg/loki/modules.go | 15 +- .../indexshipper/compactor/compactor.go | 10 +- .../deletion/delete_requests_manager.go | 4 +- .../deletion/delete_requests_manager_test.go | 23 ++- .../compactor/deletion/request_handler.go | 176 ++++++++-------- .../deletion/request_handler_test.go | 188 +++++++++--------- .../deletion/tenant_delete_requests_client.go | 12 +- .../tenant_delete_requests_client_test.go | 27 +-- .../deletion/tenant_request_handler.go | 31 +++ .../deletion/tenant_request_handler_test.go | 83 ++++++++ .../indexshipper/compactor/deletion/util.go | 13 +- 13 files changed, 345 insertions(+), 243 deletions(-) create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler.go create mode 100644 pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler_test.go diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index f7ef842753..0c46e60ca4 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -1034,8 +1034,8 @@ Log entry deletion is supported _only_ when the BoltDB Shipper is configured for Query parameters: * `query=`: query argument that identifies the streams from which to delete with optional line filters. -* `start=`: A timestamp that identifies the start of the time window within which entries will be deleted. If not specified, defaults to 0, the Unix Epoch time. -* `end=`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time. +* `start=`: A timestamp that identifies the start of the time window within which entries will be deleted. This parameter is required. +* `end=`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time. A 204 response indicates success. diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index dd1965968c..739512f310 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -107,7 +107,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) { }) t.Run("add-delete-request", func(t *testing.T) { - params := client.DeleteRequestParams{Query: `{job="fake"} |= "lineB"`} + params := client.DeleteRequestParams{Start: "0000000000", Query: `{job="fake"} |= "lineB"`} require.NoError(t, cliCompactor.AddDeleteRequest(params)) }) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index a97ce6a19a..50f7cab01d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -52,7 +52,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/generationnumber" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/series/index" shipper_index "github.com/grafana/loki/pkg/storage/stores/shipper/index" boltdb_shipper_compactor "github.com/grafana/loki/pkg/storage/stores/shipper/index/compactor" @@ -937,15 +936,19 @@ func (t *Loki) initCompactor() (services.Service, error) { t.Server.HTTP.Path("/compactor/ring").Methods("GET", "POST").Handler(t.compactor) if t.Cfg.CompactorConfig.RetentionEnabled { - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler())) - t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler())) - t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler())) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)) + t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)) + t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.addCompactorMiddleware(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)) } return t.compactor, nil } +func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler { + return t.HTTPAuthMiddleware.Wrap(deletion.TenantMiddleware(t.overrides, h)) +} + func (t *Loki) initIndexGateway() (services.Service, error) { t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort @@ -1038,7 +1041,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { return ur, nil } -func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) { +func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) { // TODO(owen-d): enable delete request storage in tsdb if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) { return deletion.NewNoOpDeleteRequestsStore(), nil diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index 9b64378457..145dfe3740 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/grafana/loki/pkg/validation" + "github.com/go-kit/log/level" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" @@ -139,7 +141,7 @@ type Compactor struct { subservicesWatcher *services.FailureWatcher } -func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits retention.Limits, r prometheus.Registerer) (*Compactor, error) { +func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) (*Compactor, error) { retentionEnabledStats.Set("false") if cfg.RetentionEnabled { retentionEnabledStats.Set("true") @@ -205,7 +207,7 @@ func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig con return compactor, nil } -func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits retention.Limits, r prometheus.Registerer) error { +func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) error { err := chunk_util.EnsureDirectory(c.cfg.WorkingDirectory) if err != nil { return err @@ -240,7 +242,7 @@ func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.S return nil } -func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits) error { +func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Overrides) error { deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") store, err := deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) @@ -251,7 +253,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler( c.deleteRequestsStore, - limits, + c.cfg.DeleteRequestCancelPeriod, r, ) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go index 8d6a7a3dfb..6852e15bf9 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go @@ -40,10 +40,10 @@ type DeleteRequestsManager struct { wg sync.WaitGroup done chan struct{} batchSize int - limits retention.Limits + limits Limits } -func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestsManager { +func NewDeleteRequestsManager(store DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, batchSize int, limits Limits, registerer prometheus.Registerer) *DeleteRequestsManager { dm := &DeleteRequestsManager{ deleteRequestsStore: store, deleteRequestCancelPeriod: deleteRequestCancelPeriod, diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index e7d9f650a5..323fa002be 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -433,7 +433,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, tc.batchSize, deleteLimits(tc.deletionMode), nil) + mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, tc.batchSize, &fakeLimits{mode: tc.deletionMode.String()}, nil) require.NoError(t, mgr.loadDeleteRequestsToProcess()) for _, deleteRequests := range mgr.deleteRequestsToProcess { @@ -475,7 +475,7 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { } for _, tc := range tt { - mgr := NewDeleteRequestsManager(mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, deleteLimits(deletionmode.FilterAndDelete), nil) + mgr := NewDeleteRequestsManager(&mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore}, time.Hour, 70, &fakeLimits{mode: deletionmode.FilterAndDelete.String()}, nil) require.NoError(t, mgr.loadDeleteRequestsToProcess()) interval := model.Interval{Start: 300, End: 600} @@ -486,16 +486,21 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { type mockDeleteRequestsStore struct { DeleteRequestsStore deleteRequests []DeleteRequest + addedUser string + addedStartTime model.Time + addedEndTime model.Time + addedQuery string + addErr error } -func (m mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { +func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { return m.deleteRequests, nil } -func deleteLimits(mode deletionmode.Mode) *fakeLimits { - return &fakeLimits{ - defaultLimit: retentionLimit{ - compactorDeletionEnabled: mode.String(), - }, - } +func (m *mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error { + m.addedUser = userID + m.addedStartTime = startTime + m.addedEndTime = endTime + m.addedQuery = query + return m.addErr } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go index 67543a71ba..658ba1352c 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go @@ -2,8 +2,13 @@ package deletion import ( "encoding/json" + "errors" "fmt" "net/http" + "net/url" + "time" + + "github.com/grafana/loki/pkg/util" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -11,38 +16,29 @@ import ( "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" - "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) -const deletionNotAvailableMsg = "deletion is not available for this tenant" - // DeleteRequestHandler provides handlers for delete requests type DeleteRequestHandler struct { - deleteRequestsStore DeleteRequestsStore - metrics *deleteRequestHandlerMetrics - limits retention.Limits + deleteRequestsStore DeleteRequestsStore + metrics *deleteRequestHandlerMetrics + deleteRequestCancelPeriod time.Duration } // NewDeleteRequestHandler creates a DeleteRequestHandler -func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler { +func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { deleteMgr := DeleteRequestHandler{ - deleteRequestsStore: deleteStore, - limits: limits, - metrics: newDeleteRequestHandlerMetrics(registerer), + deleteRequestsStore: deleteStore, + deleteRequestCancelPeriod: deleteRequestCancelPeriod, + metrics: newDeleteRequestHandlerMetrics(registerer), } return &deleteMgr } // AddDeleteRequestHandler handles addition of a new delete request -func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler { - return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler)) -} - -// AddDeleteRequestHandler handles addition of a new delete request -func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -51,46 +47,21 @@ func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r } params := r.URL.Query() - query := params.Get("query") - if len(query) == 0 { - http.Error(w, "query not set", http.StatusBadRequest) + query, err := query(params) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - _, err = parseDeletionQuery(query) + startTime, err := startTime(params) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - startParam := params.Get("start") - startTime := int64(0) - if startParam != "" { - startTime, err = util.ParseTime(startParam) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - } - - endParam := params.Get("end") - endTime := int64(model.Now()) - - if endParam != "" { - endTime, err = util.ParseTime(endParam) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if endTime > int64(model.Now()) { - http.Error(w, "deletes in the future are not allowed", http.StatusBadRequest) - return - } - } - - if startTime > endTime { - http.Error(w, "start time can't be greater than end time", http.StatusBadRequest) + endTime, err := endTime(params, startTime) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -111,12 +82,7 @@ func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r } // GetAllDeleteRequestsHandler handles get all delete requests -func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler { - return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler)) -} - -// GetAllDeleteRequestsHandler handles get all delete requests -func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -138,12 +104,7 @@ func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWrite } // CancelDeleteRequestHandler handles delete request cancellation -func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler { - return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler)) -} - -// CancelDeleteRequestHandler handles delete request cancellation -func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -181,12 +142,7 @@ func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter } // GetCacheGenerationNumberHandler handles requests for a user's cache generation number -func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler { - return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler)) -} - -// GetCacheGenerationNumberHandler handles requests for a user's cache generation number -func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) { +func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() userID, err := tenant.TenantID(ctx) if err != nil { @@ -207,26 +163,68 @@ func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseW } } -func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - userID, err := tenant.TenantID(ctx) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - hasDelete, err := validDeletionLimit(dm.limits, userID) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - if !hasDelete { - http.Error(w, deletionNotAvailableMsg, http.StatusForbidden) - return - } - - next.ServeHTTP(w, r) - }) +func query(params url.Values) (string, error) { + query := params.Get("query") + if len(query) == 0 { + return "", errors.New("query not set") + } + + if _, err := parseDeletionQuery(query); err != nil { + return "", err + } + + return query, nil +} + +func startTime(params url.Values) (int64, error) { + startParam := params.Get("start") + if startParam == "" { + return 0, errors.New("start time not set") + } + + st, err := parseTime(startParam) + if err != nil { + return 0, errors.New("invalid start time: require unix seconds or RFC3339 format") + } + + return st, nil +} + +func endTime(params url.Values, startTime int64) (int64, error) { + endParam := params.Get("end") + endTime, err := parseTime(endParam) + if err != nil { + return 0, errors.New("invalid end time: require unix seconds or RFC3339 format") + } + + if endTime > int64(model.Now()) { + return 0, errors.New("deletes in the future are not allowed") + } + + if startTime > endTime { + return 0, errors.New("start time can't be greater than end time") + } + + return endTime, nil +} + +func parseTime(in string) (int64, error) { + if in == "" { + return int64(model.Now()), nil + } + + t, err := time.Parse(time.RFC3339, in) + if err != nil { + return timeFromInt(in) + } + + return t.UnixMilli(), nil +} + +func timeFromInt(in string) (int64, error) { + if len(in) != 10 { + return 0, errors.New("not unix seconds") + } + + return util.ParseTime(in) } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go index a601c55e15..9ec62e9bc3 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go @@ -1,132 +1,134 @@ package deletion import ( + "context" + "fmt" "net/http" "net/http/httptest" - "path/filepath" + "strings" "testing" "time" + "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/user" - "github.com/grafana/loki/pkg/storage/chunk/client/local" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/storage" - "github.com/grafana/loki/pkg/validation" + "github.com/grafana/loki/pkg/util" + + "github.com/weaveworks/common/user" ) -func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) { - // build the store - tempDir := t.TempDir() +func TestAddDeleteRequestHandler(t *testing.T) { + t.Run("it adds the delete request to the store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, time.Second, nil) - workingDir := filepath.Join(tempDir, "working-dir") - objectStorePath := filepath.Join(tempDir, "object-store") + req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001") - objectClient, err := local.NewFSObjectClient(local.FSConfig{ - Directory: objectStorePath, - }) - require.NoError(t, err) - testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, "")) - require.NoError(t, err) - - // limits - fl := &fakeLimits{ - defaultLimit: retentionLimit{ - compactorDeletionEnabled: "disabled", - }, - perTenant: map[string]retentionLimit{ - "1": {compactorDeletionEnabled: "filter-only"}, - "2": {compactorDeletionEnabled: "disabled"}, - }, - } + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) - // Setup handler - drh := NewDeleteRequestHandler(testDeleteRequestsStore, fl, nil) - middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + require.Equal(t, w.Code, http.StatusNoContent) - // User that has deletion enabled - req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) - req = req.WithContext(user.InjectOrgID(req.Context(), "1")) - - res := httptest.NewRecorder() - middle.ServeHTTP(res, req) - - require.Equal(t, http.StatusOK, res.Result().StatusCode) + require.Equal(t, "org-id", store.addedUser) + require.Equal(t, `{foo="bar"}`, store.addedQuery) + require.Equal(t, toTime("0000000000"), store.addedStartTime) + require.Equal(t, toTime("0000000001"), store.addedEndTime) + }) - // User that does not have deletion enabled - req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) - req = req.WithContext(user.InjectOrgID(req.Context(), "2")) + t.Run("it works with RFC3339", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, time.Second, nil) - res = httptest.NewRecorder() - middle.ServeHTTP(res, req) + req := buildRequest("org-id", `{foo="bar"}`, "2006-01-02T15:04:05Z", "2006-01-03T15:04:05Z") - require.Equal(t, http.StatusForbidden, res.Result().StatusCode) + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) - // User without override, this should use the default value which is false - req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) - req = req.WithContext(user.InjectOrgID(req.Context(), "3")) + require.Equal(t, w.Code, http.StatusNoContent) - res = httptest.NewRecorder() - middle.ServeHTTP(res, req) + require.Equal(t, "org-id", store.addedUser) + require.Equal(t, `{foo="bar"}`, store.addedQuery) + require.Equal(t, toTime("1136214245"), store.addedStartTime) + require.Equal(t, toTime("1136300645"), store.addedEndTime) + }) - require.Equal(t, http.StatusForbidden, res.Result().StatusCode) + t.Run("it fills in end time if blank", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, time.Second, nil) - // User without override, after the default value is set to true - fl.defaultLimit.compactorDeletionEnabled = "filter-and-delete" + req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "") - req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) - req = req.WithContext(user.InjectOrgID(req.Context(), "3")) + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) - res = httptest.NewRecorder() - middle.ServeHTTP(res, req) + require.Equal(t, w.Code, http.StatusNoContent) - require.Equal(t, http.StatusOK, res.Result().StatusCode) + require.Equal(t, "org-id", store.addedUser) + require.Equal(t, `{foo="bar"}`, store.addedQuery) + require.Equal(t, toTime("0000000000"), store.addedStartTime) + require.InDelta(t, int64(model.Now()), int64(store.addedEndTime), 1000) + }) - // User header is not given - req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + t.Run("it returns 500 when the delete store errors", func(t *testing.T) { + store := &mockDeleteRequestsStore{addErr: errors.New("something bad")} + h := NewDeleteRequestHandler(store, time.Second, nil) - res = httptest.NewRecorder() - middle.ServeHTTP(res, req) + req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001") - require.Equal(t, http.StatusBadRequest, res.Result().StatusCode) -} + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) + require.Equal(t, w.Code, http.StatusInternalServerError) + }) -type retentionLimit struct { - compactorDeletionEnabled string - retentionPeriod time.Duration - streamRetention []validation.StreamRetention + t.Run("Validation", func(t *testing.T) { + h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Second, nil) + + for _, tc := range []struct { + orgID, query, startTime, endTime, error string + }{ + {"", `{foo="bar"}`, "0000000000", "0000000001", "no org id\n"}, + {"org-id", "", "0000000000", "0000000001", "query not set\n"}, + {"org-id", `not a query`, "0000000000", "0000000001", "invalid query expression\n"}, + {"org-id", `{foo="bar"}`, "", "0000000001", "start time not set\n"}, + {"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "invalid start time: require unix seconds or RFC3339 format\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "invalid end time: require unix seconds or RFC3339 format\n"}, + {"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "deletes in the future are not allowed\n"}, + {"org-id", `{foo="bar"}`, "0000000001", "0000000000", "start time can't be greater than end time\n"}, + } { + t.Run(strings.TrimSpace(tc.error), func(t *testing.T) { + req := buildRequest(tc.orgID, tc.query, tc.startTime, tc.endTime) + + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusBadRequest) + require.Equal(t, w.Body.String(), tc.error) + }) + } + }) } -func (r retentionLimit) convertToValidationLimit() *validation.Limits { - return &validation.Limits{ - DeletionMode: r.compactorDeletionEnabled, - RetentionPeriod: model.Duration(r.retentionPeriod), - StreamRetention: r.streamRetention, +func buildRequest(orgID, query, start, end string) *http.Request { + var req *http.Request + if orgID == "" { + req, _ = http.NewRequest(http.MethodGet, "", nil) + } else { + ctx := user.InjectOrgID(context.Background(), orgID) + req, _ = http.NewRequestWithContext(ctx, http.MethodGet, "", nil) } -} - -type fakeLimits struct { - defaultLimit retentionLimit - perTenant map[string]retentionLimit -} -func (f fakeLimits) RetentionPeriod(userID string) time.Duration { - return f.perTenant[userID].retentionPeriod -} + q := req.URL.Query() + q.Set("query", query) + q.Set("start", start) + q.Set("end", end) + req.URL.RawQuery = q.Encode() -func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention { - return f.perTenant[userID].streamRetention + return req } -func (f fakeLimits) DefaultLimits() *validation.Limits { - return f.defaultLimit.convertToValidationLimit() -} - -func (f fakeLimits) AllByUserID() map[string]*validation.Limits { - res := make(map[string]*validation.Limits) - for userID, ret := range f.perTenant { - res[userID] = ret.convertToValidationLimit() - } - return res +func toTime(t string) model.Time { + modelTime, _ := util.ParseTime(t) + return model.Time(modelTime) } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go index 08f7b3aa7a..29b6a56922 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client.go @@ -2,16 +2,20 @@ package deletion import ( "context" - - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) +const deletionNotAvailableMsg = "deletion is not available for this tenant" + +type Limits interface { + DeletionMode(userID string) string +} + type perTenantDeleteRequestsClient struct { client DeleteRequestsClient - limits retention.Limits + limits Limits } -func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient { +func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l Limits) DeleteRequestsClient { return &perTenantDeleteRequestsClient{ client: c, limits: l, diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go index c6d36671c2..3de51dc2ee 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_delete_requests_client_test.go @@ -26,27 +26,6 @@ func TestTenantDeleteRequestsClient(t *testing.T) { require.Nil(t, err) require.Empty(t, reqs) }) - - t.Run("tenant disabled but default enabled", func(t *testing.T) { - limits.defaultLimit.compactorDeletionEnabled = "filter-only" - reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2") - require.Nil(t, err) - require.Empty(t, reqs) - }) - - t.Run("default is enabled", func(t *testing.T) { - limits.defaultLimit.compactorDeletionEnabled = "filter-and-delete" - reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3") - require.Nil(t, err) - require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs) - }) - - t.Run("default is disabled", func(t *testing.T) { - limits.defaultLimit.compactorDeletionEnabled = "disabled" - reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3") - require.Nil(t, err) - require.Empty(t, reqs) - }) } type fakeRequestsClient struct { @@ -61,9 +40,9 @@ func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, user var ( limits = &fakeLimits{ - perTenant: map[string]retentionLimit{ - "1": {compactorDeletionEnabled: "filter-only"}, - "2": {compactorDeletionEnabled: "disabled"}, + limits: map[string]string{ + "1": "filter-only", + "2": "disabled", }, } ) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler.go new file mode 100644 index 0000000000..0af62baa13 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler.go @@ -0,0 +1,31 @@ +package deletion + +import ( + "net/http" + + "github.com/grafana/dskit/tenant" +) + +func TenantMiddleware(limits Limits, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + hasDelete, err := validDeletionLimit(limits, userID) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if !hasDelete { + http.Error(w, deletionNotAvailableMsg, http.StatusForbidden) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler_test.go new file mode 100644 index 0000000000..1577c596ba --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/deletion/tenant_request_handler_test.go @@ -0,0 +1,83 @@ +package deletion + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/validation" +) + +func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) { + fl := &fakeLimits{ + limits: map[string]string{ + "1": "filter-only", + "2": "disabled", + }, + } + + // Setup handler + middle := TenantMiddleware(fl, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + + // User that has deletion enabled + req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "1")) + + res := httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusOK, res.Result().StatusCode) + + // User that does not have deletion enabled + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + req = req.WithContext(user.InjectOrgID(req.Context(), "2")) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusForbidden, res.Result().StatusCode) + + // User header is not given + req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil) + + res = httptest.NewRecorder() + middle.ServeHTTP(res, req) + + require.Equal(t, http.StatusBadRequest, res.Result().StatusCode) +} + +type retentionLimit struct { + compactorDeletionEnabled string + retentionPeriod time.Duration + streamRetention []validation.StreamRetention +} + +func (r retentionLimit) convertToValidationLimit() *validation.Limits { + return &validation.Limits{ + DeletionMode: r.compactorDeletionEnabled, + RetentionPeriod: model.Duration(r.retentionPeriod), + StreamRetention: r.streamRetention, + } +} + +type fakeLimits struct { + retention.Limits + + limits map[string]string + mode string +} + +func (f *fakeLimits) DeletionMode(userID string) string { + if f.mode != "" { + return f.mode + } + + return f.limits[userID] +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/util.go b/pkg/storage/stores/indexshipper/compactor/deletion/util.go index ad6f70647b..9ab3be3a7f 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/util.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/util.go @@ -5,8 +5,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode" - "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" - "github.com/grafana/loki/pkg/logql/syntax" ) @@ -24,7 +22,7 @@ func parseDeletionQuery(query string) (syntax.LogSelectorExpr, error) { return logSelectorExpr, nil } -func validDeletionLimit(l retention.Limits, userID string) (bool, error) { +func validDeletionLimit(l Limits, userID string) (bool, error) { mode, err := deleteModeFromLimits(l, userID) if err != nil { return false, err @@ -33,10 +31,7 @@ func validDeletionLimit(l retention.Limits, userID string) (bool, error) { return mode.DeleteEnabled(), nil } -func deleteModeFromLimits(l retention.Limits, userID string) (deletionmode.Mode, error) { - allLimits := l.AllByUserID() - if userLimits, ok := allLimits[userID]; ok { - return deletionmode.ParseMode(userLimits.DeletionMode) - } - return deletionmode.ParseMode(l.DefaultLimits().DeletionMode) +func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) { + mode := l.DeletionMode(userID) + return deletionmode.ParseMode(mode) }