diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go
index 1703a034a0..02487f70b7 100644
--- a/pkg/logql/downstream.go
+++ b/pkg/logql/downstream.go
@@ -43,7 +43,7 @@ operand expression can take advantage of the parallel execution model:
 // querying the underlying backend shards individually and re-aggregating them.
 type DownstreamEngine struct {
 	logger         log.Logger
-	timeout        time.Duration
+	opts           EngineOpts
 	downstreamable Downstreamable
 	limits         Limits
 }
@@ -53,19 +53,21 @@ func NewDownstreamEngine(opts EngineOpts, downstreamable Downstreamable, limits
 	opts.applyDefault()
 	return &DownstreamEngine{
 		logger:         logger,
-		timeout:        opts.Timeout,
+		opts:           opts,
 		downstreamable: downstreamable,
 		limits:         limits,
 	}
 }
 
+func (ng *DownstreamEngine) Opts() EngineOpts { return ng.opts }
+
 // Query constructs a Query
-func (ng *DownstreamEngine) Query(p Params, mapped syntax.Expr) Query {
+func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.Expr) Query {
 	return &query{
 		logger:    ng.logger,
-		timeout:   ng.timeout,
+		timeout:   ng.opts.Timeout,
 		params:    p,
-		evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer()),
+		evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)),
 		parse: func(_ context.Context, _ string) (syntax.Expr, error) {
 			return mapped, nil
 		},
@@ -158,7 +160,7 @@ func ParseShards(strs []string) (Shards, error) {
 }
 
 type Downstreamable interface {
-	Downstreamer() Downstreamer
+	Downstreamer(context.Context) Downstreamer
 }
 
 type DownstreamQuery struct {
diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go
index f65ae9a60e..bc532a05e0 100644
--- a/pkg/logql/downstream_test.go
+++ b/pkg/logql/downstream_test.go
@@ -78,12 +78,11 @@ func TestMappingEquivalence(t *testing.T) {
 			qry := regular.Query(params)
 			ctx := user.InjectOrgID(context.Background(), "fake")
 
-			mapper, err := NewShardMapper(shards, nilShardMetrics)
-			require.Nil(t, err)
+			mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics)
 			_, mapped, err := mapper.Parse(tc.query)
 			require.Nil(t, err)
 
-			shardedQry := sharded.Query(params, mapped)
+			shardedQry := sharded.Query(ctx, params, mapped)
 
 			res, err := qry.Exec(ctx)
 			require.Nil(t, err)
@@ -331,7 +330,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
 
 			require.False(t, noop, "downstream engine cannot execute noop")
 
-			rangeQry := downstreamEngine.Query(params, rangeExpr)
+			rangeQry := downstreamEngine.Query(ctx, params, rangeExpr)
 			rangeRes, err := rangeQry.Exec(ctx)
 			require.Nil(t, err)
 
diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go
index d95df139b3..91cf66f856 100644
--- a/pkg/logql/shardmapper.go
+++ b/pkg/logql/shardmapper.go
@@ -16,23 +16,20 @@ type ShardResolver interface {
 	Shards(expr syntax.Expr) (int, error)
 }
 
-type constantShards int
+type ConstantShards int
 
-func (s constantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }
+func (s ConstantShards) Shards(_ syntax.Expr) (int, error) { return int(s), nil }
 
 type ShardMapper struct {
-	shards  constantShards
+	shards  ShardResolver
 	metrics *MapperMetrics
 }
 
-func NewShardMapper(shards int, metrics *MapperMetrics) (ShardMapper, error) {
-	if shards < 2 {
-		return ShardMapper{}, fmt.Errorf("cannot create ShardMapper with <2 shards. Received %d", shards)
-	}
+func NewShardMapper(resolver ShardResolver, metrics *MapperMetrics) ShardMapper {
 	return ShardMapper{
-		shards:  constantShards(shards),
+		shards:  resolver,
 		metrics: metrics,
-	}, nil
+	}
 }
 
 func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
@@ -116,6 +113,14 @@ func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstre
 	if err != nil {
 		return nil, err
 	}
+	if shards == 0 {
+		return &ConcatLogSelectorExpr{
+			DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
+				shard:           nil,
+				LogSelectorExpr: expr,
+			},
+		}, nil
+	}
 	for i := shards - 1; i >= 0; i-- {
 		head = &ConcatLogSelectorExpr{
 			DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{
@@ -139,6 +144,14 @@ func (m ShardMapper) mapSampleExpr(expr syntax.SampleExpr, r *downstreamRecorder
 	if err != nil {
 		return nil, err
 	}
+	if shards == 0 {
+		return &ConcatSampleExpr{
+			DownstreamSampleExpr: DownstreamSampleExpr{
+				shard:      nil,
+				SampleExpr: expr,
+			},
+		}, nil
+	}
 	for i := shards - 1; i >= 0; i-- {
 		head = &ConcatSampleExpr{
 			DownstreamSampleExpr: DownstreamSampleExpr{
diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go
index 1108bd2a44..f4349a2e67 100644
--- a/pkg/logql/shardmapper_test.go
+++ b/pkg/logql/shardmapper_test.go
@@ -51,8 +51,7 @@ func TestShardedStringer(t *testing.T) {
 }
 
 func TestMapSampleExpr(t *testing.T) {
-	m, err := NewShardMapper(2, nilShardMetrics)
-	require.Nil(t, err)
+	m := NewShardMapper(ConstantShards(2), nilShardMetrics)
 
 	for _, tc := range []struct {
 		in  syntax.SampleExpr
@@ -114,8 +113,7 @@ func TestMapSampleExpr(t *testing.T) {
 }
 
 func TestMappingStrings(t *testing.T) {
-	m, err := NewShardMapper(2, nilShardMetrics)
-	require.Nil(t, err)
+	m := NewShardMapper(ConstantShards(2), nilShardMetrics)
 	for _, tc := range []struct {
 		in  string
 		out string
@@ -279,8 +277,7 @@ func TestMappingStrings(t *testing.T) {
 }
 
 func TestMapping(t *testing.T) {
-	m, err := NewShardMapper(2, nilShardMetrics)
-	require.Nil(t, err)
+	m := NewShardMapper(ConstantShards(2), nilShardMetrics)
 
 	for _, tc := range []struct {
 		in  string
diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go
index 19203c1004..0d8f0264e2 100644
--- a/pkg/logql/syntax/ast.go
+++ b/pkg/logql/syntax/ast.go
@@ -707,7 +707,7 @@ type SampleExpr interface {
 	// Selector is the LogQL selector to apply when retrieving logs.
 	Selector() LogSelectorExpr
 	Extractor() (SampleExtractor, error)
-	MatcherGroups() [][]*labels.Matcher
+	MatcherGroups() []MatcherRange
 	Expr
 }
 
@@ -754,10 +754,16 @@ func (e *RangeAggregationExpr) Selector() LogSelectorExpr {
 	return e.Left.Left
 }
 
-func (e *RangeAggregationExpr) MatcherGroups() [][]*labels.Matcher {
+func (e *RangeAggregationExpr) MatcherGroups() []MatcherRange {
 	xs := e.Left.Left.Matchers()
 	if len(xs) > 0 {
-		return [][]*labels.Matcher{xs}
+		return []MatcherRange{
+			{
+				Matchers: xs,
+				Interval: e.Left.Interval,
+				Offset:   e.Left.Offset,
+			},
+		}
 	}
 	return nil
 }
@@ -880,7 +886,7 @@ func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *Groupin
 	}
 }
 
-func (e *VectorAggregationExpr) MatcherGroups() [][]*labels.Matcher {
+func (e *VectorAggregationExpr) MatcherGroups() []MatcherRange {
 	return e.Left.MatcherGroups()
 }
 
@@ -1005,7 +1011,7 @@ type BinOpExpr struct {
 	Opts *BinOpOptions
 }
 
-func (e *BinOpExpr) MatcherGroups() [][]*labels.Matcher {
+func (e *BinOpExpr) MatcherGroups() []MatcherRange {
 	return append(e.SampleExpr.MatcherGroups(), e.RHS.MatcherGroups()...)
 }
 
@@ -1391,7 +1397,7 @@ func (e *LiteralExpr) Shardable() bool { return true }
 func (e *LiteralExpr) Walk(f WalkFn) { f(e) }
 func (e *LiteralExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
 func (e *LiteralExpr) Matchers() []*labels.Matcher { return nil }
-func (e *LiteralExpr) MatcherGroups() [][]*labels.Matcher { return nil }
+func (e *LiteralExpr) MatcherGroups() []MatcherRange { return nil }
 func (e *LiteralExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
 
 func (e *LiteralExpr) Value() float64 { return e.Val }
@@ -1446,7 +1452,7 @@ func (e *LabelReplaceExpr) Selector() LogSelectorExpr {
 	return e.Left.Selector()
 }
 
-func (e *LabelReplaceExpr) MatcherGroups() [][]*labels.Matcher {
+func (e *LabelReplaceExpr) MatcherGroups() []MatcherRange {
 	return e.Left.MatcherGroups()
 }
 
@@ -1521,13 +1527,22 @@ var shardableOps = map[string]bool{
 	OpTypeMul: true,
 }
 
-func MatcherGroups(expr Expr) [][]*labels.Matcher {
+type MatcherRange struct {
+	Matchers         []*labels.Matcher
+	Interval, Offset time.Duration
+}
+
+func MatcherGroups(expr Expr) []MatcherRange {
 	switch e := expr.(type) {
 	case SampleExpr:
 		return e.MatcherGroups()
 	case LogSelectorExpr:
 		if xs := e.Matchers(); len(xs) > 0 {
-			return [][]*labels.Matcher{xs}
+			return []MatcherRange{
+				{
+					Matchers: xs,
+				},
+			}
 		}
 		return nil
 	default:
diff --git a/pkg/logql/syntax/ast_test.go b/pkg/logql/syntax/ast_test.go
index 4507eb5933..4c3c112345 100644
--- a/pkg/logql/syntax/ast_test.go
+++ b/pkg/logql/syntax/ast_test.go
@@ -3,6 +3,7 @@ package syntax
 import (
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
@@ -182,19 +183,34 @@ func Test_SampleExpr_String(t *testing.T) {
 func TestMatcherGroups(t *testing.T) {
 	for i, tc := range []struct {
 		query string
-		exp   [][]*labels.Matcher
+		exp   []MatcherRange
 	}{
 		{
 			query: `{job="foo"}`,
-			exp: [][]*labels.Matcher{
-				{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
+			exp: []MatcherRange{
+				{
+					Matchers: []*labels.Matcher{
+						labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
+					},
+				},
 			},
 		},
 		{
-			query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m])`,
-			exp: [][]*labels.Matcher{
-				{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
-				{labels.MustNewMatcher(labels.MatchEqual, "job", "bar")},
+			query: `count_over_time({job="foo"}[5m]) / count_over_time({job="bar"}[5m] offset 10m)`,
+			exp: []MatcherRange{
+				{
+					Interval: 5 * time.Minute,
+					Matchers: []*labels.Matcher{
+						labels.MustNewMatcher(labels.MatchEqual, "job", "foo"),
+					},
+				},
+				{
+					Interval: 5 * time.Minute,
+					Offset:   10 * time.Minute,
+					Matchers: []*labels.Matcher{
+						labels.MustNewMatcher(labels.MatchEqual, "job", "bar"),
+					},
+				},
 			},
 		},
 	} {
diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go
index 225ad0d5a2..3acee973b7 100644
--- a/pkg/logql/test_utils.go
+++ b/pkg/logql/test_utils.go
@@ -212,7 +212,7 @@ type MockDownstreamer struct {
 	*Engine
 }
 
-func (m MockDownstreamer) Downstreamer() Downstreamer { return m }
+func (m MockDownstreamer) Downstreamer(_ context.Context) Downstreamer { return m }
 
 func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]logqlmodel.Result, error) {
 	results := make([]logqlmodel.Result, 0, len(queries))
diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go
index 8e7b184bc2..92a836fe33 100644
--- a/pkg/querier/queryrange/downstreamer.go
+++ b/pkg/querier/queryrange/downstreamer.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
"github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" @@ -21,7 +22,8 @@ const ( ) type DownstreamHandler struct { - next queryrangebase.Handler + limits Limits + next queryrangebase.Handler } func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebase.Request { @@ -54,8 +56,19 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas // from creating an unreasonably large number of goroutines, such as // the case of a query like `a / a / a / a / a ..etc`, which could try // to shard each leg, quickly dispatching an unreasonable number of goroutines. -func (h DownstreamHandler) Downstreamer() logql.Downstreamer { +// In the future, it's probably better to replace this with a channel based API +// so we don't have to do all this ugly edge case handling/accounting +func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer { p := DefaultDownstreamConcurrency + + // We may increase parallelism above the default, + // ensure we don't end up bottlenecking here. + if user, err := tenant.TenantID(ctx); err == nil { + if x := h.limits.MaxQueryParallelism(user); x > 0 { + p = x + } + } + locks := make(chan struct{}, p) for i := 0; i < p; i++ { locks <- struct{}{} diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index 887cfa8e62..d0459d0402 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "go.uber.org/atomic" "github.com/grafana/loki/pkg/logproto" @@ -188,8 +189,8 @@ func TestResponseToResult(t *testing.T) { func TestDownstreamHandler(t *testing.T) { // Pretty poor test, but this is just a passthrough struct, so ensure we create locks // and can consume them - h := DownstreamHandler{nil} - in := h.Downstreamer().(*instance) + h := DownstreamHandler{limits: fakeLimits{}, next: nil} + in := h.Downstreamer(context.Background()).(*instance) require.Equal(t, DefaultDownstreamConcurrency, in.parallelism) require.NotNil(t, in.locks) ensureParallelism(t, in, in.parallelism) @@ -213,7 +214,12 @@ func ensureParallelism(t *testing.T, in *instance, n int) { } func TestInstanceFor(t *testing.T) { - mkIn := func() *instance { return DownstreamHandler{nil}.Downstreamer().(*instance) } + mkIn := func() *instance { + return DownstreamHandler{ + limits: fakeLimits{}, + next: nil, + }.Downstreamer(context.Background()).(*instance) + } in := mkIn() queries := make([]logql.DownstreamQuery, in.parallelism+1) @@ -339,7 +345,10 @@ func TestInstanceDownstream(t *testing.T) { expected, err := ResponseToResult(expectedResp()) require.Nil(t, err) - results, err := DownstreamHandler{handler}.Downstreamer().Downstream(context.Background(), queries) + results, err := DownstreamHandler{ + limits: fakeLimits{}, + next: handler, + }.Downstreamer(context.Background()).Downstream(context.Background(), queries) require.Equal(t, want, got) @@ -348,7 +357,12 @@ func TestInstanceDownstream(t *testing.T) { } func TestCancelWhileWaitingResponse(t *testing.T) { - mkIn := func() *instance { return DownstreamHandler{nil}.Downstreamer().(*instance) } + mkIn := func() *instance { + return DownstreamHandler{ + limits: fakeLimits{}, 
+			next:   nil,
+		}.Downstreamer(context.Background()).(*instance)
+	}
 	in := mkIn()
 
 	queries := make([]logql.DownstreamQuery, in.parallelism+1)
@@ -376,3 +390,20 @@
 		"The parent context calling the Downstreamer For method was canceled "+
 			"but the For method did not return as expected.")
 }
+
+func TestDownstreamerUsesCorrectParallelism(t *testing.T) {
+	ctx := user.InjectOrgID(context.Background(), "fake")
+	l := fakeLimits{maxQueryParallelism: 4}
+	d := DownstreamHandler{
+		limits: l,
+		next:   nil,
+	}.Downstreamer(ctx)
+
+	i := d.(*instance)
+	close(i.locks)
+	var ct int
+	for range i.locks {
+		ct++
+	}
+	require.Equal(t, l.maxQueryParallelism, ct)
+}
diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go
index abab1186ca..01a117e681 100644
--- a/pkg/querier/queryrange/querysharding.go
+++ b/pkg/querier/queryrange/querysharding.go
@@ -69,13 +69,14 @@ func newASTMapperware(
 	next queryrangebase.Handler,
 	logger log.Logger,
 	metrics *logql.MapperMetrics,
-	limits logql.Limits,
+	limits Limits,
 ) *astMapperware {
 	return &astMapperware{
 		confs:   confs,
 		logger:  log.With(logger, "middleware", "QueryShard.astMapperware"),
+		limits:  limits,
 		next:    next,
-		ng:      logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, limits, logger),
+		ng:      logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next: next, limits: limits}, limits, logger),
 		metrics: metrics,
 	}
 }
@@ -83,6 +84,7 @@
 type astMapperware struct {
 	confs   ShardingConfigs
 	logger  log.Logger
+	limits  Limits
 	next    queryrangebase.Handler
 	ng      *logql.DownstreamEngine
 	metrics *logql.MapperMetrics
@@ -97,7 +99,25 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
 		return ast.next.Do(ctx, r)
 	}
 
-	mapper, err := logql.NewShardMapper(int(conf.RowShards), ast.metrics)
+	userID, err := tenant.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	resolver, ok := shardResolverForConf(
+		ctx,
+		conf,
+		ast.ng.Opts().MaxLookBackPeriod,
+		ast.logger,
+		ast.limits.MaxQueryParallelism(userID),
+		r,
+		ast.next,
+	)
+	if !ok {
+		return ast.next.Do(ctx, r)
+	}
+
+	mapper := logql.NewShardMapper(resolver, ast.metrics)
 	if err != nil {
 		return nil, err
 	}
@@ -129,7 +149,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
 	default:
 		return nil, fmt.Errorf("expected *LokiRequest or *LokiInstantRequest, got (%T)", r)
 	}
-	query := ast.ng.Query(params, parsed)
+	query := ast.ng.Query(ctx, params, parsed)
 
 	res, err := query.Exec(ctx)
 	if err != nil {
@@ -211,7 +231,7 @@ func (splitter *shardSplitter) Do(ctx context.Context, r queryrangebase.Request)
 
 func hasShards(confs ShardingConfigs) bool {
 	for _, conf := range confs {
-		if conf.RowShards > 0 {
+		if conf.RowShards > 0 || conf.IndexType == config.TSDBType {
 			return true
 		}
 	}
@@ -250,7 +270,7 @@ func (confs ShardingConfigs) GetConf(r queryrangebase.Request) (config.PeriodCon
 	}
 
 	// query doesn't have shard factor, so don't try to do AST mapping.
-	if conf.RowShards < 2 {
+	if conf.RowShards < 2 && conf.IndexType != config.TSDBType {
 		return conf, errors.Errorf("shard factor not high enough: [%d]", conf.RowShards)
 	}
 
@@ -307,10 +327,6 @@ func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Reques
 		return ss.next.Do(ctx, r)
 	}
 
-	if conf.RowShards <= 1 {
-		return ss.next.Do(ctx, r)
-	}
-
 	req, ok := r.(*LokiSeriesRequest)
 	if !ok {
 		return nil, fmt.Errorf("expected *LokiSeriesRequest, got (%T)", r)
diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go
index 96e2c107d0..0446cb1f11 100644
--- a/pkg/querier/queryrange/querysharding_test.go
+++ b/pkg/querier/queryrange/querysharding_test.go
@@ -159,7 +159,7 @@ func Test_astMapper(t *testing.T) {
 		fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1},
 	)
 
-	resp, err := mware.Do(context.Background(), defaultReq().WithQuery(`{food="bar"}`))
+	resp, err := mware.Do(user.InjectOrgID(context.Background(), "1"), defaultReq().WithQuery(`{food="bar"}`))
 	require.Nil(t, err)
 
 	expected, err := LokiCodec.MergeResponse(lokiResps...)
@@ -188,7 +188,7 @@ func Test_ShardingByPass(t *testing.T) {
 		fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1},
 	)
 
-	_, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`))
+	_, err := mware.Do(user.InjectOrgID(context.Background(), "1"), defaultReq().WithQuery(`1+1`))
 	require.Nil(t, err)
 	require.Equal(t, called, 1)
 }
diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go
new file mode 100644
index 0000000000..a8a7d63f96
--- /dev/null
+++ b/pkg/querier/queryrange/shard_resolver.go
@@ -0,0 +1,158 @@
+package queryrange
+
+import (
+	"context"
+	"fmt"
+	math "math"
+	strings "strings"
+	"time"
+
+	"github.com/dustin/go-humanize"
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/concurrency"
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/logql/syntax"
+	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
+	"github.com/grafana/loki/pkg/storage/config"
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
+	"github.com/grafana/loki/pkg/util/spanlogger"
+)
+
+func shardResolverForConf(
+	ctx context.Context,
+	conf config.PeriodConfig,
+	defaultLookback time.Duration,
+	logger log.Logger,
+	maxParallelism int,
+	r queryrangebase.Request,
+	handler queryrangebase.Handler,
+) (logql.ShardResolver, bool) {
+	if conf.IndexType == config.TSDBType {
+		return &dynamicShardResolver{
+			ctx:             ctx,
+			logger:          logger,
+			handler:         handler,
+			from:            model.Time(r.GetStart()),
+			through:         model.Time(r.GetEnd()),
+			maxParallelism:  maxParallelism,
+			defaultLookback: defaultLookback,
+		}, true
+	}
+	if conf.RowShards < 2 {
+		return nil, false
+	}
+	return logql.ConstantShards(conf.RowShards), true
+}
+
+type dynamicShardResolver struct {
+	ctx     context.Context
+	handler queryrangebase.Handler
+	logger  log.Logger
+
+	from, through   model.Time
+	maxParallelism  int
+	defaultLookback time.Duration
+}
+
+func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, error) {
+	sp, _ := spanlogger.NewWithLogger(r.ctx, r.logger, "dynamicShardResolver.Shards")
+	defer sp.Finish()
+	// We try to shard subtrees in the AST independently if possible, although
+	// nested binary expressions can make this difficult. In this case,
+	// we query the index stats for all matcher groups then sum the results.
+	grps := syntax.MatcherGroups(e)
+
+	// If there are zero matcher groups, we'll inject one to query everything
+	if len(grps) == 0 {
+		grps = append(grps, syntax.MatcherRange{})
+	}
+
+	results := make([]*stats.Stats, len(grps))
+
+	start := time.Now()
+	if err := concurrency.ForEachJob(r.ctx, len(grps), r.maxParallelism, func(ctx context.Context, i int) error {
+		matchers := syntax.MatchersString(grps[i].Matchers)
+		diff := grps[i].Interval + grps[i].Offset
+		adjustedFrom := r.from.Add(-diff)
+		if grps[i].Interval == 0 {
+			adjustedFrom = adjustedFrom.Add(-r.defaultLookback)
+		}
+
+		adjustedThrough := r.through.Add(-grps[i].Offset)
+
+		start := time.Now()
+		resp, err := r.handler.Do(r.ctx, &indexgatewaypb.IndexStatsRequest{
+			From:     adjustedFrom,
+			Through:  adjustedThrough,
+			Matchers: matchers,
+		})
+		if err != nil {
+			return err
+		}
+
+		casted, ok := resp.(*IndexStatsResponse)
+		if !ok {
+			return fmt.Errorf("expected *IndexStatsResponse while querying index, got %T", resp)
+		}
+
+		// Write by index rather than appending to the shared slice:
+		// ForEachJob runs these closures concurrently.
+		results[i] = casted.Response
+		level.Debug(sp).Log(
+			"msg", "queried index",
+			"type", "single",
+			"matchers", matchers,
+			"bytes", strings.Replace(humanize.Bytes(casted.Response.Bytes), " ", "", 1),
+			"chunks", casted.Response.Chunks,
+			"streams", casted.Response.Streams,
+			"entries", casted.Response.Entries,
+			"duration", time.Since(start),
+			"from", adjustedFrom.Time(),
+			"through", adjustedThrough.Time(),
+			"length", adjustedThrough.Sub(adjustedFrom),
+		)
+		return nil
+	}); err != nil {
+		return 0, err
+	}
+
+	combined := stats.MergeStats(results...)
+	factor := guessShardFactor(combined, r.maxParallelism)
+	level.Debug(sp).Log(
+		"msg", "queried index",
+		"type", "combined",
+		"len", len(results),
+		"bytes", strings.Replace(humanize.Bytes(combined.Bytes), " ", "", 1),
+		"chunks", combined.Chunks,
+		"streams", combined.Streams,
+		"entries", combined.Entries,
+		"max_parallelism", r.maxParallelism,
+		"duration", time.Since(start),
+		"factor", factor,
+	)
+	return factor, nil
+}
+
+const (
+	// Just some observed values to get us started on better query planning.
+	p90BytesPerSecond = 300 << 20 // 300MB/s/core
+	// At max, schedule a query for 10s of execution before
+	// splitting it into more requests. This is a lot of guesswork.
+	maxSeconds          = 10
+	maxSchedulableBytes = maxSeconds * p90BytesPerSecond
+)
+
+func guessShardFactor(stats stats.Stats, maxParallelism int) int {
+	expectedSeconds := float64(stats.Bytes / p90BytesPerSecond)
+	if expectedSeconds <= float64(maxParallelism) {
+		power := math.Ceil(math.Log2(expectedSeconds)) // round up to nearest power of 2
+		// Ideally, parallelize down to 1s queries
+		return int(math.Pow(2, power))
+	}
+
+	n := stats.Bytes / maxSchedulableBytes
+	power := math.Ceil(math.Log2(float64(n)))
+	return int(math.Pow(2, power))
+}
diff --git a/pkg/querier/queryrange/shard_resolver_test.go b/pkg/querier/queryrange/shard_resolver_test.go
new file mode 100644
index 0000000000..7c6166c43a
--- /dev/null
+++ b/pkg/querier/queryrange/shard_resolver_test.go
@@ -0,0 +1,60 @@
+package queryrange
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/storage/stores/index/stats"
+)
+
+func TestGuessShardFactor(t *testing.T) {
+	for _, tc := range []struct {
+		stats          stats.Stats
+		maxParallelism int
+		exp            int
+	}{
+		{
+			// no data == no sharding
+			exp:            0,
+			maxParallelism: 10,
+		},
+		{
+			exp:            4,
+			maxParallelism: 10,
+			stats: stats.Stats{
+				Bytes: 1200 << 20, // 1200MB
+			},
+		},
+		{
+			exp:            8,
+			maxParallelism: 10,
+			// 1500MB moves us to the next
+			// power of 2 parallelism factor
+			stats: stats.Stats{
+				Bytes: 1500 << 20,
+			},
+		},
+		{
+			// Two fully packed parallelism cycles
+			exp:            16,
+			maxParallelism: 8,
+			stats: stats.Stats{
+				Bytes: maxSchedulableBytes * 16,
+			},
+		},
+		{
+			// increase to next factor of two
+			exp:            32,
+			maxParallelism: 8,
+			stats: stats.Stats{
+				Bytes: maxSchedulableBytes * 17,
+			},
+		},
+	} {
+		t.Run(fmt.Sprintf("%+v", tc.stats), func(t *testing.T) {
+			require.Equal(t, tc.exp, guessShardFactor(tc.stats, tc.maxParallelism))
+		})
+	}
+}
diff --git a/pkg/querier/queryrange/split_by_range.go b/pkg/querier/queryrange/split_by_range.go
index a0d3364958..1a6a29324c 100644
--- a/pkg/querier/queryrange/split_by_range.go
+++ b/pkg/querier/queryrange/split_by_range.go
@@ -31,10 +31,13 @@ type splitByRange struct {
 func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql.MapperMetrics) queryrangebase.Middleware {
 	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
 		return &splitByRange{
-			logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"),
-			next:   next,
-			limits: limits,
-			ng:     logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, limits, logger),
+			logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"),
+			next:   next,
+			limits: limits,
+			ng: logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{
+				limits: limits,
+				next:   next,
+			}, limits, logger),
 			metrics: metrics,
 		}
 	})
@@ -80,7 +83,7 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
 		return nil, fmt.Errorf("expected *LokiInstantRequest")
 	}
 
-	query := s.ng.Query(params, parsed)
+	query := s.ng.Query(ctx, params, parsed)
 
 	res, err := query.Exec(ctx)
 	if err != nil {
diff --git a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/compat.go b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/compat.go
index 07f7e44d3f..73ce2b3722 100644
--- a/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/compat.go
+++ b/pkg/storage/stores/shipper/indexgateway/indexgatewaypb/compat.go
@@ -15,12 +15,12 @@ import (
 
 // GetStart returns the start timestamp of the request in milliseconds.
 func (m *IndexStatsRequest) GetStart() int64 {
-	return m.From.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
+	return int64(m.From)
 }
 
 // GetEnd returns the end timestamp of the request in milliseconds.
 func (m *IndexStatsRequest) GetEnd() int64 {
-	return m.Through.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
+	return int64(m.Through)
 }
 
 // GetStep returns the step of the request in milliseconds.
diff --git a/pkg/storage/stores/tsdb/chunkwriter.go b/pkg/storage/stores/tsdb/chunkwriter.go
index 6241ec47d5..b555f13746 100644
--- a/pkg/storage/stores/tsdb/chunkwriter.go
+++ b/pkg/storage/stores/tsdb/chunkwriter.go
@@ -2,6 +2,7 @@ package tsdb
 
 import (
 	"context"
+	"math"
 
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
@@ -77,12 +78,13 @@ func (w *ChunkWriter) PutOne(ctx context.Context, from, through model.Time, chk
 	}
 
 	// Always write the index to benefit durability via replication factor.
+	approxKB := math.Round(float64(chk.Data.Size()) / float64(1<<10))
 	metas := index.ChunkMetas{
 		{
 			Checksum: chk.ChunkRef.Checksum,
 			MinTime:  int64(chk.ChunkRef.From),
 			MaxTime:  int64(chk.ChunkRef.Through),
-			KB:       uint32(approxKB),
+			KB:       uint32(approxKB),
 			Entries:  uint32(chk.Data.Entries()),
 		},
 	}
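
To exercise the shard-factor heuristic from shard_resolver.go without wiring up the whole middleware, the standalone sketch below inlines guessShardFactor and its constants from the patch. The main scaffolding and the sample byte counts are illustrative assumptions, and the stats.Stats argument is reduced to a raw byte count; the function body itself is copied from the new file above.

package main

import (
	"fmt"
	"math"
)

const (
	p90BytesPerSecond   = 300 << 20 // 300MB/s/core, as in the patch
	maxSeconds          = 10
	maxSchedulableBytes = maxSeconds * p90BytesPerSecond
)

// guessShardFactor mirrors the heuristic in shard_resolver.go: pick a
// power-of-two factor that either (a) brings expected execution down
// toward 1s per shard when maxParallelism allows, or (b) caps each
// shard at maxSchedulableBytes of data.
func guessShardFactor(bytes uint64, maxParallelism int) int {
	expectedSeconds := float64(bytes / p90BytesPerSecond)
	if expectedSeconds <= float64(maxParallelism) {
		// Ceil(Log2(0)) is -Inf, so zero bytes yields a factor of 0 (no sharding).
		power := math.Ceil(math.Log2(expectedSeconds))
		return int(math.Pow(2, power))
	}
	n := bytes / maxSchedulableBytes
	power := math.Ceil(math.Log2(float64(n)))
	return int(math.Pow(2, power))
}

func main() {
	// Sample inputs chosen to match the cases in TestGuessShardFactor.
	for _, bytes := range []uint64{0, 1200 << 20, 1500 << 20, maxSchedulableBytes * 17} {
		fmt.Printf("bytes=%d -> shard factor %d\n", bytes, guessShardFactor(bytes, 8))
	}
}

Run with maxParallelism 8, this prints factors 0, 4, 8, and 32, agreeing with the expectations in shard_resolver_test.go.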
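Similarly, the from/through widening that dynamicShardResolver.Shards performs before each index-stats request can be sketched in isolation. adjustRange and matcherRange below are hypothetical names that mirror the patch's arithmetic over syntax.MatcherRange; the only external assumption is model.Time from prometheus/common.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

// matcherRange mirrors the time fields of syntax.MatcherRange.
type matcherRange struct {
	interval, offset time.Duration
}

// adjustRange reproduces the stats-request window computation from the
// patch: pull `from` back by interval+offset (or by a default lookback
// when the group has no range interval) and pull `through` back by the
// offset, so the stats cover the data the query will actually read.
func adjustRange(from, through model.Time, g matcherRange, defaultLookback time.Duration) (model.Time, model.Time) {
	diff := g.interval + g.offset
	adjustedFrom := from.Add(-diff)
	if g.interval == 0 {
		adjustedFrom = adjustedFrom.Add(-defaultLookback)
	}
	return adjustedFrom, through.Add(-g.offset)
}

func main() {
	now := model.Now()
	from, through := now.Add(-time.Hour), now
	// e.g. the test query count_over_time({job="bar"}[5m] offset 10m)
	f, t := adjustRange(from, through, matcherRange{interval: 5 * time.Minute, offset: 10 * time.Minute}, 30*time.Second)
	fmt.Println(f.Time(), t.Time())
}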