Bypass sharding middleware when a query can't be sharded. (#2709)

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/2716/head
Cyril Tovena 5 years ago committed by GitHub
parent c00c7ed252
commit 3640429532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      pkg/logql/sharding.go
  2. 13
      pkg/logql/sharding_test.go
  3. 40
      pkg/querier/queryrange/querysharding.go
  4. 23
      pkg/querier/queryrange/querysharding_test.go

@ -8,7 +8,6 @@ import (
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql"
@ -47,25 +46,13 @@ func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *S
}
// Query constructs a Query
func (ng *ShardedEngine) Query(p Params, shards int) Query {
func (ng *ShardedEngine) Query(p Params, mapped Expr) Query {
return &query{
timeout: ng.timeout,
params: p,
evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
parse: func(ctx context.Context, query string) (Expr, error) {
logger := spanlogger.FromContext(ctx)
mapper, err := NewShardMapper(shards, ng.metrics)
if err != nil {
return nil, err
}
noop, parsed, err := mapper.Parse(query)
if err != nil {
level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", query)
return nil, err
}
level.Debug(logger).Log("no-op", noop, "mapped", parsed.String())
return parsed, nil
parse: func(_ context.Context, _ string) (Expr, error) {
return mapped, nil
},
}
}

@ -71,12 +71,19 @@ func TestMappingEquivalence(t *testing.T) {
nil,
)
qry := regular.Query(params)
shardedQry := sharded.Query(params, shards)
ctx := context.Background()
res, err := qry.Exec(context.Background())
mapper, err := NewShardMapper(shards, nilMetrics)
require.Nil(t, err)
_, mapped, err := mapper.Parse(tc.query)
require.Nil(t, err)
shardedQry := sharded.Query(params, mapped)
res, err := qry.Exec(ctx)
require.Nil(t, err)
shardedRes, err := shardedQry.Exec(context.Background())
shardedRes, err := shardedQry.Exec(ctx)
require.Nil(t, err)
if tc.approximate {

@ -63,18 +63,20 @@ func newASTMapperware(
) *astMapperware {
return &astMapperware{
confs: confs,
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
next: next,
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics),
confs: confs,
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
next: next,
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics),
metrics: metrics,
}
}
type astMapperware struct {
confs queryrange.ShardingConfigs
logger log.Logger
next queryrange.Handler
ng *logql.ShardedEngine
confs queryrange.ShardingConfigs
logger log.Logger
next queryrange.Handler
ng *logql.ShardedEngine
metrics *logql.ShardingMetrics
}
func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
@ -92,8 +94,26 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
if !ok {
return nil, fmt.Errorf("expected *LokiRequest, got (%T)", r)
}
params := paramsFromRequest(req)
query := ast.ng.Query(params, int(conf.RowShards))
mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics)
if err != nil {
return nil, err
}
noop, parsed, err := mapper.Parse(r.GetQuery())
if err != nil {
level.Warn(shardedLog).Log("msg", "failed mapping AST", "err", err.Error(), "query", r.GetQuery())
return nil, err
}
level.Debug(shardedLog).Log("no-op", noop, "mapped", parsed.String())
if noop {
// the ast can't be mapped to a sharded equivalent
// so we can bypass the sharding engine.
return ast.next.Do(ctx, r)
}
query := ast.ng.Query(paramsFromRequest(req), parsed)
res, err := query.Exec(ctx)
if err != nil {

@ -159,6 +159,29 @@ func Test_astMapper(t *testing.T) {
}
func Test_ShardingByPass(t *testing.T) {
called := 0
handler := queryrange.HandlerFunc(func(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
called++
return nil, nil
})
mware := newASTMapperware(
queryrange.ShardingConfigs{
chunk.PeriodConfig{
RowShards: 2,
},
},
handler,
log.NewNopLogger(),
nilShardingMetrics,
)
_, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`))
require.Nil(t, err)
require.Equal(t, called, 1)
}
func Test_hasShards(t *testing.T) {
for i, tc := range []struct {
input queryrange.ShardingConfigs

Loading…
Cancel
Save