Add limit and line_returned in the query log. (#3423)

* Add limit and line_returned in the query log.

This can be useful to investigate how many wasted queries have been made by the frontend.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* got linted.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/3442/head
Cyril Tovena 5 years ago committed by GitHub
parent ce4f78515f
commit a29901d33a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      pkg/logql/engine.go
  2. 18
      pkg/logql/metrics.go
  3. 4
      pkg/logql/metrics_test.go
  4. 31
      pkg/querier/queryrange/stats.go
  5. 56
      pkg/querier/queryrange/stats_test.go

@ -53,6 +53,14 @@ func (Streams) String() string {
return ""
}
func (streams Streams) lines() int64 {
var res int64
for _, s := range streams {
res += int64(len(s.Entries))
}
return res
}
// Result is the result of a query execution.
type Result struct {
Data promql_parser.Value
@ -159,7 +167,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
}
if q.record {
RecordMetrics(ctx, q.params, status, statResult)
RecordMetrics(ctx, q.params, status, statResult, data)
}
return Result{
@ -293,7 +301,6 @@ func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser
}
return PopulateMatrixFromScalar(s, q.params), nil
}
func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {

@ -10,6 +10,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/logql/stats"
)
@ -64,21 +65,28 @@ var (
})
)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result) {
logger := util_log.WithContext(ctx, util_log.Logger)
func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Result, result promql_parser.Value) {
var (
logger = util_log.WithContext(ctx, util_log.Logger)
rt = string(GetRangeType(p))
latencyType = latencyTypeFast
returnedLines = 0
)
queryType, err := QueryType(p.Query())
if err != nil {
level.Warn(logger).Log("msg", "error parsing query type", "err", err)
}
rt := string(GetRangeType(p))
// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
latencyType := latencyTypeFast
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}
if result != nil && result.Type() == ValueTypeStreams {
returnedLines = int(result.(Streams).lines())
}
// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
"latency", latencyType, // this can be used to filter log lines.
@ -89,6 +97,8 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
"step", p.Step(),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"limit", p.Limit(),
"returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
)

@ -73,10 +73,10 @@ func TestLogSlowQuery(t *testing.T) {
ExecTime: 25.25,
TotalBytesProcessed: 100000,
},
})
}, Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 throughput=100kB total_bytes=100kB\n",
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())

@ -12,6 +12,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/middleware"
"github.com/grafana/loki/pkg/logql"
@ -23,27 +24,31 @@ type ctxKeyType string
const ctxKey ctxKeyType = "stats"
var (
defaultMetricRecorder = metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
logql.RecordMetrics(ctx, p, status, stats)
defaultMetricRecorder = metricRecorderFn(func(data *queryData) {
logql.RecordMetrics(data.ctx, data.params, data.status, *data.statistics, data.result)
})
// StatsHTTPMiddleware is an http middleware to record stats for query_range filter.
StatsHTTPMiddleware middleware.Interface = statsHTTPMiddleware(defaultMetricRecorder)
)
type metricRecorder interface {
Record(ctx context.Context, p logql.Params, status string, stats stats.Result)
Record(data *queryData)
}
type metricRecorderFn func(ctx context.Context, p logql.Params, status string, stats stats.Result)
type metricRecorderFn func(data *queryData)
func (m metricRecorderFn) Record(ctx context.Context, p logql.Params, status string, stats stats.Result) {
m(ctx, p, status, stats)
func (m metricRecorderFn) Record(data *queryData) {
m(data)
}
type queryData struct {
ctx context.Context
params logql.Params
statistics *stats.Result
recorded bool
result promql_parser.Value
status string
recorded bool
}
func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
@ -62,12 +67,9 @@ func statsHTTPMiddleware(recorder metricRecorder) middleware.Interface {
if data.statistics == nil {
data.statistics = &stats.Result{}
}
recorder.Record(
r.Context(),
data.params,
strconv.Itoa(interceptor.statusCode),
*data.statistics,
)
data.ctx = r.Context()
data.status = strconv.Itoa(interceptor.statusCode)
recorder.Record(data)
}
})
})
@ -85,10 +87,12 @@ func StatsCollectorMiddleware() queryrange.Middleware {
// collect stats and status
var statistics *stats.Result
var res promql_parser.Value
if resp != nil {
switch r := resp.(type) {
case *LokiResponse:
statistics = &r.Statistics
res = logql.Streams(r.Data.Result)
case *LokiPromResponse:
statistics = &r.Statistics
default:
@ -105,6 +109,7 @@ func StatsCollectorMiddleware() queryrange.Middleware {
data.recorded = true
data.statistics = statistics
data.params = paramsFromRequest(req)
data.result = res
}
return resp, err
})

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
)
@ -70,7 +69,7 @@ func Test_StatsHTTP(t *testing.T) {
for _, test := range []struct {
name string
next http.Handler
expect func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result)
expect func(t *testing.T, data *queryData)
}{
{
"should not record metric if nothing is recorded",
@ -78,7 +77,7 @@ func Test_StatsHTTP(t *testing.T) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = false
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, stats stats.Result) {
func(t *testing.T, data *queryData) {
t.Fail()
},
},
@ -94,12 +93,12 @@ func Test_StatsHTTP(t *testing.T) {
})
data.statistics = nil
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), status)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, stats.Result{}, s)
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusOK), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, stats.Result{}, *data.statistics)
},
},
{
@ -115,18 +114,41 @@ func Test_StatsHTTP(t *testing.T) {
data.statistics = &statsResult
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, ctx context.Context, p logql.Params, status string, s stats.Result) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), status)
require.Equal(t, "foo", p.Query())
require.Equal(t, logproto.BACKWARD, p.Direction())
require.Equal(t, uint32(100), p.Limit())
require.Equal(t, statsResult, s)
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, statsResult, *data.statistics)
},
},
{
"result",
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := r.Context().Value(ctxKey).(*queryData)
data.recorded = true
data.params = paramsFromRequest(&LokiRequest{
Query: "foo",
Direction: logproto.BACKWARD,
Limit: 100,
})
data.statistics = &statsResult
data.result = streams
w.WriteHeader(http.StatusTeapot)
}),
func(t *testing.T, data *queryData) {
require.Equal(t, fmt.Sprintf("%d", http.StatusTeapot), data.status)
require.Equal(t, "foo", data.params.Query())
require.Equal(t, logproto.BACKWARD, data.params.Direction())
require.Equal(t, uint32(100), data.params.Limit())
require.Equal(t, statsResult, *data.statistics)
require.Equal(t, streams, data.result)
},
},
} {
t.Run(test.name, func(t *testing.T) {
statsHTTPMiddleware(metricRecorderFn(func(ctx context.Context, p logql.Params, status string, stats stats.Result) {
test.expect(t, ctx, p, status, stats)
statsHTTPMiddleware(metricRecorderFn(func(data *queryData) {
test.expect(t, data)
})).Wrap(test.next).ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/foo", strings.NewReader("")))
})
}

Loading…
Cancel
Save