mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
383 lines
11 KiB
383 lines
11 KiB
|
5 years ago
|
package deletion
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/json"
|
||
|
3 years ago
|
"errors"
|
||
|
4 years ago
|
"fmt"
|
||
|
3 years ago
|
"math"
|
||
|
5 years ago
|
"net/http"
|
||
|
3 years ago
|
"net/url"
|
||
|
3 years ago
|
"sort"
|
||
|
3 years ago
|
"time"
|
||
|
|
|
||
|
|
"github.com/grafana/loki/pkg/util"
|
||
|
5 years ago
|
|
||
|
4 years ago
|
"github.com/go-kit/log/level"
|
||
|
5 years ago
|
"github.com/prometheus/client_golang/prometheus"
|
||
|
|
"github.com/prometheus/common/model"
|
||
|
|
|
||
|
4 years ago
|
"github.com/grafana/dskit/tenant"
|
||
|
|
|
||
|
4 years ago
|
util_log "github.com/grafana/loki/pkg/util/log"
|
||
|
5 years ago
|
)
|
||
|
|
|
||
|
|
// DeleteRequestHandler provides handlers for delete requests
|
||
|
|
type DeleteRequestHandler struct {
|
||
|
3 years ago
|
deleteRequestsStore DeleteRequestsStore
|
||
|
|
metrics *deleteRequestHandlerMetrics
|
||
|
|
maxInterval time.Duration
|
||
|
5 years ago
|
}
|
||
|
|
|
||
|
|
// NewDeleteRequestHandler creates a DeleteRequestHandler
|
||
|
3 years ago
|
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, maxInterval time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
|
||
|
5 years ago
|
deleteMgr := DeleteRequestHandler{
|
||
|
3 years ago
|
deleteRequestsStore: deleteStore,
|
||
|
|
maxInterval: maxInterval,
|
||
|
|
metrics: newDeleteRequestHandlerMetrics(registerer),
|
||
|
5 years ago
|
}
|
||
|
|
|
||
|
|
return &deleteMgr
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
// AddDeleteRequestHandler handles addition of a new delete request
|
||
|
3 years ago
|
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
|
||
|
5 years ago
|
ctx := r.Context()
|
||
|
|
userID, err := tenant.TenantID(ctx)
|
||
|
|
if err != nil {
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
params := r.URL.Query()
|
||
|
3 years ago
|
query, err := query(params)
|
||
|
|
if err != nil {
|
||
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
startTime, err := startTime(params)
|
||
|
4 years ago
|
if err != nil {
|
||
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
|
return
|
||
|
5 years ago
|
}
|
||
|
|
|
||
|
3 years ago
|
endTime, err := endTime(params, startTime)
|
||
|
|
if err != nil {
|
||
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
interval, err := dm.interval(params, startTime, endTime)
|
||
|
|
if err != nil {
|
||
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
deleteRequests := shardDeleteRequestsByInterval(startTime, endTime, query, userID, interval)
|
||
|
3 years ago
|
createdDeleteRequests, err := dm.deleteRequestsStore.AddDeleteRequestGroup(ctx, deleteRequests)
|
||
|
|
if err != nil {
|
||
|
5 years ago
|
level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err)
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
if len(createdDeleteRequests) == 0 {
|
||
|
|
level.Error(util_log.Logger).Log("msg", "zero delete requests created", "user", userID, "query", query)
|
||
|
|
http.Error(w, "Zero delete requests were created due to an internal error. Please contact support.", http.StatusInternalServerError)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
level.Info(util_log.Logger).Log(
|
||
|
|
"msg", "delete request for user added",
|
||
|
3 years ago
|
"delete_request_id", createdDeleteRequests[0].RequestID,
|
||
|
3 years ago
|
"user", userID,
|
||
|
|
"query", query,
|
||
|
3 years ago
|
"interval", interval.String(),
|
||
|
3 years ago
|
)
|
||
|
|
|
||
|
5 years ago
|
dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc()
|
||
|
|
w.WriteHeader(http.StatusNoContent)
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
func shardDeleteRequestsByInterval(startTime, endTime model.Time, query, userID string, interval time.Duration) []DeleteRequest {
|
||
|
|
deleteRequests := make([]DeleteRequest, 0, endTime.Sub(startTime)/interval)
|
||
|
|
for start := startTime; start.Before(endTime); start = start.Add(interval) + 1 {
|
||
|
|
end := start.Add(interval)
|
||
|
|
if end.After(endTime) {
|
||
|
|
end = endTime
|
||
|
|
}
|
||
|
|
|
||
|
|
deleteRequests = append(deleteRequests,
|
||
|
|
DeleteRequest{
|
||
|
|
StartTime: start,
|
||
|
|
EndTime: end,
|
||
|
|
Query: query,
|
||
|
|
UserID: userID,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
return deleteRequests
|
||
|
|
}
|
||
|
|
|
||
|
|
func (dm *DeleteRequestHandler) interval(params url.Values, startTime, endTime model.Time) (time.Duration, error) {
|
||
|
|
qr := params.Get("max_interval")
|
||
|
|
if qr == "" {
|
||
|
3 years ago
|
return dm.intervalFromStartAndEnd(startTime, endTime)
|
||
|
3 years ago
|
}
|
||
|
|
|
||
|
|
interval, err := time.ParseDuration(qr)
|
||
|
|
if err != nil || interval < time.Second {
|
||
|
|
return 0, errors.New("invalid max_interval: valid time units are 's', 'm', 'h'")
|
||
|
|
}
|
||
|
|
|
||
|
|
if interval > dm.maxInterval && dm.maxInterval != 0 {
|
||
|
|
return 0, fmt.Errorf("max_interval can't be greater than %s", dm.maxInterval.String())
|
||
|
|
}
|
||
|
|
|
||
|
|
if interval > endTime.Sub(startTime) {
|
||
|
|
return 0, fmt.Errorf("max_interval can't be greater than the interval to be deleted (%s)", endTime.Sub(startTime))
|
||
|
|
}
|
||
|
|
|
||
|
|
return interval, nil
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
func (dm *DeleteRequestHandler) intervalFromStartAndEnd(startTime, endTime model.Time) (time.Duration, error) {
|
||
|
|
interval := endTime.Sub(startTime)
|
||
|
|
if interval < time.Second {
|
||
|
|
return 0, errors.New("difference between start time and end time must be at least one second")
|
||
|
|
}
|
||
|
|
|
||
|
|
if dm.maxInterval == 0 {
|
||
|
|
return interval, nil
|
||
|
|
}
|
||
|
|
return min(interval, dm.maxInterval), nil
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
func min(a, b time.Duration) time.Duration {
|
||
|
|
if a < b {
|
||
|
|
return a
|
||
|
|
}
|
||
|
|
return b
|
||
|
|
}
|
||
|
|
|
||
|
5 years ago
|
// GetAllDeleteRequestsHandler handles get all delete requests
|
||
|
3 years ago
|
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
|
||
|
5 years ago
|
ctx := r.Context()
|
||
|
|
userID, err := tenant.TenantID(ctx)
|
||
|
|
if err != nil {
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
deleteGroups, err := dm.deleteRequestsStore.GetAllDeleteRequestsForUser(ctx, userID)
|
||
|
5 years ago
|
if err != nil {
|
||
|
|
level.Error(util_log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
deletesPerRequest := partitionByRequestID(deleteGroups)
|
||
|
|
deleteRequests := mergeDeletes(deletesPerRequest)
|
||
|
|
|
||
|
|
sort.Slice(deleteRequests, func(i, j int) bool {
|
||
|
|
return deleteRequests[i].CreatedAt < deleteRequests[j].CreatedAt
|
||
|
|
})
|
||
|
|
|
||
|
2 years ago
|
w.Header().Set("Content-Type", "application/json")
|
||
|
5 years ago
|
if err := json.NewEncoder(w).Encode(deleteRequests); err != nil {
|
||
|
|
level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err)
|
||
|
4 years ago
|
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
|
||
|
5 years ago
|
}
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
func mergeDeletes(groups map[string][]DeleteRequest) []DeleteRequest {
|
||
|
|
mergedRequests := []DeleteRequest{} // Declare this way so the return value is [] rather than null
|
||
|
|
for _, deletes := range groups {
|
||
|
|
startTime, endTime, status := mergeData(deletes)
|
||
|
|
newDelete := deletes[0]
|
||
|
|
newDelete.StartTime = startTime
|
||
|
|
newDelete.EndTime = endTime
|
||
|
|
newDelete.Status = status
|
||
|
|
|
||
|
|
mergedRequests = append(mergedRequests, newDelete)
|
||
|
|
}
|
||
|
|
return mergedRequests
|
||
|
|
}
|
||
|
|
|
||
|
|
func mergeData(deletes []DeleteRequest) (model.Time, model.Time, DeleteRequestStatus) {
|
||
|
|
var (
|
||
|
|
startTime = model.Time(math.MaxInt64)
|
||
|
|
endTime = model.Time(0)
|
||
|
|
numProcessed = 0
|
||
|
|
)
|
||
|
|
|
||
|
|
for _, del := range deletes {
|
||
|
|
if del.StartTime < startTime {
|
||
|
|
startTime = del.StartTime
|
||
|
|
}
|
||
|
|
|
||
|
|
if del.EndTime > endTime {
|
||
|
|
endTime = del.EndTime
|
||
|
|
}
|
||
|
|
|
||
|
|
if del.Status == StatusProcessed {
|
||
|
|
numProcessed++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return startTime, endTime, deleteRequestStatus(numProcessed, len(deletes))
|
||
|
|
}
|
||
|
|
|
||
|
|
func deleteRequestStatus(processed, total int) DeleteRequestStatus {
|
||
|
|
if processed == 0 {
|
||
|
|
return StatusReceived
|
||
|
|
}
|
||
|
|
|
||
|
|
if processed == total {
|
||
|
|
return StatusProcessed
|
||
|
|
}
|
||
|
|
|
||
|
|
percentCompleted := float64(processed) / float64(total)
|
||
|
|
return DeleteRequestStatus(fmt.Sprintf("%d%% Complete", int(percentCompleted*100)))
|
||
|
|
}
|
||
|
|
|
||
|
5 years ago
|
// CancelDeleteRequestHandler handles delete request cancellation
|
||
|
3 years ago
|
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
|
||
|
5 years ago
|
ctx := r.Context()
|
||
|
|
userID, err := tenant.TenantID(ctx)
|
||
|
|
if err != nil {
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
params := r.URL.Query()
|
||
|
|
requestID := params.Get("request_id")
|
||
|
3 years ago
|
deleteRequests, err := dm.deleteRequestsStore.GetDeleteRequestGroup(ctx, userID, requestID)
|
||
|
5 years ago
|
if err != nil {
|
||
|
3 years ago
|
if errors.Is(err, ErrDeleteRequestNotFound) {
|
||
|
|
http.Error(w, "could not find delete request with given id", http.StatusNotFound)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
5 years ago
|
level.Error(util_log.Logger).Log("msg", "error getting delete request from the store", "err", err)
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
toDelete := filterProcessed(deleteRequests)
|
||
|
|
if len(toDelete) == 0 {
|
||
|
|
http.Error(w, "deletion of request which is in process or already processed is not allowed", http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
if len(toDelete) != len(deleteRequests) && params.Get("force") != "true" {
|
||
|
|
http.Error(w, "Unable to cancel partially completed delete request. To force, use the ?force query parameter", http.StatusBadRequest)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
if err := dm.deleteRequestsStore.RemoveDeleteRequests(ctx, toDelete); err != nil {
|
||
|
5 years ago
|
level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err)
|
||
|
4 years ago
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
|
5 years ago
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
w.WriteHeader(http.StatusNoContent)
|
||
|
|
}
|
||
|
4 years ago
|
|
||
|
3 years ago
|
func filterProcessed(reqs []DeleteRequest) []DeleteRequest {
|
||
|
|
var unprocessed []DeleteRequest
|
||
|
|
for _, r := range reqs {
|
||
|
|
if r.Status == StatusReceived {
|
||
|
|
unprocessed = append(unprocessed, r)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return unprocessed
|
||
|
|
}
|
||
|
|
|
||
|
4 years ago
|
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
|
||
|
3 years ago
|
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
|
||
|
4 years ago
|
ctx := r.Context()
|
||
|
|
userID, err := tenant.TenantID(ctx)
|
||
|
|
if err != nil {
|
||
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
cacheGenNumber, err := dm.deleteRequestsStore.GetCacheGenerationNumber(ctx, userID)
|
||
|
|
if err != nil {
|
||
|
|
level.Error(util_log.Logger).Log("msg", "error getting cache generation number", "err", err)
|
||
|
|
http.Error(w, fmt.Sprintf("error getting cache generation number %v", err), http.StatusInternalServerError)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := json.NewEncoder(w).Encode(cacheGenNumber); err != nil {
|
||
|
|
level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err)
|
||
|
|
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
|
||
|
|
}
|
||
|
|
}
|
||
|
4 years ago
|
|
||
|
3 years ago
|
func query(params url.Values) (string, error) {
|
||
|
|
query := params.Get("query")
|
||
|
|
if len(query) == 0 {
|
||
|
|
return "", errors.New("query not set")
|
||
|
|
}
|
||
|
|
|
||
|
|
if _, err := parseDeletionQuery(query); err != nil {
|
||
|
|
return "", err
|
||
|
|
}
|
||
|
|
|
||
|
|
return query, nil
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
func startTime(params url.Values) (model.Time, error) {
|
||
|
3 years ago
|
startParam := params.Get("start")
|
||
|
|
if startParam == "" {
|
||
|
|
return 0, errors.New("start time not set")
|
||
|
|
}
|
||
|
|
|
||
|
|
st, err := parseTime(startParam)
|
||
|
|
if err != nil {
|
||
|
|
return 0, errors.New("invalid start time: require unix seconds or RFC3339 format")
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
return model.Time(st), nil
|
||
|
3 years ago
|
}
|
||
|
|
|
||
|
3 years ago
|
func endTime(params url.Values, startTime model.Time) (model.Time, error) {
|
||
|
3 years ago
|
endParam := params.Get("end")
|
||
|
|
endTime, err := parseTime(endParam)
|
||
|
|
if err != nil {
|
||
|
|
return 0, errors.New("invalid end time: require unix seconds or RFC3339 format")
|
||
|
|
}
|
||
|
|
|
||
|
|
if endTime > int64(model.Now()) {
|
||
|
|
return 0, errors.New("deletes in the future are not allowed")
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
if int64(startTime) > endTime {
|
||
|
3 years ago
|
return 0, errors.New("start time can't be greater than end time")
|
||
|
|
}
|
||
|
|
|
||
|
3 years ago
|
return model.Time(endTime), nil
|
||
|
3 years ago
|
}
|
||
|
|
|
||
|
|
func parseTime(in string) (int64, error) {
|
||
|
|
if in == "" {
|
||
|
|
return int64(model.Now()), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
t, err := time.Parse(time.RFC3339, in)
|
||
|
|
if err != nil {
|
||
|
|
return timeFromInt(in)
|
||
|
|
}
|
||
|
|
|
||
|
|
return t.UnixMilli(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func timeFromInt(in string) (int64, error) {
|
||
|
|
if len(in) != 10 {
|
||
|
|
return 0, errors.New("not unix seconds")
|
||
|
|
}
|
||
|
|
|
||
|
|
return util.ParseTime(in)
|
||
|
4 years ago
|
}
|