Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/compactor/deletion/delete_request.go

224 lines
6.0 KiB

package deletion
import (
"strings"
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/compactor/deletion/deletionproto"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util/filter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
type timeInterval struct {
start, end time.Time
}
type deleteRequest struct {
deletionproto.DeleteRequest
matchers []*labels.Matcher `json:"-"`
logSelectorExpr syntax.LogSelectorExpr `json:"-"`
timeInterval *timeInterval `json:"-"`
TotalLinesDeletedMetric *prometheus.CounterVec `json:"-"`
DeletedLines atomic.Int32 `json:"-"`
}
func newDeleteRequest(protoReq deletionproto.DeleteRequest, totalLinesDeletedMetric *prometheus.CounterVec) (*deleteRequest, error) {
d := deleteRequest{
DeleteRequest: protoReq,
TotalLinesDeletedMetric: totalLinesDeletedMetric,
}
if err := d.SetQuery(d.Query); err != nil {
return nil, err
}
// init d.timeInterval used to efficiently check log ts is within the bounds of delete request below in filter func
// without having to do conversion of timestamps for each log line we check.
d.timeInterval = &timeInterval{
start: d.StartTime.Time(),
end: d.EndTime.Time(),
}
return &d, nil
}
func (d *deleteRequest) SetQuery(logQL string) error {
d.Query = logQL
logSelectorExpr, err := parseDeletionQuery(logQL)
if err != nil {
return err
}
d.logSelectorExpr = logSelectorExpr
d.matchers = logSelectorExpr.Matchers()
return nil
}
// FilterFunction returns a filter function that returns true if the given line should be deleted based on the DeleteRequest
func (d *deleteRequest) FilterFunction(lbls labels.Labels) (filter.Func, error) {
if !allMatch(d.matchers, lbls) {
return func(_ time.Time, _ string, _ labels.Labels) bool {
return false
}, nil
}
// if delete request doesn't have a line filter, just do time based filtering
if !d.logSelectorExpr.HasFilter() {
return func(ts time.Time, _ string, _ labels.Labels) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false
}
return true
}, nil
}
p, err := d.logSelectorExpr.Pipeline()
if err != nil {
return nil, err
}
f := p.ForStream(lbls).ProcessString
return func(ts time.Time, s string, structuredMetadata labels.Labels) bool {
if ts.Before(d.timeInterval.start) || ts.After(d.timeInterval.end) {
return false
}
result, _, skip := f(0, s, structuredMetadata)
if len(result) != 0 || skip {
d.TotalLinesDeletedMetric.WithLabelValues(d.UserID).Inc()
d.DeletedLines.Add(1)
return true
}
return false
}, nil
}
func allMatch(matchers []*labels.Matcher, labels labels.Labels) bool {
for _, m := range matchers {
if !m.Matches(labels.Get(m.Name)) {
return false
}
}
return true
}
// IsDeleted checks if the given chunk entry would have data requested for deletion.
func (d *deleteRequest) IsDeleted(userID []byte, lbls labels.Labels, chunk retention.Chunk) bool {
if d.UserID != unsafeGetString(userID) {
return false
}
if !intervalsOverlap(model.Interval{
Start: chunk.From,
End: chunk.Through,
}, model.Interval{
Start: d.StartTime,
End: d.EndTime,
}) {
return false
}
if d.logSelectorExpr == nil {
err := d.SetQuery(d.Query)
if err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to init log selector expr",
"delete_request_id", d.RequestID,
"user", d.UserID,
"err", err,
)
return false
}
}
if !labels.Selector(d.matchers).Matches(lbls) {
return false
}
return true
}
// GetChunkFilter tells whether the chunk is covered by the DeleteRequest and
// optionally returns a filter.Func if the chunk is supposed to be deleted partially or the delete request has line filters.
func (d *deleteRequest) GetChunkFilter(userID []byte, lbls labels.Labels, chunk retention.Chunk) (bool, filter.Func) {
if !d.IsDeleted(userID, lbls, chunk) {
return false, nil
}
if d.StartTime <= chunk.From && d.EndTime >= chunk.Through && !d.logSelectorExpr.HasFilter() {
// Delete request covers the whole chunk and there are no line filters in the logSelectorExpr so the whole chunk will be deleted
return true, nil
}
ff, err := d.FilterFunction(lbls)
if err != nil {
// The query in the delete request is checked when added to the table.
// So this error should not occur.
level.Error(util_log.Logger).Log(
"msg", "unexpected error getting filter function",
"delete_request_id", d.RequestID,
"user", d.UserID,
"err", err,
)
return false, nil
}
return true, ff
}
func (d *deleteRequest) IsDuplicate(o *deleteRequest) (bool, error) {
// we would never have duplicates from same request
if d.RequestID == o.RequestID {
return false, nil
}
if d.UserID != o.UserID || d.StartTime != o.StartTime || d.EndTime != o.EndTime {
return false, nil
}
if d.logSelectorExpr == nil {
if err := d.SetQuery(d.Query); err != nil {
return false, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", d.RequestID, d.UserID)
}
}
if o.logSelectorExpr == nil {
if err := o.SetQuery(o.Query); err != nil {
return false, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", o.RequestID, o.UserID)
}
}
if d.logSelectorExpr.String() != o.logSelectorExpr.String() {
return false, nil
}
return true, nil
}
func intervalsOverlap(interval1, interval2 model.Interval) bool {
if interval1.Start > interval2.End || interval2.Start > interval1.End {
return false
}
return true
}
// GetMatchers returns the string representation of the matchers
func (d *deleteRequest) GetMatchers() string {
if len(d.matchers) == 0 {
return ""
}
var result []string
for _, m := range d.matchers {
result = append(result, m.String())
}
return strings.Join(result, ",")
}