Querier/Ruler: query blocker (#7785)

Block malicious or expensive queries using a per-tenant runtime configuration.
pull/7791/head
Danny Kopping 3 years ago committed by GitHub
parent c6b55e7512
commit a63ad06509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 47
      docs/sources/operations/blocking-queries.md
  3. 5
      pkg/logcli/client/file.go
  4. 94
      pkg/logql/blocker.go
  5. 150
      pkg/logql/blocker_test.go
  6. 28
      pkg/logql/engine.go
  7. 12
      pkg/logql/limits.go
  8. 1
      pkg/logqlmodel/error.go
  9. 5
      pkg/querier/queryrange/roundtrip_test.go
  10. 2
      pkg/util/server/error.go
  11. 9
      pkg/util/validation/blocked_queries.go
  12. 7
      pkg/validation/limits.go
  13. 11
      pkg/validation/limits_test.go

@ -11,6 +11,7 @@
* [7602](https://github.com/grafana/loki/pull/7602) **vmax**: Add decolorize filter to easily parse colored logs.
* [7731](https://github.com/grafana/loki/pull/7731) **bitkill**: Add healthchecks to the docker-compose example.
* [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables.
* [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules.
##### Fixes

@ -0,0 +1,47 @@
---
title: Blocking Queries
weight: 60
---
# Blocking Queries
In certain situations, you may not be able to control the queries being sent to your Loki installation. These queries
may be intentionally or unintentionally expensive to run, and they may affect the overall stability or cost of running
your service.
You can block queries using [per-tenant overrides](../configuration/#runtime-configuration-file), like so:
```yaml
overrides:
"tenant-id":
blocked_queries:
# block this query exactly
- pattern: 'sum(rate({env="prod"}[1m]))'
# block any query matching this regex pattern
- pattern: '.*prod.*'
regex: true
# block all metric queries
- types: metric
# block any filter or limited queries matching this regex pattern
- pattern: '.*prod.*'
regex: true
types: filter,limited
```
The available query types are:
- `metric`: a query with an aggregation, e.g. `sum(rate({env="prod"}[1m]))`
- `filter`: a query with a log filter, e.g. `{env="prod"} |= "error"`
- `limited`: a query without a filter or a metric aggregation
**Note:** the order of patterns is preserved, so the first matching pattern will be used
## Observing blocked queries
Blocked queries are logged, as well as counted in the `loki_blocked_queries` metric on a per-tenant basis.
## Scope
Queries received via the API and executed as [alerting/recording rules](../rules/) will be blocked.

@ -18,6 +18,7 @@ import (
logqllog "github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
"github.com/grafana/loki/pkg/util/validation"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
@ -193,6 +194,10 @@ func (l *limiter) QueryTimeout(userID string) time.Duration {
return time.Minute * 5
}
func (l *limiter) BlockedQueries(userID string) []*validation.BlockedQuery {
return []*validation.BlockedQuery{}
}
type querier struct {
r io.Reader
labels labels.Labels

@ -0,0 +1,94 @@
package logql
import (
"context"
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/regexp"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
)
type queryBlocker struct {
ctx context.Context
q *query
logger log.Logger
}
func newQueryBlocker(ctx context.Context, q *query) *queryBlocker {
return &queryBlocker{
ctx: ctx,
q: q,
logger: logutil.WithContext(ctx, q.logger),
}
}
func (qb *queryBlocker) isBlocked(tenant string) bool {
patterns := qb.q.limits.BlockedQueries(tenant)
if len(patterns) <= 0 {
return false
}
typ, err := QueryType(qb.q.params.Query())
if err != nil {
typ = "unknown"
}
logger := log.With(qb.logger, "user", tenant, "type", typ)
query := qb.q.params.Query()
for _, p := range patterns {
// if no pattern is given, assume we want to match all queries
if p.Pattern == "" {
p.Pattern = ".*"
p.Regex = true
}
if strings.TrimSpace(p.Pattern) == strings.TrimSpace(query) {
level.Warn(logger).Log("msg", "query blocker matched with exact match policy", "query", query)
return qb.block(p, typ, logger)
}
if p.Regex {
r, err := regexp.Compile(p.Pattern)
if err != nil {
level.Error(logger).Log("msg", "query blocker regex does not compile", "pattern", p.Pattern, "err", err)
continue
}
if r.MatchString(query) {
level.Warn(logger).Log("msg", "query blocker matched with regex policy", "pattern", p.Pattern, "query", query)
return qb.block(p, typ, logger)
}
}
}
return false
}
func (qb *queryBlocker) block(q *validation.BlockedQuery, typ string, logger log.Logger) bool {
// no specific types to validate against, so query is blocked
if len(q.Types) == 0 {
return true
}
matched := false
for _, qt := range q.Types {
if qt == typ {
matched = true
break
}
}
// query would be blocked, but it didn't match specified types
if !matched {
level.Debug(logger).Log("msg", "query blocker matched pattern, but not specified types", "pattern", q.Pattern, "types", q.Types.String(), "queryType", typ)
return false
}
return true
}

@ -0,0 +1,150 @@
package logql
import (
"context"
"fmt"
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util/validation"
)
func TestEngine_ExecWithBlockedQueries(t *testing.T) {
limits := &fakeLimits{maxSeries: 10}
eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), limits, log.NewNopLogger())
defaultQuery := `topk(1,rate(({app=~"foo|bar"})[1m]))`
for _, test := range []struct {
name string
q string
blocked []*validation.BlockedQuery
expectedErr error
}{
{
"exact match all types",
defaultQuery, []*validation.BlockedQuery{
{
Pattern: defaultQuery,
},
}, logqlmodel.ErrBlocked,
},
{
"exact match all types with surrounding whitespace trimmed",
defaultQuery, []*validation.BlockedQuery{
{
Pattern: fmt.Sprintf(" %s ", defaultQuery),
},
}, logqlmodel.ErrBlocked,
},
{
"exact match filter type only",
`{app=~"foo|bar"} |= "baz"`, []*validation.BlockedQuery{
{
Pattern: `{app=~"foo|bar"} |= "baz"`,
Types: []string{QueryTypeFilter},
},
}, logqlmodel.ErrBlocked,
},
{
"match from multiple patterns",
`{app=~"foo|bar"} |= "baz"`, []*validation.BlockedQuery{
// won't match
{
Pattern: `.*"buzz".*`,
Regex: true,
},
// will match
{
Pattern: `{app=~"foo|bar"} |= "baz"`,
Types: []string{QueryTypeFilter},
},
}, logqlmodel.ErrBlocked,
},
{
"no block: exact match not matching filter type",
`{app=~"foo|bar"} | json`, []*validation.BlockedQuery{
{
Pattern: `{app=~"foo|bar"} | json`, // "limited" query
Types: []string{QueryTypeFilter},
},
}, nil,
},
{
"regex match all types",
defaultQuery, []*validation.BlockedQuery{
{
Pattern: ".*foo.*",
Regex: true,
},
}, logqlmodel.ErrBlocked,
},
{
"regex match multiple types",
defaultQuery, []*validation.BlockedQuery{
{
Pattern: ".*foo.*",
Regex: true,
Types: []string{QueryTypeFilter, QueryTypeMetric},
},
}, logqlmodel.ErrBlocked,
},
{
"match all queries by type",
defaultQuery, []*validation.BlockedQuery{
{
Types: []string{QueryTypeFilter, QueryTypeMetric},
},
}, logqlmodel.ErrBlocked,
},
{
"no block: match all queries by type",
defaultQuery, []*validation.BlockedQuery{
{
Types: []string{QueryTypeLimited},
},
}, nil,
},
{
"regex does not compile",
defaultQuery, []*validation.BlockedQuery{
{
Pattern: "[.*",
Regex: true,
Types: []string{QueryTypeFilter, QueryTypeMetric},
},
}, nil,
},
{
"no blocked queries",
defaultQuery, []*validation.BlockedQuery{}, nil,
},
} {
t.Run(test.name, func(t *testing.T) {
limits.blockedQueries = test.blocked
q := eng.Query(LiteralParams{
qs: test.q,
start: time.Unix(0, 0),
end: time.Unix(100000, 0),
step: 60 * time.Second,
direction: logproto.FORWARD,
limit: 1000,
})
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if test.expectedErr == nil {
require.NoError(t, err)
return
}
require.Error(t, err)
require.Equal(t, err.Error(), test.expectedErr.Error())
})
}
}

@ -35,7 +35,8 @@ import (
)
const (
DefaultEngineTimeout = 5 * time.Minute
DefaultEngineTimeout = 5 * time.Minute
DefaultBlockedQueryMessage = "blocked by policy"
)
var (
@ -45,6 +46,13 @@ var (
Help: "LogQL query timings",
Buckets: prometheus.DefBuckets,
}, []string{"query_type"})
QueriesBlocked = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "blocked_queries",
Help: "Count of queries blocked by per-tenant policy",
}, []string{"user"})
lastEntryMinTime = time.Unix(-100, 0)
)
@ -223,6 +231,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
if errors.Is(err, logqlmodel.ErrParse) ||
errors.Is(err, logqlmodel.ErrPipeline) ||
errors.Is(err, logqlmodel.ErrLimit) ||
errors.Is(err, logqlmodel.ErrBlocked) ||
errors.Is(err, context.Canceled) {
status = "400"
}
@ -251,6 +260,10 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
return nil, err
}
if q.checkBlocked(ctx, tenants) {
return nil, logqlmodel.ErrBlocked
}
switch e := expr.(type) {
case syntax.SampleExpr:
value, err := q.evalSample(ctx, e)
@ -270,6 +283,19 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
}
}
func (q *query) checkBlocked(ctx context.Context, tenants []string) bool {
blocker := newQueryBlocker(ctx, q)
for _, tenant := range tenants {
if blocker.isBlocked(tenant) {
QueriesBlocked.WithLabelValues(tenant).Inc()
return true
}
}
return false
}
// evalSample evaluate a sampleExpr
func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_parser.Value, error) {
if lit, ok := expr.(*syntax.LiteralExpr); ok {

@ -3,6 +3,8 @@ package logql
import (
"math"
"time"
"github.com/grafana/loki/pkg/util/validation"
)
var (
@ -13,11 +15,13 @@ var (
type Limits interface {
MaxQuerySeries(userID string) int
QueryTimeout(userID string) time.Duration
BlockedQueries(userID string) []*validation.BlockedQuery
}
type fakeLimits struct {
maxSeries int
timeout time.Duration
maxSeries int
timeout time.Duration
blockedQueries []*validation.BlockedQuery
}
func (f fakeLimits) MaxQuerySeries(userID string) int {
@ -27,3 +31,7 @@ func (f fakeLimits) MaxQuerySeries(userID string) int {
func (f fakeLimits) QueryTimeout(userID string) time.Duration {
return f.timeout
}
func (f fakeLimits) BlockedQueries(userID string) []*validation.BlockedQuery {
return f.blockedQueries
}

@ -13,6 +13,7 @@ var (
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
ErrBlocked = errors.New("query blocked by policy")
ErrorLabel = "__error__"
ErrorDetailsLabel = "__error_details__"
)

@ -29,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/storage/config"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
"github.com/grafana/loki/pkg/util/validation"
)
var (
@ -624,6 +625,10 @@ func (f fakeLimits) QueryTimeout(string) time.Duration {
return f.queryTimeout
}
func (f fakeLimits) BlockedQueries(string) []*validation.BlockedQuery {
return []*validation.BlockedQuery{}
}
func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex

@ -56,7 +56,7 @@ func ClientHTTPStatusAndError(err error) (int, error) {
return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded)
case errors.As(err, &queryErr):
return http.StatusBadRequest, err
case errors.Is(err, logqlmodel.ErrLimit) || errors.Is(err, logqlmodel.ErrParse) || errors.Is(err, logqlmodel.ErrPipeline):
case errors.Is(err, logqlmodel.ErrLimit) || errors.Is(err, logqlmodel.ErrParse) || errors.Is(err, logqlmodel.ErrPipeline) || errors.Is(err, logqlmodel.ErrBlocked):
return http.StatusBadRequest, err
case errors.Is(err, user.ErrNoOrgID):
return http.StatusBadRequest, err

@ -0,0 +1,9 @@
package validation
import "github.com/grafana/dskit/flagext"
type BlockedQuery struct {
Pattern string `yaml:"pattern"`
Regex bool `yaml:"regex"`
Types flagext.StringSliceCSV `yaml:"types"`
}

@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/deletionmode"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
)
const (
@ -155,6 +156,8 @@ type Limits struct {
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"`
ShardStreams *shardstreams.Config `yaml:"shard_streams" json:"shard_streams"`
BlockedQueries []*validation.BlockedQuery `yaml:"blocked_queries,omitempty" json:"blocked_queries,omitempty"`
}
type StreamRetention struct {
@ -634,6 +637,10 @@ func (o *Overrides) ShardStreams(userID string) *shardstreams.Config {
return o.getOverridesForUser(userID).ShardStreams
}
func (o *Overrides) BlockedQueries(userID string) []*validation.BlockedQuery {
return o.getOverridesForUser(userID).BlockedQueries
}
func (o *Overrides) DefaultLimits() *Limits {
return o.defaultLimits
}

@ -76,6 +76,9 @@ shard_streams:
enabled: true
desired_rate: 4mb
logging_enabled: true
blocked_queries:
- pattern: ".*foo.*"
regex: true
`
inputJSON := `
{
@ -117,7 +120,13 @@ shard_streams:
"desired_rate": "4mb",
"enabled": true,
"logging_enabled": true
}
},
"blocked_queries": [
{
"pattern": ".*foo.*",
"regex": true
}
]
}
`

Loading…
Cancel
Save