Shard deletes requests based on time ranges (#6903)

* Refactor the store API to be in terms of DeleteRequest

* backfill remove tests

* backfill GetAll tests

* Make the delete request store deal in groups of requests instead of single ones

* Make cancelation aware of request groups

* remove cancelation deadline

* shard deletes based on a query parameter

* merge request group on gets

* global configuration for max query range

* sort delete requests by start time so batches are grouped to similar chunks

* Fix status when some deletes haven't happened yet

* fix tests

* Review feedback

* review feedback

* make max_interval > deletion interval a validation error

* list partially completed deletes

* list partially completed deletes

* fix flaky test

* clarify naming

* disallow cancelation of partial deletes unless the 'force' flag is passed

* more comment specificity for the delete request hash
pull/6967/head
Travis Patterson 3 years ago committed by GitHub
parent 7961ef6a4e
commit e15da437f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      docs/sources/api/_index.md
  2. 7
      docs/sources/configuration/_index.md
  3. 4
      pkg/storage/stores/indexshipper/compactor/compactor.go
  4. 1
      pkg/storage/stores/indexshipper/compactor/deletion/delete_request.go
  5. 18
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager.go
  6. 60
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_manager_test.go
  7. 242
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store.go
  8. 254
      pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_store_test.go
  9. 12
      pkg/storage/stores/indexshipper/compactor/deletion/noop_delete_requests_store.go
  10. 176
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler.go
  11. 372
      pkg/storage/stores/indexshipper/compactor/deletion/request_handler_test.go
  12. 8
      pkg/storage/stores/indexshipper/compactor/deletion/util.go

@ -1036,6 +1036,7 @@ Query parameters:
* `query=<series_selector>`: query argument that identifies the streams from which to delete with optional line filters.
* `start=<rfc3339 | unix_seconds_timestamp>`: A timestamp that identifies the start of the time window within which entries will be deleted. This parameter is required.
* `end=<rfc3339 | unix_seconds_timestamp>`: A timestamp that identifies the end of the time window within which entries will be deleted. If not specified, defaults to the current time.
* `max_interval=<duration>`: The maximum time period the delete request can span. If the request is larger than this value, it is split into several requests of <= `max_interval`. Valid time units are "s", "m", "h".
A 204 response indicates success.
@ -1102,10 +1103,13 @@ curl -u "Tenant1:$API_TOKEN" \
DELETE /loki/api/v1/delete
```
Query Parameters:
* `force=<boolean>`: When the `force` query parameter is true, partially completed delete requests will be canceled. NOTE: some data from the request may still be deleted.
Remove a delete request for the authenticated tenant.
The [log entry deletion](../operations/storage/logs-deletion/) documentation has configuration details.
Loki allows cancellation of delete requests until the requests are picked up for processing. It is controlled by the `delete_request_cancel_period` YAML configuration or the equivalent command line option when invoking Loki.
Loki allows cancellation of delete requests until the requests are picked up for processing. It is controlled by the `delete_request_cancel_period` YAML configuration or the equivalent command line option when invoking Loki. To cancel a delete request that has been picked up for processing or is partially complete, pass the `force=true` query parameter to the API.
Log entry deletion is supported _only_ when the BoltDB Shipper is configured for the index store.

@ -2135,6 +2135,13 @@ compacts index shards to more performant forms.
# CLI flag: -boltdb.shipper.compactor.delete-request-cancel-period
[delete_request_cancel_period: <duration> | default = 24h]
# Constrain the size a delete request. When a delete request that spans > delete_max_query_range
# is input, the request is sharded into smaller requests of no more than delete_max_query_range.
#
# 0 means no max_query_period.
# CLI flag: -boltdb.shipper.compactor.delete-max-interval
[delete_max_interval: <duration> | default = 0]
# The max number of delete requests to run per compaction cycle.
# CLI flag: -boltdb.shipper.compactor.delete-batch-size
[delete_batch_size: <duration> | default = 70]

@ -82,6 +82,7 @@ type Config struct {
RetentionTableTimeout time.Duration `yaml:"retention_table_timeout"`
DeleteBatchSize int `yaml:"delete_batch_size"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
DeleteMaxInterval time.Duration `yaml:"delete_max_interval"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
RunOnce bool `yaml:"-"`
@ -102,6 +103,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.IntVar(&cfg.DeleteBatchSize, "boltdb.shipper.compactor.delete-batch-size", 70, "The max number of delete requests to run per compaction cycle.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.DurationVar(&cfg.DeleteMaxInterval, "boltdb.shipper.compactor.delete-max-interval", 0, "Constrain the size of any single delete request. When a delete request > delete_max_query_range is input, the request is sharded into smaller requests of no more than delete_max_interval")
f.DurationVar(&cfg.RetentionTableTimeout, "boltdb.shipper.compactor.retention-table-timeout", 0, "The maximum amount of time to spend running retention and deletion on any given table in the index.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")
@ -271,7 +273,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Over
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
c.deleteRequestsStore,
c.cfg.DeleteRequestCancelPeriod,
c.cfg.DeleteMaxInterval,
r,
)

@ -20,6 +20,7 @@ type DeleteRequest struct {
CreatedAt model.Time `json:"created_at"`
UserID string `json:"-"`
SequenceNum int64 `json:"-"`
matchers []*labels.Matcher `json:"-"`
logSelectorExpr syntax.LogSelectorExpr `json:"-"`

@ -3,6 +3,7 @@ package deletion
import (
"context"
"fmt"
"sort"
"sync"
"time"
@ -122,7 +123,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
// Reset this first so any errors result in a clear map
d.deleteRequestsToProcess = map[string]*userDeleteRequests{}
deleteRequests, err := d.filteredDeleteRequests()
deleteRequests, err := d.filteredSortedDeleteRequests()
if err != nil {
return err
}
@ -154,13 +155,22 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
return nil
}
func (d *DeleteRequestsManager) filteredDeleteRequests() ([]DeleteRequest, error) {
func (d *DeleteRequestsManager) filteredSortedDeleteRequests() ([]DeleteRequest, error) {
deleteRequests, err := d.deleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
if err != nil {
return nil, err
}
return d.filteredRequests(deleteRequests)
deleteRequests, err = d.filteredRequests(deleteRequests)
if err != nil {
return nil, err
}
sort.Slice(deleteRequests, func(i, j int) bool {
return deleteRequests[i].StartTime < deleteRequests[j].StartTime
})
return deleteRequests, nil
}
func (d *DeleteRequestsManager) filteredRequests(reqs []DeleteRequest) ([]DeleteRequest, error) {
@ -311,7 +321,7 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() {
}
for _, deleteRequest := range userDeleteRequests.requests {
if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest.UserID, deleteRequest.RequestID, StatusProcessed); err != nil {
if err := d.deleteRequestsStore.UpdateStatus(context.Background(), deleteRequest, StatusProcessed); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to mark delete request for user as processed",
"delete_request_id", deleteRequest.RequestID,

@ -378,33 +378,33 @@ func TestDeleteRequestsManager_Expired(t *testing.T) {
},
},
{
name: "Deletes are limited by batch size",
name: "Deletes are sorted by start time and limited by batch size",
deletionMode: deletionmode.FilterAndDelete,
batchSize: 2,
deleteRequestsFromStore: []DeleteRequest{
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-6 * time.Hour),
EndTime: now.Add(-5 * time.Hour),
StartTime: now.Add(-10 * time.Hour),
EndTime: now.Add(-8 * time.Hour),
},
{
UserID: testUserID,
Query: lblFoo.String(),
StartTime: now.Add(-2 * time.Hour),
EndTime: now,
StartTime: now.Add(-13 * time.Hour),
EndTime: now.Add(-11 * time.Hour),
},
},
expectedResp: resp{
@ -486,21 +486,43 @@ func TestDeleteRequestsManager_IntervalMayHaveExpiredChunks(t *testing.T) {
type mockDeleteRequestsStore struct {
DeleteRequestsStore
deleteRequests []DeleteRequest
addedUser string
addedStartTime model.Time
addedEndTime model.Time
addedQuery string
addReqs []DeleteRequest
addErr error
removeReqs []DeleteRequest
removeErr error
getUser string
getID string
getResult []DeleteRequest
getErr error
getAllUser string
getAllResult []DeleteRequest
getAllErr error
}
func (m *mockDeleteRequestsStore) GetDeleteRequestsByStatus(_ context.Context, _ DeleteRequestStatus) ([]DeleteRequest, error) {
return m.deleteRequests, nil
}
func (m *mockDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
m.addedUser = userID
m.addedStartTime = startTime
m.addedEndTime = endTime
m.addedQuery = query
return m.addErr
func (m *mockDeleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) {
m.addReqs = reqs
return nil, m.addErr
}
func (m *mockDeleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error {
m.removeReqs = reqs
return m.removeErr
}
func (m *mockDeleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) {
m.getUser = userID
m.getID = requestID
return m.getResult, m.getErr
}
func (m *mockDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
m.getAllUser = userID
return m.getAllResult, m.getAllErr
}

@ -37,15 +37,15 @@ const (
DeleteRequestsTableName = "delete_requests"
)
var ErrDeleteRequestNotFound = errors.New("could not find matching delete request")
var ErrDeleteRequestNotFound = errors.New("could not find matching delete requests")
type DeleteRequestsStore interface {
AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error
AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error)
GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error)
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error
GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error)
RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error
UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error
GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error)
RemoveDeleteRequests(ctx context.Context, req []DeleteRequest) error
GetCacheGenerationNumber(ctx context.Context, userID string) (string, error)
Stop()
Name() string
@ -54,6 +54,7 @@ type DeleteRequestsStore interface {
// deleteRequestsStore provides all the methods required to manage lifecycle of delete request and things related to it.
type deleteRequestsStore struct {
indexClient index.Client
now func() model.Time
}
// NewDeleteStore creates a store for managing delete requests.
@ -63,63 +64,102 @@ func NewDeleteStore(workingDirectory string, indexStorageClient storage.Client)
return nil, err
}
return &deleteRequestsStore{indexClient: indexClient}, nil
}
func NewDeleteStoreFromIndexClient(ic index.Client) DeleteRequestsStore {
return &deleteRequestsStore{ic}
return &deleteRequestsStore{
indexClient: indexClient,
now: model.Now,
}, nil
}
func (ds *deleteRequestsStore) Stop() {
ds.indexClient.Stop()
}
// AddDeleteRequest creates entries for a new delete request.
func (ds *deleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
_, err := ds.addDeleteRequest(ctx, userID, model.Now(), startTime, endTime, query)
return err
}
// AddDeleteRequestGroup creates entries for new delete requests. All passed delete requests will be associated to
// each other by request id
func (ds *deleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, reqs []DeleteRequest) ([]DeleteRequest, error) {
if len(reqs) == 0 {
return nil, nil
}
// addDeleteRequest is also used for tests to create delete requests with different createdAt time.
func (ds *deleteRequestsStore) addDeleteRequest(ctx context.Context, userID string, createdAt, startTime, endTime model.Time, query string) ([]byte, error) {
requestID := generateUniqueID(userID, query)
createdAt := ds.now()
writeBatch := ds.indexClient.NewWriteBatch()
requestID, err := ds.generateID(ctx, reqs[0])
if err != nil {
return nil, err
}
for {
_, err := ds.GetDeleteRequest(ctx, userID, string(requestID))
var results []DeleteRequest
for i, req := range reqs {
newReq, err := newRequest(req, requestID, createdAt, i)
if err != nil {
if err == ErrDeleteRequestNotFound {
break
}
return nil, err
}
// we have a collision here, lets recreate a new requestID and check for collision
time.Sleep(time.Millisecond)
requestID = generateUniqueID(userID, query)
results = append(results, newReq)
ds.writeDeleteRequest(newReq, writeBatch)
}
// userID, requestID
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
if err := ds.indexClient.BatchWrite(ctx, writeBatch); err != nil {
return nil, err
}
// Add an entry with userID, requestID as range key and status as value to make it easy to manage and lookup status
// We don't want to set anything in hash key here since we would want to find delete requests by just status
writeBatch := ds.indexClient.NewWriteBatch()
return results, nil
}
func newRequest(req DeleteRequest, requestID []byte, createdAt model.Time, seqNumber int) (DeleteRequest, error) {
req.RequestID = string(requestID)
req.Status = StatusReceived
req.CreatedAt = createdAt
req.SequenceNum = int64(seqNumber)
if err := req.SetQuery(req.Query); err != nil {
return DeleteRequest{}, err
}
return req, nil
}
func (ds *deleteRequestsStore) writeDeleteRequest(req DeleteRequest, writeBatch index.WriteBatch) {
userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum)
// Add an entry with userID, requestID, and sequence number as range key and status as value to make it easy
// to manage and lookup status. We don't want to set anything in hash key here since we would want to find
// delete requests by just status
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(StatusReceived))
// Add another entry with additional details like creation time, time range of delete request and the logQL requests in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue), []byte(query))
rangeValue := fmt.Sprintf("%x:%x:%x", int64(ds.now()), int64(req.StartTime), int64(req.EndTime))
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue), []byte(req.Query))
// create a gen number for this result
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber())
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber())
}
err := ds.indexClient.BatchWrite(ctx, writeBatch)
if err != nil {
return nil, err
// backwardCompatibleDeleteRequestHash generates the hash key for a delete request.
// Sequence numbers were added after deletion was in production so any requests made
// before then won't have one. Ensure backward compatibility by treating the 0th
// sequence number as the old format without any number. As a consequence, the 0th
// sequence number will also be ignored for any new delete requests.
func backwardCompatibleDeleteRequestHash(userID, requestID string, sequenceNumber int64) string {
if sequenceNumber == 0 {
return fmt.Sprintf("%s:%s", userID, requestID)
}
return fmt.Sprintf("%s:%s:%d", userID, requestID, sequenceNumber)
}
func (ds *deleteRequestsStore) generateID(ctx context.Context, req DeleteRequest) ([]byte, error) {
requestID := generateUniqueID(req.UserID, req.Query)
return requestID, nil
for {
if _, err := ds.GetDeleteRequestGroup(ctx, req.UserID, string(requestID)); err != nil {
if err == ErrDeleteRequestNotFound {
return requestID, nil
}
return nil, err
}
// we have a collision here, lets recreate a new requestID and check for collision
time.Sleep(time.Millisecond)
requestID = generateUniqueID(req.UserID, req.Query)
}
}
// GetDeleteRequestsByStatus returns all delete requests for given status.
@ -141,22 +181,22 @@ func (ds *deleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Context,
}
// UpdateStatus updates status of a delete request.
func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
func (ds *deleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error {
userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum)
writeBatch := ds.indexClient.NewWriteBatch()
writeBatch.Add(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID), []byte(newStatus))
if newStatus == StatusProcessed {
// remove runtime filtering for deleted data
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID), []byte{}, generateCacheGenNumber())
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, generateCacheGenNumber())
}
return ds.indexClient.BatchWrite(ctx, writeBatch)
}
// GetDeleteRequest returns delete request with given requestID.
func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
// GetDeleteRequestGroup returns delete requests with given requestID.
func (ds *deleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) {
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
deleteRequests, err := ds.queryDeleteRequests(ctx, index.Query{
@ -172,7 +212,7 @@ func (ds *deleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, req
return nil, ErrDeleteRequestNotFound
}
return &deleteRequests[0], nil
return deleteRequests, nil
}
func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, userID string) (string, error) {
@ -197,9 +237,9 @@ func (ds *deleteRequestsStore) GetCacheGenerationNumber(ctx context.Context, use
}
func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQuery index.Query) ([]DeleteRequest, error) {
deleteRequests := []DeleteRequest{}
// No need to lock inside the callback since we run a single index query.
var deleteRequests []DeleteRequest
err := ds.indexClient.QueryPages(ctx, []index.Query{deleteQuery}, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) {
// No need to lock inside the callback since we run a single index query.
itr := batch.Iterator()
for itr.Next() {
userID, requestID := splitUserIDAndRequestID(string(itr.RangeValue()))
@ -216,63 +256,85 @@ func (ds *deleteRequestsStore) queryDeleteRequests(ctx context.Context, deleteQu
return nil, err
}
for i, deleteRequest := range deleteRequests {
deleteRequestQuery := []index.Query{
{
TableName: DeleteRequestsTableName,
HashValue: fmt.Sprintf("%s:%s:%s", deleteRequestDetails, deleteRequest.UserID, deleteRequest.RequestID),
},
}
var parseError error
err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) {
itr := batch.Iterator()
itr.Next()
return ds.deleteRequestsWithDetails(ctx, deleteRequests)
}
deleteRequest, err = parseDeleteRequestTimestamps(itr.RangeValue(), deleteRequest)
func (ds *deleteRequestsStore) deleteRequestsWithDetails(ctx context.Context, partialDeleteRequests []DeleteRequest) ([]DeleteRequest, error) {
deleteRequests := make([]DeleteRequest, 0, len(partialDeleteRequests))
for _, group := range partitionByRequestID(partialDeleteRequests) {
for i, deleteRequest := range group {
deleteRequest.SequenceNum = int64(i)
requestWithDetails, err := ds.queryDeleteRequestDetails(ctx, deleteRequest)
if err != nil {
parseError = err
return false
return nil, err
}
deleteRequests = append(deleteRequests, requestWithDetails)
}
}
return deleteRequests, nil
}
err = deleteRequest.SetQuery(string(itr.Value()))
if err != nil {
parseError = err
return false
}
deleteRequests[i] = deleteRequest
func (ds *deleteRequestsStore) queryDeleteRequestDetails(ctx context.Context, deleteRequest DeleteRequest) (DeleteRequest, error) {
userIDAndRequestID := backwardCompatibleDeleteRequestHash(deleteRequest.UserID, deleteRequest.RequestID, deleteRequest.SequenceNum)
deleteRequestQuery := []index.Query{
{
TableName: DeleteRequestsTableName,
HashValue: fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
},
}
return true
})
if err != nil {
return nil, err
var marshalError error
var requestWithDetails DeleteRequest
err := ds.indexClient.QueryPages(ctx, deleteRequestQuery, func(query index.Query, batch index.ReadBatchResult) (shouldContinue bool) {
if requestWithDetails, marshalError = unmarshalDeleteRequestDetails(batch.Iterator(), deleteRequest); marshalError != nil {
return false
}
if parseError != nil {
return nil, parseError
}
return true
})
if err != nil || marshalError != nil {
return DeleteRequest{}, err
}
return deleteRequests, nil
return requestWithDetails, nil
}
// RemoveDeleteRequest removes a delete request
func (ds *deleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error {
userIDAndRequestID := fmt.Sprintf("%s:%s", userID, requestID)
func unmarshalDeleteRequestDetails(itr index.ReadBatchIterator, req DeleteRequest) (DeleteRequest, error) {
itr.Next()
requestWithDetails, err := parseDeleteRequestTimestamps(itr.RangeValue(), req)
if err != nil {
return DeleteRequest{}, nil
}
if err = requestWithDetails.SetQuery(string(itr.Value())); err != nil {
return DeleteRequest{}, err
}
return requestWithDetails, nil
}
// RemoveDeleteRequests the passed delete requests
func (ds *deleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error {
writeBatch := ds.indexClient.NewWriteBatch()
for _, r := range reqs {
ds.removeRequest(r, writeBatch)
}
return ds.indexClient.BatchWrite(ctx, writeBatch)
}
func (ds *deleteRequestsStore) removeRequest(req DeleteRequest, writeBatch index.WriteBatch) {
userIDAndRequestID := backwardCompatibleDeleteRequestHash(req.UserID, req.RequestID, req.SequenceNum)
writeBatch.Delete(DeleteRequestsTableName, string(deleteRequestID), []byte(userIDAndRequestID))
// Add another entry with additional details like creation time, time range of delete request and selectors in value
rangeValue := fmt.Sprintf("%x:%x:%x", int64(createdAt), int64(startTime), int64(endTime))
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID),
[]byte(rangeValue))
rangeValue := fmt.Sprintf("%x:%x:%x", int64(req.CreatedAt), int64(req.StartTime), int64(req.EndTime))
writeBatch.Delete(DeleteRequestsTableName, fmt.Sprintf("%s:%s", deleteRequestDetails, userIDAndRequestID), []byte(rangeValue))
// ensure caches are invalidated
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, userID),
[]byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
return ds.indexClient.BatchWrite(ctx, writeBatch)
writeBatch.Add(DeleteRequestsTableName, fmt.Sprintf("%s:%s", cacheGenNum, req.UserID), []byte{}, []byte(strconv.FormatInt(time.Now().UnixNano(), 10)))
}
func (ds *deleteRequestsStore) Name() string {
@ -329,12 +391,8 @@ func encodeUniqueID(t uint32) []byte {
}
func splitUserIDAndRequestID(rangeValue string) (userID, requestID string) {
lastIndex := strings.LastIndex(rangeValue, ":")
userID = rangeValue[:lastIndex]
requestID = rangeValue[lastIndex+1:]
return
parts := strings.Split(rangeValue, ":")
return parts[0], parts[1]
}
// unsafeGetString is like yolostring but with a meaningful name

@ -16,169 +16,184 @@ import (
)
func TestDeleteRequestsStore(t *testing.T) {
now := model.Now()
user1 := "user1"
user2 := "user2"
// build some test requests to add to the store
var user1ExpectedRequests []DeleteRequest
var user2ExpectedRequests []DeleteRequest
for i := time.Duration(1); i <= 24; i++ {
user1ExpectedRequests = append(user1ExpectedRequests, DeleteRequest{
UserID: user1,
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-i * time.Hour).Add(30 * time.Minute),
CreatedAt: now.Add(-i * time.Hour).Add(30 * time.Minute),
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1),
Status: StatusReceived,
})
user2ExpectedRequests = append(user2ExpectedRequests, DeleteRequest{
UserID: user2,
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-(i + 1) * time.Hour),
CreatedAt: now.Add(-(i + 1) * time.Hour),
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2),
Status: StatusReceived,
})
}
// build the store
tempDir := t.TempDir()
workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)
defer testDeleteRequestsStore.Stop()
tc := setup(t)
defer tc.store.Stop()
// add requests for both the users to the store
for i := 0; i < len(user1ExpectedRequests); i++ {
requestID, err := testDeleteRequestsStore.(*deleteRequestsStore).addDeleteRequest(
for i := 0; i < len(tc.user1Requests); i++ {
resp, err := tc.store.AddDeleteRequestGroup(
context.Background(),
user1ExpectedRequests[i].UserID,
user1ExpectedRequests[i].CreatedAt,
user1ExpectedRequests[i].StartTime,
user1ExpectedRequests[i].EndTime,
user1ExpectedRequests[i].Query,
[]DeleteRequest{tc.user1Requests[i]},
)
require.NoError(t, err)
user1ExpectedRequests[i].RequestID = string(requestID)
require.NoError(t, user1ExpectedRequests[i].SetQuery(user1ExpectedRequests[i].Query))
tc.user1Requests[i] = resp[0]
requestID, err = testDeleteRequestsStore.(*deleteRequestsStore).addDeleteRequest(
resp, err = tc.store.AddDeleteRequestGroup(
context.Background(),
user2ExpectedRequests[i].UserID,
user2ExpectedRequests[i].CreatedAt,
user2ExpectedRequests[i].StartTime,
user2ExpectedRequests[i].EndTime,
user2ExpectedRequests[i].Query,
[]DeleteRequest{tc.user2Requests[i]},
)
require.NoError(t, err)
user2ExpectedRequests[i].RequestID = string(requestID)
require.NoError(t, user2ExpectedRequests[i].SetQuery(user2ExpectedRequests[i].Query))
tc.user2Requests[i] = resp[0]
}
// get all requests with StatusReceived and see if they have expected values
deleteRequests, err := testDeleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
deleteRequests, err := tc.store.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
require.NoError(t, err)
compareRequests(t, append(user1ExpectedRequests, user2ExpectedRequests...), deleteRequests)
compareRequests(t, append(tc.user1Requests, tc.user2Requests...), deleteRequests)
// get user specific requests and see if they have expected values
user1Requests, err := testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
user1Requests, err := tc.store.GetAllDeleteRequestsForUser(context.Background(), user1)
require.NoError(t, err)
compareRequests(t, user1ExpectedRequests, user1Requests)
compareRequests(t, tc.user1Requests, user1Requests)
user2Requests, err := testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user2)
user2Requests, err := tc.store.GetAllDeleteRequestsForUser(context.Background(), user2)
require.NoError(t, err)
compareRequests(t, user2ExpectedRequests, user2Requests)
compareRequests(t, tc.user2Requests, user2Requests)
createGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1)
createGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1)
require.NoError(t, err)
require.NotEmpty(t, createGenNumber)
createGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2)
createGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2)
require.NoError(t, err)
require.NotEmpty(t, createGenNumber2)
// get individual delete requests by id and see if they have expected values
for _, expectedRequest := range append(user1Requests, user2Requests...) {
actualRequest, err := testDeleteRequestsStore.GetDeleteRequest(context.Background(), expectedRequest.UserID, expectedRequest.RequestID)
actualRequest, err := tc.store.GetDeleteRequestGroup(context.Background(), expectedRequest.UserID, expectedRequest.RequestID)
require.NoError(t, err)
require.Equal(t, expectedRequest, *actualRequest)
require.Len(t, actualRequest, 1)
require.Equal(t, expectedRequest, actualRequest[0])
}
// try a non-existent request and see if it throws ErrDeleteRequestNotFound
_, err = testDeleteRequestsStore.GetDeleteRequest(context.Background(), "user3", "na")
_, err = tc.store.GetDeleteRequestGroup(context.Background(), "user3", "na")
require.ErrorIs(t, err, ErrDeleteRequestNotFound)
// update some of the delete requests for both the users to processed
for i := 0; i < len(user1ExpectedRequests); i++ {
for i := 0; i < len(tc.user1Requests); i++ {
var request DeleteRequest
if i%2 != 0 {
user1ExpectedRequests[i].Status = StatusProcessed
request = user1ExpectedRequests[i]
tc.user1Requests[i].Status = StatusProcessed
request = tc.user1Requests[i]
} else {
user2ExpectedRequests[i].Status = StatusProcessed
request = user2ExpectedRequests[i]
tc.user2Requests[i].Status = StatusProcessed
request = tc.user2Requests[i]
}
require.NoError(t, testDeleteRequestsStore.UpdateStatus(context.Background(), request.UserID, request.RequestID, StatusProcessed))
require.NoError(t, tc.store.UpdateStatus(context.Background(), request, StatusProcessed))
}
// see if requests in the store have right values
user1Requests, err = testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user1)
user1Requests, err = tc.store.GetAllDeleteRequestsForUser(context.Background(), user1)
require.NoError(t, err)
compareRequests(t, user1ExpectedRequests, user1Requests)
compareRequests(t, tc.user1Requests, user1Requests)
user2Requests, err = testDeleteRequestsStore.GetAllDeleteRequestsForUser(context.Background(), user2)
user2Requests, err = tc.store.GetAllDeleteRequestsForUser(context.Background(), user2)
require.NoError(t, err)
compareRequests(t, user2ExpectedRequests, user2Requests)
compareRequests(t, tc.user2Requests, user2Requests)
updateGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1)
updateGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1)
require.NoError(t, err)
require.NotEqual(t, createGenNumber, updateGenNumber)
updateGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2)
updateGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2)
require.NoError(t, err)
require.NotEqual(t, createGenNumber2, updateGenNumber2)
// delete the requests from the store updated previously
var remainingRequests []DeleteRequest
for i := 0; i < len(user1ExpectedRequests); i++ {
for i := 0; i < len(tc.user1Requests); i++ {
var request DeleteRequest
if i%2 != 0 {
user1ExpectedRequests[i].Status = StatusProcessed
request = user1ExpectedRequests[i]
remainingRequests = append(remainingRequests, user2ExpectedRequests[i])
tc.user1Requests[i].Status = StatusProcessed
request = tc.user1Requests[i]
remainingRequests = append(remainingRequests, tc.user2Requests[i])
} else {
user2ExpectedRequests[i].Status = StatusProcessed
request = user2ExpectedRequests[i]
remainingRequests = append(remainingRequests, user1ExpectedRequests[i])
tc.user2Requests[i].Status = StatusProcessed
request = tc.user2Requests[i]
remainingRequests = append(remainingRequests, tc.user1Requests[i])
}
require.NoError(t, testDeleteRequestsStore.RemoveDeleteRequest(context.Background(), request.UserID, request.RequestID, request.CreatedAt, request.StartTime, request.EndTime))
require.NoError(t, tc.store.RemoveDeleteRequests(context.Background(), []DeleteRequest{request}))
}
// see if the store has the right remaining requests
deleteRequests, err = testDeleteRequestsStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
deleteRequests, err = tc.store.GetDeleteRequestsByStatus(context.Background(), StatusReceived)
require.NoError(t, err)
compareRequests(t, remainingRequests, deleteRequests)
deleteGenNumber, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user1)
deleteGenNumber, err := tc.store.GetCacheGenerationNumber(context.Background(), user1)
require.NoError(t, err)
require.NotEqual(t, updateGenNumber, deleteGenNumber)
deleteGenNumber2, err := testDeleteRequestsStore.GetCacheGenerationNumber(context.Background(), user2)
deleteGenNumber2, err := tc.store.GetCacheGenerationNumber(context.Background(), user2)
require.NoError(t, err)
require.NotEqual(t, updateGenNumber2, deleteGenNumber2)
}
func TestBatchCreateGet(t *testing.T) {
t.Run("it adds the requests with different sequence numbers but the same request id, status, and creation time", func(t *testing.T) {
tc := setup(t)
defer tc.store.Stop()
requests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests)
require.NoError(t, err)
for i, req := range requests {
require.Equal(t, req.RequestID, requests[0].RequestID)
require.Equal(t, req.Status, requests[0].Status)
require.Equal(t, req.CreatedAt, requests[0].CreatedAt)
require.Equal(t, req.SequenceNum, int64(i))
}
})
t.Run("returns all the requests that share a request id", func(t *testing.T) {
tc := setup(t)
defer tc.store.Stop()
savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests)
require.NoError(t, err)
results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID)
require.NoError(t, err)
require.Equal(t, savedRequests, results)
})
t.Run("updates a single request with a new status", func(t *testing.T) {
tc := setup(t)
defer tc.store.Stop()
savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests)
require.NoError(t, err)
err = tc.store.UpdateStatus(context.Background(), savedRequests[1], StatusProcessed)
require.NoError(t, err)
results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID)
require.NoError(t, err)
require.Equal(t, StatusProcessed, results[1].Status)
})
t.Run("deletes several delete requests", func(t *testing.T) {
tc := setup(t)
defer tc.store.Stop()
savedRequests, err := tc.store.AddDeleteRequestGroup(context.Background(), tc.user1Requests)
require.NoError(t, err)
err = tc.store.RemoveDeleteRequests(context.Background(), savedRequests)
require.NoError(t, err)
results, err := tc.store.GetDeleteRequestGroup(context.Background(), savedRequests[0].UserID, savedRequests[0].RequestID)
require.ErrorIs(t, err, ErrDeleteRequestNotFound)
require.Empty(t, results)
})
}
func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequest) {
require.Len(t, actual, len(expected))
sort.Slice(expected, func(i, j int) bool {
@ -191,3 +206,60 @@ func compareRequests(t *testing.T, expected []DeleteRequest, actual []DeleteRequ
require.Equal(t, expected[i], deleteRequest)
}
}
type testContext struct {
user1Requests []DeleteRequest
user2Requests []DeleteRequest
store *deleteRequestsStore
}
func setup(t *testing.T) *testContext {
t.Helper()
tc := &testContext{}
// build some test requests to add to the store
for i := time.Duration(1); i <= 24; i++ {
tc.user1Requests = append(tc.user1Requests, DeleteRequest{
UserID: user1,
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-i * time.Hour).Add(30 * time.Minute),
CreatedAt: model.Time(38),
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user1),
Status: StatusReceived,
})
tc.user2Requests = append(tc.user2Requests, DeleteRequest{
UserID: user2,
StartTime: now.Add(-i * time.Hour),
EndTime: now.Add(-(i + 1) * time.Hour),
CreatedAt: model.Time(38),
Query: fmt.Sprintf(`{foo="%d", user="%s"}`, i, user2),
Status: StatusReceived,
})
}
// build the store
tempDir := t.TempDir()
//tempDir := os.TempDir()
fmt.Println(tempDir)
workingDir := filepath.Join(tempDir, "working-dir")
objectStorePath := filepath.Join(tempDir, "object-store")
objectClient, err := local.NewFSObjectClient(local.FSConfig{
Directory: objectStorePath,
})
require.NoError(t, err)
ds, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
require.NoError(t, err)
store := ds.(*deleteRequestsStore)
store.now = func() model.Time { return model.Time(38) }
tc.store = store
return tc
}
var (
now = model.Now()
user1 = "user1"
user2 = "user2"
)

@ -2,8 +2,6 @@ package deletion
import (
"context"
"github.com/prometheus/common/model"
)
func NewNoOpDeleteRequestsStore() DeleteRequestsStore {
@ -12,8 +10,8 @@ func NewNoOpDeleteRequestsStore() DeleteRequestsStore {
type noOpDeleteRequestsStore struct{}
func (d *noOpDeleteRequestsStore) AddDeleteRequest(ctx context.Context, userID string, startTime, endTime model.Time, query string) error {
return nil
func (d *noOpDeleteRequestsStore) AddDeleteRequestGroup(ctx context.Context, req []DeleteRequest) ([]DeleteRequest, error) {
return nil, nil
}
func (d *noOpDeleteRequestsStore) GetDeleteRequestsByStatus(ctx context.Context, status DeleteRequestStatus) ([]DeleteRequest, error) {
@ -24,15 +22,15 @@ func (d *noOpDeleteRequestsStore) GetAllDeleteRequestsForUser(ctx context.Contex
return nil, nil
}
func (d *noOpDeleteRequestsStore) UpdateStatus(ctx context.Context, userID, requestID string, newStatus DeleteRequestStatus) error {
func (d *noOpDeleteRequestsStore) UpdateStatus(ctx context.Context, req DeleteRequest, newStatus DeleteRequestStatus) error {
return nil
}
func (d *noOpDeleteRequestsStore) GetDeleteRequest(ctx context.Context, userID, requestID string) (*DeleteRequest, error) {
func (d *noOpDeleteRequestsStore) GetDeleteRequestGroup(ctx context.Context, userID, requestID string) ([]DeleteRequest, error) {
return nil, nil
}
func (d *noOpDeleteRequestsStore) RemoveDeleteRequest(ctx context.Context, userID, requestID string, createdAt, startTime, endTime model.Time) error {
func (d *noOpDeleteRequestsStore) RemoveDeleteRequests(ctx context.Context, reqs []DeleteRequest) error {
return nil
}

@ -4,8 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
"net/url"
"sort"
"time"
"github.com/grafana/loki/pkg/util"
@ -21,17 +23,17 @@ import (
// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
deleteRequestCancelPeriod time.Duration
deleteRequestsStore DeleteRequestsStore
metrics *deleteRequestHandlerMetrics
maxInterval time.Duration
}
// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, maxInterval time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteRequestsStore: deleteStore,
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
metrics: newDeleteRequestHandlerMetrics(registerer),
deleteRequestsStore: deleteStore,
maxInterval: maxInterval,
metrics: newDeleteRequestHandlerMetrics(registerer),
}
return &deleteMgr
@ -65,7 +67,14 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
return
}
if err := dm.deleteRequestsStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), query); err != nil {
interval, err := dm.interval(params, startTime, endTime)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
deleteRequests := shardDeleteRequestsByInterval(startTime, endTime, query, userID, interval)
if _, err := dm.deleteRequestsStore.AddDeleteRequestGroup(ctx, deleteRequests); err != nil {
level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -81,6 +90,58 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
w.WriteHeader(http.StatusNoContent)
}
func shardDeleteRequestsByInterval(startTime, endTime model.Time, query, userID string, interval time.Duration) []DeleteRequest {
deleteRequests := make([]DeleteRequest, 0, endTime.Sub(startTime)/interval)
for start := startTime; start.Before(endTime); start = start.Add(interval) + 1 {
end := start.Add(interval)
if end.After(endTime) {
end = endTime
}
deleteRequests = append(deleteRequests,
DeleteRequest{
StartTime: start,
EndTime: end,
Query: query,
UserID: userID,
})
}
return deleteRequests
}
func (dm *DeleteRequestHandler) interval(params url.Values, startTime, endTime model.Time) (time.Duration, error) {
qr := params.Get("max_interval")
if qr == "" {
if dm.maxInterval == 0 {
return endTime.Sub(startTime), nil
}
return min(endTime.Sub(startTime), dm.maxInterval), nil
}
interval, err := time.ParseDuration(qr)
if err != nil || interval < time.Second {
return 0, errors.New("invalid max_interval: valid time units are 's', 'm', 'h'")
}
if interval > dm.maxInterval && dm.maxInterval != 0 {
return 0, fmt.Errorf("max_interval can't be greater than %s", dm.maxInterval.String())
}
if interval > endTime.Sub(startTime) {
return 0, fmt.Errorf("max_interval can't be greater than the interval to be deleted (%s)", endTime.Sub(startTime))
}
return interval, nil
}
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
// GetAllDeleteRequestsHandler handles get all delete requests
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -90,19 +151,77 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
return
}
deleteRequests, err := dm.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID)
deleteGroups, err := dm.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
deletesPerRequest := partitionByRequestID(deleteGroups)
deleteRequests := mergeDeletes(deletesPerRequest)
sort.Slice(deleteRequests, func(i, j int) bool {
return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt
})
if err := json.NewEncoder(w).Encode(deleteRequests); err != nil {
level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err)
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}
}
func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest {
mergedRequests := []DeleteRequest{} // Declare this way so the return value is [] rather than null
for _, deletes := range groups {
startTime, endTime, status := mergeData(deletes)
newDelete := deletes[0]
newDelete.StartTime = startTime
newDelete.EndTime = endTime
newDelete.Status = status
mergedRequests = append(mergedRequests, newDelete)
}
return mergedRequests
}
func mergeData(deletes []DeleteRequest) (model.Time, model.Time, DeleteRequestStatus) {
var (
startTime = model.Time(math.MaxInt64)
endTime = model.Time(0)
numProcessed = 0
)
for _, del := range deletes {
if del.StartTime < startTime {
startTime = del.StartTime
}
if del.EndTime > endTime {
endTime = del.EndTime
}
if del.Status == StatusProcessed {
numProcessed++
}
}
return startTime, endTime, deleteRequestStatus(numProcessed, len(deletes))
}
func deleteRequestStatus(processed, total int) DeleteRequestStatus {
if processed == 0 {
return StatusReceived
}
if processed == total {
return StatusProcessed
}
percentCompleted := float64(processed) / float64(total)
return DeleteRequestStatus(fmt.Sprintf("%d%% Complete", int(percentCompleted*100)))
}
// CancelDeleteRequestHandler handles delete request cancellation
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -114,25 +233,30 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
params := r.URL.Query()
requestID := params.Get("request_id")
deleteRequest, err := dm.deleteRequestsStore.GetDeleteRequest(ctx, userID, requestID)
deleteRequests, err := dm.deleteRequestsStore.GetDeleteRequestGroup(ctx, userID, requestID)
if err != nil {
if errors.Is(err, ErrDeleteRequestNotFound) {
http.Error(w, "could not find delete request with given id", http.StatusNotFound)
return
}
level.Error(util_log.Logger).Log("msg", "error getting delete request from the store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if deleteRequest == nil {
http.Error(w, "could not find delete request with given id", http.StatusBadRequest)
toDelete := filterProcessed(deleteRequests)
if len(toDelete) == 0 {
http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest)
return
}
if deleteRequest.Status != StatusReceived {
http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest)
if len(toDelete) != len(deleteRequests) && params.Get("force") != "true" {
http.Error(w, "Unable to cancel partially completed delete request. To force, use the ?force query parameter", http.StatusBadRequest)
return
}
if err := dm.deleteRequestsStore.RemoveDeleteRequest(ctx, userID, requestID, deleteRequest.CreatedAt, deleteRequest.StartTime, deleteRequest.EndTime); err != nil {
if err := dm.deleteRequestsStore.RemoveDeleteRequests(ctx, toDelete); err != nil {
level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -141,6 +265,16 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
w.WriteHeader(http.StatusNoContent)
}
func filterProcessed(reqs []DeleteRequest) []DeleteRequest {
var unprocessed []DeleteRequest
for _, r := range reqs {
if r.Status == StatusReceived {
unprocessed = append(unprocessed, r)
}
}
return unprocessed
}
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -176,7 +310,7 @@ func query(params url.Values) (string, error) {
return query, nil
}
func startTime(params url.Values) (int64, error) {
func startTime(params url.Values) (model.Time, error) {
startParam := params.Get("start")
if startParam == "" {
return 0, errors.New("start time not set")
@ -187,10 +321,10 @@ func startTime(params url.Values) (int64, error) {
return 0, errors.New("invalid start time: require unix seconds or RFC3339 format")
}
return st, nil
return model.Time(st), nil
}
func endTime(params url.Values, startTime int64) (int64, error) {
func endTime(params url.Values, startTime model.Time) (model.Time, error) {
endParam := params.Get("end")
endTime, err := parseTime(endParam)
if err != nil {
@ -201,11 +335,11 @@ func endTime(params url.Values, startTime int64) (int64, error) {
return 0, errors.New("deletes in the future are not allowed")
}
if startTime > endTime {
if int64(startTime) > endTime {
return 0, errors.New("start time can't be greater than end time")
}
return endTime, nil
return model.Time(endTime), nil
}
func parseTime(in string) (int64, error) {

@ -2,6 +2,7 @@ package deletion
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
@ -22,7 +23,7 @@ import (
func TestAddDeleteRequestHandler(t *testing.T) {
t.Run("it adds the delete request to the store", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001")
@ -31,15 +32,72 @@ func TestAddDeleteRequestHandler(t *testing.T) {
require.Equal(t, w.Code, http.StatusNoContent)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("0000000000"), store.addedStartTime)
require.Equal(t, toTime("0000000001"), store.addedEndTime)
require.Equal(t, "org-id", store.addReqs[0].UserID)
require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query)
require.Equal(t, toTime("0000000000"), store.addReqs[0].StartTime)
require.Equal(t, toTime("0000000001"), store.addReqs[0].EndTime)
})
t.Run("it shards deletes based on a query param", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, 0, nil)
from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix())
to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix())
req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to))
params := req.URL.Query()
params.Set("max_interval", "1h")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusNoContent)
require.Len(t, store.addReqs, 3)
for i, req := range store.addReqs {
startTime := from.Add(time.Duration(i)*time.Hour) + model.Time(i)
endTime := from.Add(time.Duration(i+1)*time.Hour) + model.Time(i)
if endTime.After(to) {
endTime = to
}
require.Equal(t, startTime, req.StartTime)
require.Equal(t, endTime, req.EndTime)
}
})
t.Run("it uses the default for sharding when the query param isn't present", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Hour, nil)
from := model.TimeFromUnix(model.Now().Add(-3 * time.Hour).Unix())
to := model.TimeFromUnix(from.Add(3 * time.Hour).Unix())
req := buildRequest("org-id", `{foo="bar"}`, unixString(from), unixString(to))
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusNoContent)
require.Len(t, store.addReqs, 3)
for i, req := range store.addReqs {
startTime := from.Add(time.Duration(i)*time.Hour) + model.Time(i)
endTime := from.Add(time.Duration(i+1)*time.Hour) + model.Time(i)
if endTime.After(to) {
endTime = to
}
require.Equal(t, startTime, req.StartTime)
require.Equal(t, endTime, req.EndTime)
}
})
t.Run("it works with RFC3339", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", `{foo="bar"}`, "2006-01-02T15:04:05Z", "2006-01-03T15:04:05Z")
@ -48,15 +106,15 @@ func TestAddDeleteRequestHandler(t *testing.T) {
require.Equal(t, w.Code, http.StatusNoContent)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("1136214245"), store.addedStartTime)
require.Equal(t, toTime("1136300645"), store.addedEndTime)
require.Equal(t, "org-id", store.addReqs[0].UserID)
require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query)
require.Equal(t, toTime("1136214245"), store.addReqs[0].StartTime)
require.Equal(t, toTime("1136300645"), store.addReqs[0].EndTime)
})
t.Run("it fills in end time if blank", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
h := NewDeleteRequestHandler(store, time.Second, nil)
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "")
@ -65,15 +123,15 @@ func TestAddDeleteRequestHandler(t *testing.T) {
require.Equal(t, w.Code, http.StatusNoContent)
require.Equal(t, "org-id", store.addedUser)
require.Equal(t, `{foo="bar"}`, store.addedQuery)
require.Equal(t, toTime("0000000000"), store.addedStartTime)
require.InDelta(t, int64(model.Now()), int64(store.addedEndTime), 1000)
require.Equal(t, "org-id", store.addReqs[0].UserID)
require.Equal(t, `{foo="bar"}`, store.addReqs[0].Query)
require.Equal(t, toTime("0000000000"), store.addReqs[0].StartTime)
require.InDelta(t, int64(model.Now()), int64(store.addReqs[0].EndTime), 1000)
})
t.Run("it returns 500 when the delete store errors", func(t *testing.T) {
store := &mockDeleteRequestsStore{addErr: errors.New("something bad")}
h := NewDeleteRequestHandler(store, time.Second, nil)
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", `{foo="bar"}`, "0000000000", "0000000001")
@ -83,23 +141,31 @@ func TestAddDeleteRequestHandler(t *testing.T) {
})
t.Run("Validation", func(t *testing.T) {
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Second, nil)
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, time.Minute, nil)
for _, tc := range []struct {
orgID, query, startTime, endTime, error string
orgID, query, startTime, endTime, interval, error string
}{
{"", `{foo="bar"}`, "0000000000", "0000000001", "no org id\n"},
{"org-id", "", "0000000000", "0000000001", "query not set\n"},
{"org-id", `not a query`, "0000000000", "0000000001", "invalid query expression\n"},
{"org-id", `{foo="bar"}`, "", "0000000001", "start time not set\n"},
{"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "invalid start time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "invalid end time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "deletes in the future are not allowed\n"},
{"org-id", `{foo="bar"}`, "0000000001", "0000000000", "start time can't be greater than end time\n"},
{"", `{foo="bar"}`, "0000000000", "0000000001", "", "no org id\n"},
{"org-id", "", "0000000000", "0000000001", "", "query not set\n"},
{"org-id", `not a query`, "0000000000", "0000000001", "", "invalid query expression\n"},
{"org-id", `{foo="bar"}`, "", "0000000001", "", "start time not set\n"},
{"org-id", `{foo="bar"}`, "0000000000000", "0000000001", "", "invalid start time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000000001", "", "invalid end time: require unix seconds or RFC3339 format\n"},
{"org-id", `{foo="bar"}`, "0000000000", fmt.Sprint(time.Now().Add(time.Hour).Unix())[:10], "", "deletes in the future are not allowed\n"},
{"org-id", `{foo="bar"}`, "0000000001", "0000000000", "", "start time can't be greater than end time\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000001", "not-a-duration", "invalid max_interval: valid time units are 's', 'm', 'h'\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1ms", "invalid max_interval: valid time units are 's', 'm', 'h'\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000001", "1h", "max_interval can't be greater than 1m0s\n"},
{"org-id", `{foo="bar"}`, "0000000000", "0000000001", "30s", "max_interval can't be greater than the interval to be deleted (1s)\n"},
} {
t.Run(strings.TrimSpace(tc.error), func(t *testing.T) {
req := buildRequest(tc.orgID, tc.query, tc.startTime, tc.endTime)
params := req.URL.Query()
params.Set("max_interval", tc.interval)
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.AddDeleteRequestHandler(w, req)
@ -110,6 +176,255 @@ func TestAddDeleteRequestHandler(t *testing.T) {
})
}
func TestCancelDeleteRequestHandler(t *testing.T) {
t.Run("it removes unprocessed delete requests from the store when force is true", func(t *testing.T) {
stored := []DeleteRequest{
{RequestID: "test-request", UserID: "org-id", Query: "test-query", SequenceNum: 0, Status: StatusProcessed},
{RequestID: "test-request", UserID: "org-id", Query: "test-query", SequenceNum: 1, Status: StatusReceived},
}
store := &mockDeleteRequestsStore{}
store.getResult = stored
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
params.Set("force", "true")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusNoContent)
require.Equal(t, store.getUser, "org-id")
require.Equal(t, store.getID, "test-request")
require.Equal(t, stored[1], store.removeReqs[0])
})
t.Run("it returns an error when parts of the query have started to be processed", func(t *testing.T) {
stored := []DeleteRequest{
{RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed},
{RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived},
{RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed},
}
store := &mockDeleteRequestsStore{}
store.getResult = stored
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
params.Set("force", "false")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusBadRequest)
require.Equal(t, "Unable to cancel partially completed delete request. To force, use the ?force query parameter\n", w.Body.String())
})
t.Run("error getting from store", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
store.getErr = errors.New("something bad")
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusInternalServerError)
require.Equal(t, "something bad\n", w.Body.String())
})
t.Run("error removing from the store", func(t *testing.T) {
stored := []DeleteRequest{{RequestID: "test-request", UserID: "org-id", Query: "test-query", Status: StatusReceived}}
store := &mockDeleteRequestsStore{}
store.getResult = stored
store.removeErr = errors.New("something bad")
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusInternalServerError)
require.Equal(t, "something bad\n", w.Body.String())
})
t.Run("Validation", func(t *testing.T) {
t.Run("no org id", func(t *testing.T) {
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, 0, nil)
req := buildRequest("", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusBadRequest)
require.Equal(t, "no org id\n", w.Body.String())
})
t.Run("request not found", func(t *testing.T) {
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{getErr: ErrDeleteRequestNotFound}, 0, nil)
req := buildRequest("org-id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusNotFound)
require.Equal(t, "could not find delete request with given id\n", w.Body.String())
})
t.Run("all requests in group are already processed", func(t *testing.T) {
stored := []DeleteRequest{{RequestID: "test-request", UserID: "org-id", Query: "test-query", Status: StatusProcessed}}
store := &mockDeleteRequestsStore{}
store.getResult = stored
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.CancelDeleteRequestHandler(w, req)
require.Equal(t, w.Code, http.StatusBadRequest)
require.Equal(t, "deletion of request which is in process or already processed is not allowed\n", w.Body.String())
})
})
}
func TestGetAllDeleteRequestsHandler(t *testing.T) {
t.Run("it gets all the delete requests for the user", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
store.getAllResult = []DeleteRequest{{RequestID: "test-request-1", Status: StatusReceived}, {RequestID: "test-request-2", Status: StatusReceived}}
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
w := httptest.NewRecorder()
h.GetAllDeleteRequestsHandler(w, req)
require.Equal(t, w.Code, http.StatusOK)
require.Equal(t, store.getAllUser, "org-id")
var result []DeleteRequest
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result))
require.ElementsMatch(t, store.getAllResult, result)
})
t.Run("it merges requests with the same requestID", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
store.getAllResult = []DeleteRequest{
{RequestID: "test-request-1", CreatedAt: now, StartTime: now, EndTime: now.Add(time.Hour)},
{RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(2 * time.Hour), EndTime: now.Add(3 * time.Hour)},
{RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)},
{RequestID: "test-request-1", CreatedAt: now, StartTime: now.Add(time.Hour), EndTime: now.Add(2 * time.Hour)},
}
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
w := httptest.NewRecorder()
h.GetAllDeleteRequestsHandler(w, req)
require.Equal(t, w.Code, http.StatusOK)
var result []DeleteRequest
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result))
require.Len(t, result, 2)
require.Equal(t, []DeleteRequest{
{RequestID: "test-request-1", Status: StatusReceived, CreatedAt: now, StartTime: now, EndTime: now.Add(3 * time.Hour)},
{RequestID: "test-request-2", Status: StatusReceived, CreatedAt: now.Add(time.Minute), StartTime: now.Add(30 * time.Minute), EndTime: now.Add(90 * time.Minute)},
}, result)
})
t.Run("it only considers a request processed if all it's subqueries are processed", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
store.getAllResult = []DeleteRequest{
{RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed},
{RequestID: "test-request-1", CreatedAt: now, Status: StatusReceived},
{RequestID: "test-request-1", CreatedAt: now, Status: StatusProcessed},
{RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed},
{RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed},
{RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed},
{RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived},
}
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org-id", ``, "", "")
w := httptest.NewRecorder()
h.GetAllDeleteRequestsHandler(w, req)
require.Equal(t, w.Code, http.StatusOK)
var result []DeleteRequest
require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result))
require.Len(t, result, 3)
require.Equal(t, []DeleteRequest{
{RequestID: "test-request-1", CreatedAt: now, Status: "66% Complete"},
{RequestID: "test-request-2", CreatedAt: now.Add(time.Minute), Status: StatusProcessed},
{RequestID: "test-request-3", CreatedAt: now.Add(2 * time.Minute), Status: StatusReceived},
}, result)
})
t.Run("error getting from store", func(t *testing.T) {
store := &mockDeleteRequestsStore{}
store.getAllErr = errors.New("something bad")
h := NewDeleteRequestHandler(store, 0, nil)
req := buildRequest("org id", ``, "", "")
params := req.URL.Query()
params.Set("request_id", "test-request")
req.URL.RawQuery = params.Encode()
w := httptest.NewRecorder()
h.GetAllDeleteRequestsHandler(w, req)
require.Equal(t, w.Code, http.StatusInternalServerError)
require.Equal(t, "something bad\n", w.Body.String())
})
t.Run("validation", func(t *testing.T) {
t.Run("no org id", func(t *testing.T) {
h := NewDeleteRequestHandler(&mockDeleteRequestsStore{}, 0, nil)
req := buildRequest("", ``, "", "")
w := httptest.NewRecorder()
h.GetAllDeleteRequestsHandler(w, req)
require.Equal(t, w.Code, http.StatusBadRequest)
require.Equal(t, "no org id\n", w.Body.String())
})
})
}
func buildRequest(orgID, query, start, end string) *http.Request {
var req *http.Request
if orgID == "" {
@ -123,11 +438,16 @@ func buildRequest(orgID, query, start, end string) *http.Request {
q.Set("query", query)
q.Set("start", start)
q.Set("end", end)
req.URL.RawQuery = q.Encode()
return req
}
func unixString(t model.Time) string {
return fmt.Sprint(t.Unix())
}
func toTime(t string) model.Time {
modelTime, _ := util.ParseTime(t)
return model.Time(modelTime)

@ -35,3 +35,11 @@ func deleteModeFromLimits(l Limits, userID string) (deletionmode.Mode, error) {
mode := l.DeletionMode(userID)
return deletionmode.ParseMode(mode)
}
func partitionByRequestID(reqs []DeleteRequest) map[string][]DeleteRequest {
groups := make(map[string][]DeleteRequest)
for _, req := range reqs {
groups[req.RequestID] = append(groups[req.RequestID], req)
}
return groups
}

Loading…
Cancel
Save