Fix instant query summary split stats (#9773)

**What this PR does / why we need it**:

Fix the instant query summary statistic `splits` so that it corresponds to the
number of subqueries a query is split into based on
`split_queries_by_interval`.

* Update rangemapper with a statistics structure to include the number
of split queries a query is mapped into.
* In the `split_by_range` middleware, once the mapped query is returned,
update the middleware statistics with the number of split queries. This
value is then merged with the statistics of the Loki response.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
pull/9780/head
Susana Ferreira 3 years ago committed by GitHub
parent 86d943d8f4
commit 35465d0297
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      pkg/logql/downstream_test.go
  3. 24
      pkg/logql/mapper_stats.go
  4. 8
      pkg/logql/rangemapper.go
  5. 201
      pkg/logql/rangemapper_test.go
  6. 4
      pkg/logqlmodel/stats/context.go
  7. 105
      pkg/querier/queryrange/roundtrip_test.go
  8. 10
      pkg/querier/queryrange/split_by_range.go
  9. 22
      pkg/querier/queryrange/stats.go

@ -57,6 +57,7 @@
* [9650](https://github.com/grafana/loki/pull/9650) **ashwanthgoli**: Config: ensure storage config defaults apply to named stores.
* [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.
* [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
* [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.
##### Changes

@ -408,7 +408,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
require.Nil(t, err)
// Downstream engine - split by range
rangeMapper, err := NewRangeMapper(tc.splitByInterval, nilRangeMetrics)
rangeMapper, err := NewRangeMapper(tc.splitByInterval, nilRangeMetrics, NewMapperStats())
require.Nil(t, err)
noop, rangeExpr, err := rangeMapper.Parse(tc.query)
require.Nil(t, err)

@ -0,0 +1,24 @@
package logql
// MapperStats collects statistics produced while a query is being mapped,
// currently just the number of split (downstream) queries generated.
type MapperStats struct {
	splitQueries int // running total of split queries
}

// NewMapperStats returns a zero-valued MapperStats ready for use.
func NewMapperStats() *MapperStats {
	return &MapperStats{}
}

// AddSplitQueries adds num split queries to the counter.
func (ms *MapperStats) AddSplitQueries(num int) {
	ms.splitQueries += num
}

// GetSplitQueries returns the number of split queries recorded so far.
func (ms *MapperStats) GetSplitQueries() int {
	return ms.splitQueries
}

// resetSplitQueries zeroes the split queries counter.
func (ms *MapperStats) resetSplitQueries() {
	ms.splitQueries = 0
}

@ -56,17 +56,19 @@ var splittableRangeVectorOp = map[string]struct{}{
type RangeMapper struct {
splitByInterval time.Duration
metrics *MapperMetrics
stats *MapperStats
}
// NewRangeMapper creates a new RangeMapper instance with the given duration as
// split interval. The interval must be greater than 0.
func NewRangeMapper(interval time.Duration, metrics *MapperMetrics) (RangeMapper, error) {
func NewRangeMapper(interval time.Duration, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) {
if interval <= 0 {
return RangeMapper{}, fmt.Errorf("cannot create RangeMapper with splitByInterval <= 0; got %s", interval)
}
return RangeMapper{
splitByInterval: interval,
metrics: metrics,
stats: stats,
}, nil
}
@ -100,6 +102,8 @@ func (m RangeMapper) Parse(query string) (bool, syntax.Expr, error) {
noop := origExpr.String() == modExpr.String()
if noop {
// reset split queries counter if the query is a noop
m.stats.resetSplitQueries()
m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc()
} else {
m.metrics.ParsedQueries.WithLabelValues(SuccessKey).Inc()
@ -344,6 +348,8 @@ func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval t
downstreams = appendDownstream(downstreams, expr, splitRangeInterval, splitOffset)
}
// Update stats and metrics
m.stats.AddSplitQueries(splitCount)
recorder.Add(splitCount, MetricsKey)
return downstreams

File diff suppressed because it is too large Load Diff

@ -382,6 +382,10 @@ func (c *Context) AddCacheRequest(t CacheType, i int) {
atomic.AddInt32(&stats.Requests, int32(i))
}
// AddSplitQueries atomically adds num to the summary's Splits counter,
// tracking how many subqueries the original query was split into.
func (c *Context) AddSplitQueries(num int64) {
atomic.AddInt64(&c.result.Summary.Splits, num)
}
func (c *Context) getCacheStatsByType(t CacheType) *Cache {
var stats *Cache
switch t {

@ -1162,6 +1162,111 @@ func Test_getOperation(t *testing.T) {
}
}
// TestMetricsTripperware_SplitShardStats verifies that the summary statistics
// Splits and Shards on a query response reflect the number of time-based
// splits and row shards the query was partitioned into.
func TestMetricsTripperware_SplitShardStats(t *testing.T) {
	l := WithSplitByLimits(fakeLimits{
		maxSeries:               math.MaxInt32,
		maxQueryParallelism:     1,
		tsdbMaxQueryParallelism: 1,
		queryTimeout:            1 * time.Minute,
	}, 1*time.Hour) // 1 hour split time interval

	statsTestCfg := testConfig
	statsTestCfg.ShardedQueries = true
	statsSchemas := testSchemas
	statsSchemas[0].RowShards = 4

	for _, tc := range []struct {
		name               string
		request            queryrangebase.Request
		expectedSplitStats int64
		expectedShardStats int64
	}{
		{
			name: "instant query split",
			request: &LokiInstantRequest{
				Query:     `sum by (app) (rate({app="foo"} |= "foo"[2h]))`,
				Limit:     1000,
				TimeTs:    testTime,
				Direction: logproto.FORWARD,
				Path:      "/loki/api/v1/query",
			},
			expectedSplitStats: 2, // [2h] interval split by 1h configured split interval
			expectedShardStats: 8, // 2 time splits * 4 row shards
		},
		{
			// Fixed label: was "instant query split not split".
			name: "instant query not split",
			request: &LokiInstantRequest{
				Query:     `sum by (app) (rate({app="foo"} |= "foo"[1h]))`,
				Limit:     1000,
				TimeTs:    testTime,
				Direction: logproto.FORWARD,
				Path:      "/loki/api/v1/query",
			},
			expectedSplitStats: 0, // [1h] interval not split
			expectedShardStats: 4, // 4 row shards
		},
		{
			name: "range query split",
			request: &LokiRequest{
				Query:     `sum by (app) (rate({app="foo"} |= "foo"[1h]))`,
				Limit:     1000,
				Step:      30000, // 30sec
				StartTs:   testTime.Add(-2 * time.Hour),
				EndTs:     testTime,
				Direction: logproto.FORWARD,
				Path:      "/query_range",
			},
			expectedSplitStats: 3,  // 2 hour range interval split based on the base hour + the remainder
			expectedShardStats: 12, // 3 time splits * 4 row shards
		},
		{
			name: "range query not split",
			request: &LokiRequest{
				Query:     `sum by (app) (rate({app="foo"} |= "foo"[1h]))`,
				Limit:     1000,
				Step:      30000, // 30sec
				StartTs:   testTime.Add(-1 * time.Minute),
				EndTs:     testTime,
				Direction: logproto.FORWARD,
				Path:      "/query_range",
			},
			expectedSplitStats: 0, // 1 minute range interval not split
			expectedShardStats: 4, // 4 row shards
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			tpw, stopper, err := NewTripperware(statsTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: statsSchemas}, nil, false, nil)
			if stopper != nil {
				defer stopper.Stop()
			}
			require.NoError(t, err)

			ctx := user.InjectOrgID(context.Background(), "1")
			req, err := LokiCodec.EncodeRequest(ctx, tc.request)
			require.NoError(t, err)

			req = req.WithContext(ctx)
			err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
			require.NoError(t, err)

			rt, err := newfakeRoundTripper()
			require.NoError(t, err)
			defer rt.Close()

			// Serve a canned PromQL matrix result from the fake downstream.
			_, h := promqlResult(matrix)
			rt.setHandler(h)
			resp, err := tpw(rt).RoundTrip(req)
			require.NoError(t, err)

			lokiResponse, err := LokiCodec.DecodeResponse(ctx, resp, tc.request)
			require.NoError(t, err)

			require.Equal(t, tc.expectedSplitStats, lokiResponse.(*LokiPromResponse).Statistics.Summary.Splits)
			require.Equal(t, tc.expectedShardStats, lokiResponse.(*LokiPromResponse).Statistics.Summary.Shards)
		})
	}
}
type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int

@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
@ -57,7 +58,8 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
return s.next.Do(ctx, request)
}
mapper, err := logql.NewRangeMapper(interval, s.metrics)
mapperStats := logql.NewMapperStats()
mapper, err := logql.NewRangeMapper(interval, s.metrics, mapperStats)
if err != nil {
return nil, err
}
@ -74,6 +76,10 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
return s.next.Do(ctx, request)
}
// Update middleware stats
queryStatsCtx := stats.FromContext(ctx)
queryStatsCtx.AddSplitQueries(int64(mapperStats.GetSplitQueries()))
params, err := paramsFromRequest(request)
if err != nil {
return nil, err
@ -109,7 +115,6 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
}, nil
case parser.ValueTypeVector:
return &LokiPromResponse{
Statistics: res.Statistics,
Response: &queryrangebase.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrangebase.PrometheusData{
@ -117,6 +122,7 @@ func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (
Result: toProtoVector(value.(loghttp.Vector)),
},
},
Statistics: res.Statistics,
}, nil
default:
return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data.Type())

@ -111,7 +111,7 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
start := time.Now()
// start a new statistics context to be used by middleware, which we will merge with the response's statistics
st, statsCtx := stats.NewContext(ctx)
middlewareStats, statsCtx := stats.NewContext(ctx)
// execute the request
resp, err := next.Do(statsCtx, req)
@ -120,7 +120,7 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
}
// collect stats and status
var statistics *stats.Result
var responseStats *stats.Result
var res promql_parser.Value
var queryType string
var totalEntries int
@ -128,21 +128,21 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
if resp != nil {
switch r := resp.(type) {
case *LokiResponse:
statistics = &r.Statistics
responseStats = &r.Statistics
totalEntries = int(logqlmodel.Streams(r.Data.Result).Lines())
queryType = queryTypeLog
case *LokiPromResponse:
statistics = &r.Statistics
responseStats = &r.Statistics
if r.Response != nil {
totalEntries = len(r.Response.Data.Result)
}
queryType = queryTypeMetric
case *LokiSeriesResponse:
statistics = &r.Statistics
responseStats = &r.Statistics
totalEntries = len(r.Data)
queryType = queryTypeSeries
case *LokiLabelNamesResponse:
statistics = &r.Statistics
responseStats = &r.Statistics
totalEntries = len(r.Data)
queryType = queryTypeLabel
default:
@ -150,19 +150,19 @@ func StatsCollectorMiddleware() queryrangebase.Middleware {
}
}
if statistics != nil {
if responseStats != nil {
// merge the response's statistics with the stats collected by the middleware
statistics.Merge(st.Result(time.Since(start), 0, totalEntries))
responseStats.Merge(middlewareStats.Result(time.Since(start), 0, totalEntries))
// Re-calculate the summary: the queueTime result is already merged so should not be updated
// Log and record metrics for the current query
statistics.ComputeSummary(time.Since(start), 0, totalEntries)
statistics.Log(level.Debug(logger))
responseStats.ComputeSummary(time.Since(start), 0, totalEntries)
responseStats.Log(level.Debug(logger))
}
ctxValue := ctx.Value(ctxKey)
if data, ok := ctxValue.(*queryData); ok {
data.recorded = true
data.statistics = statistics
data.statistics = responseStats
data.result = res
data.queryType = queryType
p, errReq := paramsFromRequest(req)

Loading…
Cancel
Save