diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index 0c46e60ca4..0984f8e82e 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -1036,6 +1036,7 @@ Query parameters: * `query=`: query argument that identifies the streams from which to delete with optional line filters. * `start=`: A timestamp that identifies the start of the time window within which entries will be deleted. This parameter is required. * `end=`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time. +* `max_interval=`: The maximum time period the delete request can span. If the request is larger than this value, it is split into several requests of <= `max_interval`. Valid time units are "s", "m", "h". A 204 response indicates success. @@ -1102,10 +1103,13 @@ curl -u "Tenant1:$API_TOKEN" \ DELETE /loki/api/v1/delete ``` +Query Parameters: +* `force=`: When the `force` query parameter is true, partially completed delete requests will be canceled. NOTE: some data from the request may still be deleted. + Remove a delete request for the authenticated tenant. The [log entry deletion](../operations/storage/logs-deletion/) documentation has configuration details. -Loki allows cancellation of delete requests until the requests are picked up for processing. It is controlled by the `delete_request_cancel_period` YAML configuration or the equivalent command line option when invoking Loki. +Loki allows cancellation of delete requests until the requests are picked up for processing. It is controlled by the `delete_request_cancel_period` YAML configuration or the equivalent command line option when invoking Loki. To cancel a delete request that has been picked up for processing or is partially complete, pass the `force=true` query parameter to the API. Log entry deletion is supported _only_ when the BoltDB Shipper is configured for the index store. diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 34ed2c229c..7dbb3faef1 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2135,6 +2135,13 @@ compacts index shards to more performant forms. # CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period [delete_request_cancel_period: | default = 24h] +# Constrain the size a delete request. When a delete request that spans > delete_max_query_range +# is input, the request is sharded into smaller requests of no more than delete_max_query_range. +# +# 0 means no max_query_period. +# CLI flag: -boltdb.shipper.compactor.delete-max-interval +[delete_max_interval: | default = 0] + # The max number of delete requests to run per compaction cycle. # CLI flag: -boltdb.shipper.compactor.delete-batch-size [delete_batch_size: | default = 70] diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index 147f1fd9fb..6d6ff77438 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -82,6 +82,7 @@ type Config struct { RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"` DeleteBatchSize int `yaml:"delete_batch_size"` DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` + DeleteMaxInterval time.Duration `yaml:"delete_max_interval"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"` RunOnce bool `yaml:"-"` @@ -102,6 +103,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.IntVar(&cfg.DeleteBatchSize, "boltdb.shipper.compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") + f.DurationVar(&cfg.DeleteMaxInterval, "boltdb.shipper.compactor.delete-max-interval", 0, "Constrain the size of any single delete request. When a delete request > delete_max_query_range is input, the request is sharded into smaller requests of no more than delete_max_interval") f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.") f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)") @@ -271,7 +273,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Over c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler( c.deleteRequestsStore, - c.cfg.DeleteRequestCancelPeriod, + c.cfg.DeleteMaxInterval, r, ) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go index a0ecdeead4..ef0dae70d1 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go @@ -20,6 +20,7 @@ type DeleteRequest struct { CreatedAt model.Time `json:"created_at"` UserID string `json:"-"` + SequenceNum int64 `json:"-"` matchers []*labels.Matcher `json:"-"` logSelectorExpr syntax.LogSelectorExpr `json:"-"` diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go index c773bf0502..59e63353d4 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go @@ -3,6 +3,7 @@ package deletion import ( "context" "fmt" + "sort" "sync" "time" @@ -122,7 +123,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { // Reset this first so any errors result in a clear map d.deleteRequestsToProcess = map[string]*userDeleteRequests{} - deleteRequests, err := d.filteredDeleteRequests() + deleteRequests, err := d.filteredSortedDeleteRequests() if err != nil { return err } @@ -154,13 +155,22 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { return nil } -func (d *DeleteRequestsManager) filteredDeleteRequests() ([]DeleteRequest, error) { +func (d *DeleteRequestsManager) filteredSortedDeleteRequests() ([]DeleteRequest, error) { deleteRequests, err := d.deleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived) if err != nil { return nil, err } - return d.filteredRequests(deleteRequests) + deleteRequests, err = d.filteredRequests(deleteRequests) + if err != nil { + return nil, err + } + + sort.Slice(deleteRequests, func(i, j int) bool { + return deleteRequests[i].StartTime < deleteRequests[j].StartTime + }) + + return deleteRequests, nil } func (d *DeleteRequestsManager) filteredRequests(reqs []DeleteRequest) ([]DeleteRequest, error) { @@ -311,7 +321,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { } for _, deleteRequest := range userDeleteRequests.requests { - if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest.UserID, deleteRequest.RequestID, StatusProcessed); err != nil { + if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil { level.Error(util_log.Logger).Log( "msg", "failed to mark delete request for user as processed", "delete_request_id", deleteRequest.RequestID, diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go index 323fa002be..63e77ade6b 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go @@ -378,33 +378,33 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, }, { - name: "Deletes are limited by batch size", + name: "Deletes are sorted by start time and limited by batch size", deletionMode: deletionmode.FilterAndDelete, batchSize: 2, deleteRequestsFromStore: []DeleteRequest{ { UserID: testUserID, Query: lblFoo.String(), - StartTime: now.Add(-13 * time.Hour), - EndTime: now.Add(-11 * time.Hour), + StartTime: now.Add(-2 * time.Hour), + EndTime: now, }, { UserID: testUserID, Query: lblFoo.String(), - StartTime: now.Add(-10 * time.Hour), - EndTime: now.Add(-8 * time.Hour), + StartTime: now.Add(-6 * time.Hour), + EndTime: now.Add(-5 * time.Hour), }, { UserID: testUserID, Query: lblFoo.String(), - StartTime: now.Add(-6 * time.Hour), - EndTime: now.Add(-5 * time.Hour), + StartTime: now.Add(-10 * time.Hour), + EndTime: now.Add(-8 * time.Hour), }, { UserID: testUserID, Query: lblFoo.String(), - StartTime: now.Add(-2 * time.Hour), - EndTime: now, + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-11 * time.Hour), }, }, expectedResp: resp{ @@ -486,21 +486,43 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) { type mockDeleteRequestsStore struct { DeleteRequestsStore deleteRequests []DeleteRequest - addedUser string - addedStartTime model.Time - addedEndTime model.Time - addedQuery string + addReqs []DeleteRequest addErr error + + removeReqs []DeleteRequest + removeErr error + + getUser string + getID string + getResult []DeleteRequest + getErr error + + getAllUser string + getAllResult []DeleteRequest + getAllErr error } func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) { return m.deleteRequests, nil } -func (m *mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error { - m.addedUser = userID - m.addedStartTime = startTime - m.addedEndTime = endTime - m.addedQuery = query - return m.addErr +func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) { + m.addReqs = reqs + return nil, m.addErr +} + +func (m *mockDeleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error { + m.removeReqs = reqs + return m.removeErr +} + +func (m *mockDeleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) { + m.getUser = userID + m.getID = requestID + return m.getResult, m.getErr +} + +func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + m.getAllUser = userID + return m.getAllResult, m.getAllErr } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go index 6ec62bf794..7190d900df 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go @@ -37,15 +37,15 @@ const ( DeleteRequestsTableName = "delete_requests" ) -var ErrDeleteRequestNotFound = errors.New("could not find matching delete request") +var ErrDeleteRequestNotFound = errors.New("could not find matching delete requests") type DeleteRequestsStore interface { - AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error + AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) - UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error - GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) - RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error + UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error + GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) + RemoveDeleteRequests(ctx context.Context, req []DeleteRequest) error GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) Stop() Name() string @@ -54,6 +54,7 @@ type DeleteRequestsStore interface { // deleteRequestsStore provides all the methods required to manage lifecycle of delete request and things related to it. type deleteRequestsStore struct { indexClient index.Client + now func() model.Time } // NewDeleteStore creates a store for managing delete requests. @@ -63,63 +64,102 @@ func NewDeleteStore(workingDirectory string, indexStorageClient storage.Client) return nil, err } - return &deleteRequestsStore{indexClient: indexClient}, nil -} - -func NewDeleteStoreFromIndexClient(ic index.Client) DeleteRequestsStore { - return &deleteRequestsStore{ic} + return &deleteRequestsStore{ + indexClient: indexClient, + now: model.Now, + }, nil } func (ds *deleteRequestsStore) Stop() { ds.indexClient.Stop() } -// AddDeleteRequest creates entries for a new delete request. -func (ds *deleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error { - _, err := ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, query) - return err -} +// AddDeleteRequestGroup creates entries for new delete requests. All passed delete requests will be associated to +// each other by request id +func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) { + if len(reqs) == 0 { + return nil, nil + } -// addDeleteRequest is also used for tests to create delete requests with different createdAt time. -func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, query string) ([]byte, error) { - requestID := generateUniqueID(userID, query) + createdAt := ds.now() + writeBatch := ds.indexClient.NewWriteBatch() + requestID, err := ds.generateID(ctx, reqs[0]) + if err != nil { + return nil, err + } - for { - _, err := ds.GetDeleteRequest(ctx, userID, string(requestID)) + var results []DeleteRequest + for i, req := range reqs { + newReq, err := newRequest(req, requestID, createdAt, i) if err != nil { - if err == ErrDeleteRequestNotFound { - break - } return nil, err } - // we have a collision here, lets recreate a new requestID and check for collision - time.Sleep(time.Millisecond) - requestID = generateUniqueID(userID, query) + results = append(results, newReq) + ds.writeDeleteRequest(newReq, writeBatch) } - // userID, requestID - userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) + if err := ds.indexClient.BatchWrite(ctx, writeBatch); err != nil { + return nil, err + } - // Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status - // We don't want to set anything in hash key here since we would want to find delete requests by just status - writeBatch := ds.indexClient.NewWriteBatch() + return results, nil +} + +func newRequest(req DeleteRequest, requestID []byte, createdAt model.Time, seqNumber int) (DeleteRequest, error) { + req.RequestID = string(requestID) + req.Status = StatusReceived + req.CreatedAt = createdAt + req.SequenceNum = int64(seqNumber) + if err := req.SetQuery(req.Query); err != nil { + return DeleteRequest{}, err + } + return req, nil +} + +func (ds *deleteRequestsStore) writeDeleteRequest(req DeleteRequest, writeBatch index.WriteBatch) { + userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum) + + // Add an entry with userID, requestID, and sequence number as range key and status as value to make it easy + // to manage and lookup status. We don't want to set anything in hash key here since we would want to find + // delete requests by just status writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived)) // Add another entry with additional details like creation time, time range of delete request and the logQL requests in value - rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) - writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), - []byte(rangeValue), []byte(query)) + rangeValue := fmt.Sprintf("%x:%x:%x", int64(ds.now()), int64(req.StartTime), int64(req.EndTime)) + writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue), []byte(req.Query)) // create a gen number for this result - writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber()) + writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber()) +} - err := ds.indexClient.BatchWrite(ctx, writeBatch) - if err != nil { - return nil, err +// backwardCompatibleDeleteRequestHash generates the hash key for a delete request. +// Sequence numbers were added after deletion was in production so any requests made +// before then won't have one. Ensure backward compatibility by treating the 0th +// sequence number as the old format without any number. As a consequence, the 0th +// sequence number will also be ignored for any new delete requests. +func backwardCompatibleDeleteRequestHash(userID, requestID string, sequenceNumber int64) string { + if sequenceNumber == 0 { + return fmt.Sprintf("%s:%s", userID, requestID) } + return fmt.Sprintf("%s:%s:%d", userID, requestID, sequenceNumber) +} + +func (ds *deleteRequestsStore) generateID(ctx context.Context, req DeleteRequest) ([]byte, error) { + requestID := generateUniqueID(req.UserID, req.Query) - return requestID, nil + for { + if _, err := ds.GetDeleteRequestGroup(ctx, req.UserID, string(requestID)); err != nil { + if err == ErrDeleteRequestNotFound { + return requestID, nil + } + return nil, err + } + + // we have a collision here, lets recreate a new requestID and check for collision + time.Sleep(time.Millisecond) + requestID = generateUniqueID(req.UserID, req.Query) + } } // GetDeleteRequestsByStatus returns all delete requests for given status. @@ -141,22 +181,22 @@ func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, } // UpdateStatus updates status of a delete request. -func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error { - userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) +func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error { + userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum) writeBatch := ds.indexClient.NewWriteBatch() writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus)) if newStatus == StatusProcessed { // remove runtime filtering for deleted data - writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber()) + writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber()) } return ds.indexClient.BatchWrite(ctx, writeBatch) } -// GetDeleteRequest returns delete request with given requestID. -func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) { +// GetDeleteRequestGroup returns delete requests with given requestID. +func (ds *deleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) { userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) deleteRequests, err := ds.queryDeleteRequests(ctx, index.Query{ @@ -172,7 +212,7 @@ func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, req return nil, ErrDeleteRequestNotFound } - return &deleteRequests[0], nil + return deleteRequests, nil } func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) { @@ -197,9 +237,9 @@ func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, use } func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQuery index.Query) ([]DeleteRequest, error) { - deleteRequests := []DeleteRequest{} - // No need to lock inside the callback since we run a single index query. + var deleteRequests []DeleteRequest err := ds.indexClient.QueryPages(ctx, []index.Query{deleteQuery}, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) { + // No need to lock inside the callback since we run a single index query. itr := batch.Iterator() for itr.Next() { userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue())) @@ -216,63 +256,85 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu return nil, err } - for i, deleteRequest := range deleteRequests { - deleteRequestQuery := []index.Query{ - { - TableName: DeleteRequestsTableName, - HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID), - }, - } - - var parseError error - err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) { - itr := batch.Iterator() - itr.Next() + return ds.deleteRequestsWithDetails(ctx, deleteRequests) +} - deleteRequest, err = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest) +func (ds *deleteRequestsStore) deleteRequestsWithDetails(ctx context.Context, partialDeleteRequests []DeleteRequest) ([]DeleteRequest, error) { + deleteRequests := make([]DeleteRequest, 0, len(partialDeleteRequests)) + for _, group := range partitionByRequestID(partialDeleteRequests) { + for i, deleteRequest := range group { + deleteRequest.SequenceNum = int64(i) + requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest) if err != nil { - parseError = err - return false + return nil, err } + deleteRequests = append(deleteRequests, requestWithDetails) + } + } + return deleteRequests, nil +} - err = deleteRequest.SetQuery(string(itr.Value())) - if err != nil { - parseError = err - return false - } - deleteRequests[i] = deleteRequest +func (ds *deleteRequestsStore) queryDeleteRequestDetails(ctx context.Context, deleteRequest DeleteRequest) (DeleteRequest, error) { + userIDAndRequestID := backwardCompatibleDeleteRequestHash(deleteRequest.UserID, deleteRequest.RequestID, deleteRequest.SequenceNum) + deleteRequestQuery := []index.Query{ + { + TableName: DeleteRequestsTableName, + HashValue: fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), + }, + } - return true - }) - if err != nil { - return nil, err + var marshalError error + var requestWithDetails DeleteRequest + err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) { + if requestWithDetails, marshalError = unmarshalDeleteRequestDetails(batch.Iterator(), deleteRequest); marshalError != nil { + return false } - if parseError != nil { - return nil, parseError - } + return true + }) + if err != nil || marshalError != nil { + return DeleteRequest{}, err } - return deleteRequests, nil + return requestWithDetails, nil } -// RemoveDeleteRequest removes a delete request -func (ds *deleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error { - userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID) +func unmarshalDeleteRequestDetails(itr index.ReadBatchIterator, req DeleteRequest) (DeleteRequest, error) { + itr.Next() + requestWithDetails, err := parseDeleteRequestTimestamps(itr.RangeValue(), req) + if err != nil { + return DeleteRequest{}, nil + } + + if err = requestWithDetails.SetQuery(string(itr.Value())); err != nil { + return DeleteRequest{}, err + } + + return requestWithDetails, nil +} + +// RemoveDeleteRequests the passed delete requests +func (ds *deleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error { writeBatch := ds.indexClient.NewWriteBatch() + + for _, r := range reqs { + ds.removeRequest(r, writeBatch) + } + + return ds.indexClient.BatchWrite(ctx, writeBatch) +} + +func (ds *deleteRequestsStore) removeRequest(req DeleteRequest, writeBatch index.WriteBatch) { + userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum) writeBatch.Delete(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID)) // Add another entry with additional details like creation time, time range of delete request and selectors in value - rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime)) - writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), - []byte(rangeValue)) + rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime)) + writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue)) // ensure caches are invalidated - writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), - []byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10))) - - return ds.indexClient.BatchWrite(ctx, writeBatch) + writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10))) } func (ds *deleteRequestsStore) Name() string { @@ -329,12 +391,8 @@ func encodeUniqueID(t uint32) []byte { } func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) { - lastIndex := strings.LastIndex(rangeValue, ":") - - userID = rangeValue[:lastIndex] - requestID = rangeValue[lastIndex+1:] - - return + parts := strings.Split(rangeValue, ":") + return parts[0], parts[1] } // unsafeGetString is like yolostring but with a meaningful name diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go index f089ecc78a..d56d584d4a 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go @@ -16,169 +16,184 @@ import ( ) func TestDeleteRequestsStore(t *testing.T) { - now := model.Now() - user1 := "user1" - user2 := "user2" - - // build some test requests to add to the store - var user1ExpectedRequests []DeleteRequest - var user2ExpectedRequests []DeleteRequest - for i := time.Duration(1); i <= 24; i++ { - user1ExpectedRequests = append(user1ExpectedRequests, DeleteRequest{ - UserID: user1, - StartTime: now.Add(-i * time.Hour), - EndTime: now.Add(-i * time.Hour).Add(30 * time.Minute), - CreatedAt: now.Add(-i * time.Hour).Add(30 * time.Minute), - Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1), - Status: StatusReceived, - }) - user2ExpectedRequests = append(user2ExpectedRequests, DeleteRequest{ - UserID: user2, - StartTime: now.Add(-i * time.Hour), - EndTime: now.Add(-(i + 1) * time.Hour), - CreatedAt: now.Add(-(i + 1) * time.Hour), - Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2), - Status: StatusReceived, - }) - } - - // build the store - tempDir := t.TempDir() - - workingDir := filepath.Join(tempDir, "working-dir") - objectStorePath := filepath.Join(tempDir, "object-store") - - objectClient, err := local.NewFSObjectClient(local.FSConfig{ - Directory: objectStorePath, - }) - require.NoError(t, err) - testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, "")) - require.NoError(t, err) - - defer testDeleteRequestsStore.Stop() + tc := setup(t) + defer tc.store.Stop() // add requests for both the users to the store - for i := 0; i < len(user1ExpectedRequests); i++ { - requestID, err := testDeleteRequestsStore.(*deleteRequestsStore).addDeleteRequest( + for i := 0; i < len(tc.user1Requests); i++ { + resp, err := tc.store.AddDeleteRequestGroup( context.Background(), - user1ExpectedRequests[i].UserID, - user1ExpectedRequests[i].CreatedAt, - user1ExpectedRequests[i].StartTime, - user1ExpectedRequests[i].EndTime, - user1ExpectedRequests[i].Query, + []DeleteRequest{tc.user1Requests[i]}, ) require.NoError(t, err) - user1ExpectedRequests[i].RequestID = string(requestID) - require.NoError(t, user1ExpectedRequests[i].SetQuery(user1ExpectedRequests[i].Query)) + tc.user1Requests[i] = resp[0] - requestID, err = testDeleteRequestsStore.(*deleteRequestsStore).addDeleteRequest( + resp, err = tc.store.AddDeleteRequestGroup( context.Background(), - user2ExpectedRequests[i].UserID, - user2ExpectedRequests[i].CreatedAt, - user2ExpectedRequests[i].StartTime, - user2ExpectedRequests[i].EndTime, - user2ExpectedRequests[i].Query, + []DeleteRequest{tc.user2Requests[i]}, ) require.NoError(t, err) - user2ExpectedRequests[i].RequestID = string(requestID) - require.NoError(t, user2ExpectedRequests[i].SetQuery(user2ExpectedRequests[i].Query)) + tc.user2Requests[i] = resp[0] } // get all requests with StatusReceived and see if they have expected values - deleteRequests, err := testDeleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived) + deleteRequests, err := tc.store.GetDeleteRequestsByStatus(context.Background(), StatusReceived) require.NoError(t, err) - compareRequests(t, append(user1ExpectedRequests, user2ExpectedRequests...), deleteRequests) + compareRequests(t, append(tc.user1Requests, tc.user2Requests...), deleteRequests) // get user specific requests and see if they have expected values - user1Requests, err := testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1) + user1Requests, err := tc.store.GetAllDeleteRequestsForUser(context.Background(), user1) require.NoError(t, err) - compareRequests(t, user1ExpectedRequests, user1Requests) + compareRequests(t, tc.user1Requests, user1Requests) - user2Requests, err := testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user2) + user2Requests, err := tc.store.GetAllDeleteRequestsForUser(context.Background(), user2) require.NoError(t, err) - compareRequests(t, user2ExpectedRequests, user2Requests) + compareRequests(t, tc.user2Requests, user2Requests) - createGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1) + createGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1) require.NoError(t, err) require.NotEmpty(t, createGenNumber) - createGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2) + createGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2) require.NoError(t, err) require.NotEmpty(t, createGenNumber2) // get individual delete requests by id and see if they have expected values for _, expectedRequest := range append(user1Requests, user2Requests...) { - actualRequest, err := testDeleteRequestsStore.GetDeleteRequest(context.Background(), expectedRequest.UserID, expectedRequest.RequestID) + actualRequest, err := tc.store.GetDeleteRequestGroup(context.Background(), expectedRequest.UserID, expectedRequest.RequestID) require.NoError(t, err) - require.Equal(t, expectedRequest, *actualRequest) + require.Len(t, actualRequest, 1) + require.Equal(t, expectedRequest, actualRequest[0]) } // try a non-existent request and see if it throws ErrDeleteRequestNotFound - _, err = testDeleteRequestsStore.GetDeleteRequest(context.Background(), "user3", "na") + _, err = tc.store.GetDeleteRequestGroup(context.Background(), "user3", "na") require.ErrorIs(t, err, ErrDeleteRequestNotFound) // update some of the delete requests for both the users to processed - for i := 0; i < len(user1ExpectedRequests); i++ { + for i := 0; i < len(tc.user1Requests); i++ { var request DeleteRequest if i%2 != 0 { - user1ExpectedRequests[i].Status = StatusProcessed - request = user1ExpectedRequests[i] + tc.user1Requests[i].Status = StatusProcessed + request = tc.user1Requests[i] } else { - user2ExpectedRequests[i].Status = StatusProcessed - request = user2ExpectedRequests[i] + tc.user2Requests[i].Status = StatusProcessed + request = tc.user2Requests[i] } - require.NoError(t, testDeleteRequestsStore.UpdateStatus(context.Background(), request.UserID, request.RequestID, StatusProcessed)) + require.NoError(t, tc.store.UpdateStatus(context.Background(), request, StatusProcessed)) } // see if requests in the store have right values - user1Requests, err = testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1) + user1Requests, err = tc.store.GetAllDeleteRequestsForUser(context.Background(), user1) require.NoError(t, err) - compareRequests(t, user1ExpectedRequests, user1Requests) + compareRequests(t, tc.user1Requests, user1Requests) - user2Requests, err = testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user2) + user2Requests, err = tc.store.GetAllDeleteRequestsForUser(context.Background(), user2) require.NoError(t, err) - compareRequests(t, user2ExpectedRequests, user2Requests) + compareRequests(t, tc.user2Requests, user2Requests) - updateGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1) + updateGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1) require.NoError(t, err) require.NotEqual(t, createGenNumber, updateGenNumber) - updateGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2) + updateGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2) require.NoError(t, err) require.NotEqual(t, createGenNumber2, updateGenNumber2) // delete the requests from the store updated previously var remainingRequests []DeleteRequest - for i := 0; i < len(user1ExpectedRequests); i++ { + for i := 0; i < len(tc.user1Requests); i++ { var request DeleteRequest if i%2 != 0 { - user1ExpectedRequests[i].Status = StatusProcessed - request = user1ExpectedRequests[i] - remainingRequests = append(remainingRequests, user2ExpectedRequests[i]) + tc.user1Requests[i].Status = StatusProcessed + request = tc.user1Requests[i] + remainingRequests = append(remainingRequests, tc.user2Requests[i]) } else { - user2ExpectedRequests[i].Status = StatusProcessed - request = user2ExpectedRequests[i] - remainingRequests = append(remainingRequests, user1ExpectedRequests[i]) + tc.user2Requests[i].Status = StatusProcessed + request = tc.user2Requests[i] + remainingRequests = append(remainingRequests, tc.user1Requests[i]) } - require.NoError(t, testDeleteRequestsStore.RemoveDeleteRequest(context.Background(), request.UserID, request.RequestID, request.CreatedAt, request.StartTime, request.EndTime)) + require.NoError(t, tc.store.RemoveDeleteRequests(context.Background(), []DeleteRequest{request})) } // see if the store has the right remaining requests - deleteRequests, err = testDeleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived) + deleteRequests, err = tc.store.GetDeleteRequestsByStatus(context.Background(), StatusReceived) require.NoError(t, err) compareRequests(t, remainingRequests, deleteRequests) - deleteGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1) + deleteGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1) require.NoError(t, err) require.NotEqual(t, updateGenNumber, deleteGenNumber) - deleteGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2) + deleteGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2) require.NoError(t, err) require.NotEqual(t, updateGenNumber2, deleteGenNumber2) } +func TestBatchCreateGet(t *testing.T) { + t.Run("it adds the requests with different sequence numbers but the same request id, status, and creation time", func(t *testing.T) { + tc := setup(t) + defer tc.store.Stop() + + requests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests) + require.NoError(t, err) + + for i, req := range requests { + require.Equal(t, req.RequestID, requests[0].RequestID) + require.Equal(t, req.Status, requests[0].Status) + require.Equal(t, req.CreatedAt, requests[0].CreatedAt) + + require.Equal(t, req.SequenceNum, int64(i)) + } + }) + + t.Run("returns all the requests that share a request id", func(t *testing.T) { + tc := setup(t) + defer tc.store.Stop() + + savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests) + require.NoError(t, err) + + results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID) + require.NoError(t, err) + + require.Equal(t, savedRequests, results) + }) + + t.Run("updates a single request with a new status", func(t *testing.T) { + tc := setup(t) + defer tc.store.Stop() + + savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests) + require.NoError(t, err) + + err = tc.store.UpdateStatus(context.Background(), savedRequests[1], StatusProcessed) + require.NoError(t, err) + + results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID) + require.NoError(t, err) + + require.Equal(t, StatusProcessed, results[1].Status) + }) + + t.Run("deletes several delete requests", func(t *testing.T) { + tc := setup(t) + defer tc.store.Stop() + + savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests) + require.NoError(t, err) + + err = tc.store.RemoveDeleteRequests(context.Background(), savedRequests) + require.NoError(t, err) + + results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID) + require.ErrorIs(t, err, ErrDeleteRequestNotFound) + require.Empty(t, results) + }) +} + func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequest) { require.Len(t, actual, len(expected)) sort.Slice(expected, func(i, j int) bool { @@ -191,3 +206,60 @@ func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequ require.Equal(t, expected[i], deleteRequest) } } + +type testContext struct { + user1Requests []DeleteRequest + user2Requests []DeleteRequest + store *deleteRequestsStore +} + +func setup(t *testing.T) *testContext { + t.Helper() + tc := &testContext{} + // build some test requests to add to the store + for i := time.Duration(1); i <= 24; i++ { + tc.user1Requests = append(tc.user1Requests, DeleteRequest{ + UserID: user1, + StartTime: now.Add(-i * time.Hour), + EndTime: now.Add(-i * time.Hour).Add(30 * time.Minute), + CreatedAt: model.Time(38), + Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1), + Status: StatusReceived, + }) + tc.user2Requests = append(tc.user2Requests, DeleteRequest{ + UserID: user2, + StartTime: now.Add(-i * time.Hour), + EndTime: now.Add(-(i + 1) * time.Hour), + CreatedAt: model.Time(38), + Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2), + Status: StatusReceived, + }) + } + + // build the store + tempDir := t.TempDir() + //tempDir := os.TempDir() + fmt.Println(tempDir) + + workingDir := filepath.Join(tempDir, "working-dir") + objectStorePath := filepath.Join(tempDir, "object-store") + + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: objectStorePath, + }) + require.NoError(t, err) + ds, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, "")) + require.NoError(t, err) + + store := ds.(*deleteRequestsStore) + store.now = func() model.Time { return model.Time(38) } + tc.store = store + + return tc +} + +var ( + now = model.Now() + user1 = "user1" + user2 = "user2" +) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go b/pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go index ac0d182645..081c3baf1c 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go @@ -2,8 +2,6 @@ package deletion import ( "context" - - "github.com/prometheus/common/model" ) func NewNoOpDeleteRequestsStore() DeleteRequestsStore { @@ -12,8 +10,8 @@ func NewNoOpDeleteRequestsStore() DeleteRequestsStore { type noOpDeleteRequestsStore struct{} -func (d *noOpDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error { - return nil +func (d *noOpDeleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error) { + return nil, nil } func (d *noOpDeleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) { @@ -24,15 +22,15 @@ func (d *noOpDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Contex return nil, nil } -func (d *noOpDeleteRequestsStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error { +func (d *noOpDeleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error { return nil } -func (d *noOpDeleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) { +func (d *noOpDeleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) { return nil, nil } -func (d *noOpDeleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error { +func (d *noOpDeleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error { return nil } diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go index 658ba1352c..ac15915797 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go @@ -4,8 +4,10 @@ import ( "encoding/json" "errors" "fmt" + "math" "net/http" "net/url" + "sort" "time" "github.com/grafana/loki/pkg/util" @@ -21,17 +23,17 @@ import ( // DeleteRequestHandler provides handlers for delete requests type DeleteRequestHandler struct { - deleteRequestsStore DeleteRequestsStore - metrics *deleteRequestHandlerMetrics - deleteRequestCancelPeriod time.Duration + deleteRequestsStore DeleteRequestsStore + metrics *deleteRequestHandlerMetrics + maxInterval time.Duration } // NewDeleteRequestHandler creates a DeleteRequestHandler -func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { +func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, maxInterval time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { deleteMgr := DeleteRequestHandler{ - deleteRequestsStore: deleteStore, - deleteRequestCancelPeriod: deleteRequestCancelPeriod, - metrics: newDeleteRequestHandlerMetrics(registerer), + deleteRequestsStore: deleteStore, + maxInterval: maxInterval, + metrics: newDeleteRequestHandlerMetrics(registerer), } return &deleteMgr @@ -65,7 +67,14 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r return } - if err := dm.deleteRequestsStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), query); err != nil { + interval, err := dm.interval(params, startTime, endTime) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + deleteRequests := shardDeleteRequestsByInterval(startTime, endTime, query, userID, interval) + if _, err := dm.deleteRequestsStore.AddDeleteRequestGroup(ctx, deleteRequests); err != nil { level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -81,6 +90,58 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r w.WriteHeader(http.StatusNoContent) } +func shardDeleteRequestsByInterval(startTime, endTime model.Time, query, userID string, interval time.Duration) []DeleteRequest { + deleteRequests := make([]DeleteRequest, 0, endTime.Sub(startTime)/interval) + for start := startTime; start.Before(endTime); start = start.Add(interval) + 1 { + end := start.Add(interval) + if end.After(endTime) { + end = endTime + } + + deleteRequests = append(deleteRequests, + DeleteRequest{ + StartTime: start, + EndTime: end, + Query: query, + UserID: userID, + }) + } + return deleteRequests +} + +func (dm *DeleteRequestHandler) interval(params url.Values, startTime, endTime model.Time) (time.Duration, error) { + qr := params.Get("max_interval") + if qr == "" { + if dm.maxInterval == 0 { + return endTime.Sub(startTime), nil + } + + return min(endTime.Sub(startTime), dm.maxInterval), nil + } + + interval, err := time.ParseDuration(qr) + if err != nil || interval < time.Second { + return 0, errors.New("invalid max_interval: valid time units are 's', 'm', 'h'") + } + + if interval > dm.maxInterval && dm.maxInterval != 0 { + return 0, fmt.Errorf("max_interval can't be greater than %s", dm.maxInterval.String()) + } + + if interval > endTime.Sub(startTime) { + return 0, fmt.Errorf("max_interval can't be greater than the interval to be deleted (%s)", endTime.Sub(startTime)) + } + + return interval, nil +} + +func min(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + // GetAllDeleteRequestsHandler handles get all delete requests func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -90,19 +151,77 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite return } - deleteRequests, err := dm.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID) + deleteGroups, err := dm.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID) if err != nil { level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } + deletesPerRequest := partitionByRequestID(deleteGroups) + deleteRequests := mergeDeletes(deletesPerRequest) + + sort.Slice(deleteRequests, func(i, j int) bool { + return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt + }) + if err := json.NewEncoder(w).Encode(deleteRequests); err != nil { level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err) http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError) } } +func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest { + mergedRequests := []DeleteRequest{} // Declare this way so the return value is [] rather than null + for _, deletes := range groups { + startTime, endTime, status := mergeData(deletes) + newDelete := deletes[0] + newDelete.StartTime = startTime + newDelete.EndTime = endTime + newDelete.Status = status + + mergedRequests = append(mergedRequests, newDelete) + } + return mergedRequests +} + +func mergeData(deletes []DeleteRequest) (model.Time, model.Time, DeleteRequestStatus) { + var ( + startTime = model.Time(math.MaxInt64) + endTime = model.Time(0) + numProcessed = 0 + ) + + for _, del := range deletes { + if del.StartTime < startTime { + startTime = del.StartTime + } + + if del.EndTime > endTime { + endTime = del.EndTime + } + + if del.Status == StatusProcessed { + numProcessed++ + } + } + + return startTime, endTime, deleteRequestStatus(numProcessed, len(deletes)) +} + +func deleteRequestStatus(processed, total int) DeleteRequestStatus { + if processed == 0 { + return StatusReceived + } + + if processed == total { + return StatusProcessed + } + + percentCompleted := float64(processed) / float64(total) + return DeleteRequestStatus(fmt.Sprintf("%d%% Complete", int(percentCompleted*100))) +} + // CancelDeleteRequestHandler handles delete request cancellation func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -114,25 +233,30 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter params := r.URL.Query() requestID := params.Get("request_id") - - deleteRequest, err := dm.deleteRequestsStore.GetDeleteRequest(ctx, userID, requestID) + deleteRequests, err := dm.deleteRequestsStore.GetDeleteRequestGroup(ctx, userID, requestID) if err != nil { + if errors.Is(err, ErrDeleteRequestNotFound) { + http.Error(w, "could not find delete request with given id", http.StatusNotFound) + return + } + level.Error(util_log.Logger).Log("msg", "error getting delete request from the store", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - if deleteRequest == nil { - http.Error(w, "could not find delete request with given id", http.StatusBadRequest) + toDelete := filterProcessed(deleteRequests) + if len(toDelete) == 0 { + http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest) return } - if deleteRequest.Status != StatusReceived { - http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest) + if len(toDelete) != len(deleteRequests) && params.Get("force") != "true" { + http.Error(w, "Unable to cancel partially completed delete request. To force, use the ?force query parameter", http.StatusBadRequest) return } - if err := dm.deleteRequestsStore.RemoveDeleteRequest(ctx, userID, requestID, deleteRequest.CreatedAt, deleteRequest.StartTime, deleteRequest.EndTime); err != nil { + if err := dm.deleteRequestsStore.RemoveDeleteRequests(ctx, toDelete); err != nil { level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -141,6 +265,16 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter w.WriteHeader(http.StatusNoContent) } +func filterProcessed(reqs []DeleteRequest) []DeleteRequest { + var unprocessed []DeleteRequest + for _, r := range reqs { + if r.Status == StatusReceived { + unprocessed = append(unprocessed, r) + } + } + return unprocessed +} + // GetCacheGenerationNumberHandler handles requests for a user's cache generation number func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -176,7 +310,7 @@ func query(params url.Values) (string, error) { return query, nil } -func startTime(params url.Values) (int64, error) { +func startTime(params url.Values) (model.Time, error) { startParam := params.Get("start") if startParam == "" { return 0, errors.New("start time not set") @@ -187,10 +321,10 @@ func startTime(params url.Values) (int64, error) { return 0, errors.New("invalid start time: require unix seconds or RFC3339 format") } - return st, nil + return model.Time(st), nil } -func endTime(params url.Values, startTime int64) (int64, error) { +func endTime(params url.Values, startTime model.Time) (model.Time, error) { endParam := params.Get("end") endTime, err := parseTime(endParam) if err != nil { @@ -201,11 +335,11 @@ func endTime(params url.Values, startTime int64) (int64, error) { return 0, errors.New("deletes in the future are not allowed") } - if startTime > endTime { + if int64(startTime) > endTime { return 0, errors.New("start time can't be greater than end time") } - return endTime, nil + return model.Time(endTime), nil } func parseTime(in string) (int64, error) { diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go index 9ec62e9bc3..d56c292636 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go @@ -2,6 +2,7 @@ package deletion import ( "context" + "encoding/json" "fmt" "net/http" "net/http/httptest" @@ -22,7 +23,7 @@ import ( func TestAddDeleteRequestHandler(t *testing.T) { t.Run("it adds the delete request to the store", func(t *testing.T) { store := &mockDeleteRequestsStore{} - h := NewDeleteRequestHandler(store, time.Second, nil) + h := NewDeleteRequestHandler(store, 0, nil) req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001") @@ -31,15 +32,72 @@ func TestAddDeleteRequestHandler(t *testing.T) { require.Equal(t, w.Code, http.StatusNoContent) - require.Equal(t, "org-id", store.addedUser) - require.Equal(t, `{foo="bar"}`, store.addedQuery) - require.Equal(t, toTime("0000000000"), store.addedStartTime) - require.Equal(t, toTime("0000000001"), store.addedEndTime) + require.Equal(t, "org-id", store.addReqs[0].UserID) + require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query) + require.Equal(t, toTime("0000000000"), store.addReqs[0].StartTime) + require.Equal(t, toTime("0000000001"), store.addReqs[0].EndTime) + }) + + t.Run("it shards deletes based on a query param", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, 0, nil) + + from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix()) + to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix()) + + req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to)) + params := req.URL.Query() + params.Set("max_interval", "1h") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusNoContent) + require.Len(t, store.addReqs, 3) + + for i, req := range store.addReqs { + startTime := from.Add(time.Duration(i)*time.Hour) + model.Time(i) + endTime := from.Add(time.Duration(i+1)*time.Hour) + model.Time(i) + if endTime.After(to) { + endTime = to + } + + require.Equal(t, startTime, req.StartTime) + require.Equal(t, endTime, req.EndTime) + } + }) + + t.Run("it uses the default for sharding when the query param isn't present", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + h := NewDeleteRequestHandler(store, time.Hour, nil) + + from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix()) + to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix()) + + req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to)) + + w := httptest.NewRecorder() + h.AddDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusNoContent) + require.Len(t, store.addReqs, 3) + + for i, req := range store.addReqs { + startTime := from.Add(time.Duration(i)*time.Hour) + model.Time(i) + endTime := from.Add(time.Duration(i+1)*time.Hour) + model.Time(i) + if endTime.After(to) { + endTime = to + } + + require.Equal(t, startTime, req.StartTime) + require.Equal(t, endTime, req.EndTime) + } }) t.Run("it works with RFC3339", func(t *testing.T) { store := &mockDeleteRequestsStore{} - h := NewDeleteRequestHandler(store, time.Second, nil) + h := NewDeleteRequestHandler(store, 0, nil) req := buildRequest("org-id", `{foo="bar"}`, "2006-01-02T15:04:05Z", "2006-01-03T15:04:05Z") @@ -48,15 +106,15 @@ func TestAddDeleteRequestHandler(t *testing.T) { require.Equal(t, w.Code, http.StatusNoContent) - require.Equal(t, "org-id", store.addedUser) - require.Equal(t, `{foo="bar"}`, store.addedQuery) - require.Equal(t, toTime("1136214245"), store.addedStartTime) - require.Equal(t, toTime("1136300645"), store.addedEndTime) + require.Equal(t, "org-id", store.addReqs[0].UserID) + require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query) + require.Equal(t, toTime("1136214245"), store.addReqs[0].StartTime) + require.Equal(t, toTime("1136300645"), store.addReqs[0].EndTime) }) t.Run("it fills in end time if blank", func(t *testing.T) { store := &mockDeleteRequestsStore{} - h := NewDeleteRequestHandler(store, time.Second, nil) + h := NewDeleteRequestHandler(store, 0, nil) req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "") @@ -65,15 +123,15 @@ func TestAddDeleteRequestHandler(t *testing.T) { require.Equal(t, w.Code, http.StatusNoContent) - require.Equal(t, "org-id", store.addedUser) - require.Equal(t, `{foo="bar"}`, store.addedQuery) - require.Equal(t, toTime("0000000000"), store.addedStartTime) - require.InDelta(t, int64(model.Now()), int64(store.addedEndTime), 1000) + require.Equal(t, "org-id", store.addReqs[0].UserID) + require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query) + require.Equal(t, toTime("0000000000"), store.addReqs[0].StartTime) + require.InDelta(t, int64(model.Now()), int64(store.addReqs[0].EndTime), 1000) }) t.Run("it returns 500 when the delete store errors", func(t *testing.T) { store := &mockDeleteRequestsStore{addErr: errors.New("something bad")} - h := NewDeleteRequestHandler(store, time.Second, nil) + h := NewDeleteRequestHandler(store, 0, nil) req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001") @@ -83,23 +141,31 @@ func TestAddDeleteRequestHandler(t *testing.T) { }) t.Run("Validation", func(t *testing.T) { - h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Second, nil) + h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Minute, nil) for _, tc := range []struct { - orgID, query, startTime, endTime, error string + orgID, query, startTime, endTime, interval, error string }{ - {"", `{foo="bar"}`, "0000000000", "0000000001", "no org id\n"}, - {"org-id", "", "0000000000", "0000000001", "query not set\n"}, - {"org-id", `not a query`, "0000000000", "0000000001", "invalid query expression\n"}, - {"org-id", `{foo="bar"}`, "", "0000000001", "start time not set\n"}, - {"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "invalid start time: require unix seconds or RFC3339 format\n"}, - {"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "invalid end time: require unix seconds or RFC3339 format\n"}, - {"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "deletes in the future are not allowed\n"}, - {"org-id", `{foo="bar"}`, "0000000001", "0000000000", "start time can't be greater than end time\n"}, + {"", `{foo="bar"}`, "0000000000", "0000000001", "", "no org id\n"}, + {"org-id", "", "0000000000", "0000000001", "", "query not set\n"}, + {"org-id", `not a query`, "0000000000", "0000000001", "", "invalid query expression\n"}, + {"org-id", `{foo="bar"}`, "", "0000000001", "", "start time not set\n"}, + {"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "", "invalid start time: require unix seconds or RFC3339 format\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "", "invalid end time: require unix seconds or RFC3339 format\n"}, + {"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "", "deletes in the future are not allowed\n"}, + {"org-id", `{foo="bar"}`, "0000000001", "0000000000", "", "start time can't be greater than end time\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "not-a-duration", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1ms", "invalid max_interval: valid time units are 's', 'm', 'h'\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1h", "max_interval can't be greater than 1m0s\n"}, + {"org-id", `{foo="bar"}`, "0000000000", "0000000001", "30s", "max_interval can't be greater than the interval to be deleted (1s)\n"}, } { t.Run(strings.TrimSpace(tc.error), func(t *testing.T) { req := buildRequest(tc.orgID, tc.query, tc.startTime, tc.endTime) + params := req.URL.Query() + params.Set("max_interval", tc.interval) + req.URL.RawQuery = params.Encode() + w := httptest.NewRecorder() h.AddDeleteRequestHandler(w, req) @@ -110,6 +176,255 @@ func TestAddDeleteRequestHandler(t *testing.T) { }) } +func TestCancelDeleteRequestHandler(t *testing.T) { + t.Run("it removes unprocessed delete requests from the store when force is true", func(t *testing.T) { + stored := []DeleteRequest{ + {RequestID: "test-request", UserID: "org-id", Query: "test-query", SequenceNum: 0, Status: StatusProcessed}, + {RequestID: "test-request", UserID: "org-id", Query: "test-query", SequenceNum: 1, Status: StatusReceived}, + } + store := &mockDeleteRequestsStore{} + store.getResult = stored + + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + params.Set("force", "true") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusNoContent) + + require.Equal(t, store.getUser, "org-id") + require.Equal(t, store.getID, "test-request") + require.Equal(t, stored[1], store.removeReqs[0]) + }) + + t.Run("it returns an error when parts of the query have started to be processed", func(t *testing.T) { + stored := []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + } + store := &mockDeleteRequestsStore{} + store.getResult = stored + + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + params.Set("force", "false") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusBadRequest) + require.Equal(t, "Unable to cancel partially completed delete request. To force, use the ?force query parameter\n", w.Body.String()) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getErr = errors.New("something bad") + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusInternalServerError) + require.Equal(t, "something bad\n", w.Body.String()) + }) + + t.Run("error removing from the store", func(t *testing.T) { + stored := []DeleteRequest{{RequestID: "test-request", UserID: "org-id", Query: "test-query", Status: StatusReceived}} + store := &mockDeleteRequestsStore{} + store.getResult = stored + store.removeErr = errors.New("something bad") + + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusInternalServerError) + require.Equal(t, "something bad\n", w.Body.String()) + }) + + t.Run("Validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, 0, nil) + + req := buildRequest("", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusBadRequest) + require.Equal(t, "no org id\n", w.Body.String()) + }) + + t.Run("request not found", func(t *testing.T) { + h := NewDeleteRequestHandler(&mockDeleteRequestsStore{getErr: ErrDeleteRequestNotFound}, 0, nil) + + req := buildRequest("org-id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusNotFound) + require.Equal(t, "could not find delete request with given id\n", w.Body.String()) + }) + + t.Run("all requests in group are already processed", func(t *testing.T) { + stored := []DeleteRequest{{RequestID: "test-request", UserID: "org-id", Query: "test-query", Status: StatusProcessed}} + store := &mockDeleteRequestsStore{} + store.getResult = stored + + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.CancelDeleteRequestHandler(w, req) + + require.Equal(t, w.Code, http.StatusBadRequest) + require.Equal(t, "deletion of request which is in process or already processed is not allowed\n", w.Body.String()) + }) + }) +} + +func TestGetAllDeleteRequestsHandler(t *testing.T) { + t.Run("it gets all the delete requests for the user", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}} + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + + w := httptest.NewRecorder() + h.GetAllDeleteRequestsHandler(w, req) + + require.Equal(t, w.Code, http.StatusOK) + require.Equal(t, store.getAllUser, "org-id") + + var result []DeleteRequest + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result)) + require.ElementsMatch(t, store.getAllResult, result) + }) + + t.Run("it merges requests with the same requestID", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, StartTime: now, EndTime: now.Add(time.Hour)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(2 * time.Hour), EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + {RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)}, + } + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + + w := httptest.NewRecorder() + h.GetAllDeleteRequestsHandler(w, req) + + require.Equal(t, w.Code, http.StatusOK) + + var result []DeleteRequest + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result)) + + require.Len(t, result, 2) + require.Equal(t, []DeleteRequest{ + {RequestID: "test-request-1", Status: StatusReceived, CreatedAt: now, StartTime: now, EndTime: now.Add(3 * time.Hour)}, + {RequestID: "test-request-2", Status: StatusReceived, CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)}, + }, result) + }) + + t.Run("it only considers a request processed if all it's subqueries are processed", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllResult = []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived}, + {RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + } + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org-id", ``, "", "") + + w := httptest.NewRecorder() + h.GetAllDeleteRequestsHandler(w, req) + + require.Equal(t, w.Code, http.StatusOK) + + var result []DeleteRequest + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result)) + + require.Len(t, result, 3) + require.Equal(t, []DeleteRequest{ + {RequestID: "test-request-1", CreatedAt: now, Status: "66% Complete"}, + {RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed}, + {RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived}, + }, result) + }) + + t.Run("error getting from store", func(t *testing.T) { + store := &mockDeleteRequestsStore{} + store.getAllErr = errors.New("something bad") + h := NewDeleteRequestHandler(store, 0, nil) + + req := buildRequest("org id", ``, "", "") + params := req.URL.Query() + params.Set("request_id", "test-request") + req.URL.RawQuery = params.Encode() + + w := httptest.NewRecorder() + h.GetAllDeleteRequestsHandler(w, req) + + require.Equal(t, w.Code, http.StatusInternalServerError) + require.Equal(t, "something bad\n", w.Body.String()) + }) + + t.Run("validation", func(t *testing.T) { + t.Run("no org id", func(t *testing.T) { + h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, 0, nil) + + req := buildRequest("", ``, "", "") + + w := httptest.NewRecorder() + h.GetAllDeleteRequestsHandler(w, req) + + require.Equal(t, w.Code, http.StatusBadRequest) + require.Equal(t, "no org id\n", w.Body.String()) + }) + }) +} + func buildRequest(orgID, query, start, end string) *http.Request { var req *http.Request if orgID == "" { @@ -123,11 +438,16 @@ func buildRequest(orgID, query, start, end string) *http.Request { q.Set("query", query) q.Set("start", start) q.Set("end", end) + req.URL.RawQuery = q.Encode() return req } +func unixString(t model.Time) string { + return fmt.Sprint(t.Unix()) +} + func toTime(t string) model.Time { modelTime, _ := util.ParseTime(t) return model.Time(modelTime) diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/util.go b/pkg/storage/stores/indexshipper/compactor/deletion/util.go index 9ab3be3a7f..405041fac8 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/util.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/util.go @@ -35,3 +35,11 @@ func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) { mode := l.DeletionMode(userID) return deletionmode.ParseMode(mode) } + +func partitionByRequestID(reqs []DeleteRequest) map[string][]DeleteRequest { + groups := make(map[string][]DeleteRequest) + for _, req := range reqs { + groups[req.RequestID] = append(groups[req.RequestID], req) + } + return groups +}