From 1b63331e3dcb7f6f2b65c4281710458239910021 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Fri, 26 Nov 2021 11:21:41 +0100 Subject: [PATCH] Add support for `X-Query-Tags` (#4814) * Add support for `X-Query-Tags` 1. Clients can now send arbitrary metadata about a query via the `X-Query-Tags` HTTP header 2. Also record `X-Query-Tags` in `metrics.go` for instant and range queries. * PR remarks * Make linter happy * Fix middleware next * Make parsing of the header safe * Add a few more test cases * Minor suggestions * Add middleware chain to querier handlers * Repopulate the HTTP headers while encoding the request --- pkg/logql/metrics.go | 48 ++++++++++++++++++++++++++-- pkg/logql/metrics_test.go | 51 +++++++++++++++++++++++++++++- pkg/loki/modules.go | 2 ++ pkg/querier/queryrange/codec.go | 21 +++++++++--- pkg/querier/worker_service.go | 1 + pkg/util/server/middleware.go | 28 ++++++++++++++++ pkg/util/server/middleware_test.go | 42 ++++++++++++++++++++++++ 7 files changed, 185 insertions(+), 8 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 9163e2f0ad..5a1b0e1bc8 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" + serverutil "github.com/grafana/loki/pkg/util/server" ) const ( @@ -88,20 +89,30 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res returnedLines = int(result.(logqlmodel.Streams).Lines()) } - // we also log queries, useful for troubleshooting slow queries. - level.Info(logger).Log( + queryTags, _ := ctx.Value(serverutil.QueryTagsHTTPHeader).(string) // it's ok to be empty. + + logValues := make([]interface{}, 0, 20) + + logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. 
"query", p.Query(), "query_type", queryType, "range_type", rt, "length", p.End().Sub(p.Start()), "step", p.Step(), - "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), + "duration", time.Duration(int64(stats.Summary.ExecTime * float64(time.Second))), "status", status, "limit", p.Limit(), "returned_lines", returnedLines, "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), + }...) + + logValues = append(logValues, tagsToKeyValues(queryTags)...) + + // we also log queries, useful for troubleshooting slow queries. + level.Info(logger).Log( + logValues..., ) bytesPerSecond.WithLabelValues(status, queryType, rt, latencyType). @@ -133,3 +144,34 @@ func QueryType(query string) (string, error) { return "", nil } } + +// tagsToKeyValues converts QueryTags to a form that is easy to log. +// e.g: `Source=foo,Feature=beta` -> []interface{}{"source", "foo", "feature", "beta"} +// so that we could log nicely! +// Any tag that is not in canonical `key=value` form is skipped (e.g: `abc` or `key1=value1=extra`); well-formed tags alongside it are still kept. +func tagsToKeyValues(queryTags string) []interface{} { + toks := strings.FieldsFunc(queryTags, func(r rune) bool { + return r == ',' + }) + + vals := make([]string, 0) + + for _, tok := range toks { + val := strings.FieldsFunc(tok, func(r rune) bool { + return r == '=' + }) + + if len(val) != 2 { + continue + } + vals = append(vals, val...) 
+ } + + res := make([]interface{}, 0, len(vals)) + + for _, val := range vals { + res = append(res, strings.ToLower(val)) + } + + return res +} diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index e44ccbe9f5..7fccd87b24 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -10,6 +10,7 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log" "github.com/opentracing/opentracing-go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber/jaeger-client-go" "github.com/weaveworks/common/user" @@ -17,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" + serverutil "github.com/grafana/loki/pkg/util/server" ) func TestQueryType(t *testing.T) { @@ -61,6 +63,9 @@ func TestLogSlowQuery(t *testing.T) { sp := opentracing.StartSpan("") ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp) now := time.Now() + + ctx = context.WithValue(ctx, serverutil.QueryTagsHTTPHeader, "Source=logvolhist,Feature=Beta") + RecordMetrics(ctx, LiteralParams{ qs: `{foo="bar"} |= "buzz"`, direction: logproto.BACKWARD, @@ -77,9 +82,53 @@ func TestLogSlowQuery(t *testing.T) { }, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) require.Equal(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB\n", + "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) 
util_log.Logger = log.NewNopLogger() } + +func Test_testToKeyValues(t *testing.T) { + cases := []struct { + name string + in string + exp []interface{} + }{ + { + name: "canonical-form", + in: "Source=logvolhist", + exp: []interface{}{ + "source", + "logvolhist", + }, + }, + { + name: "canonical-form-multiple-values", + in: "Source=logvolhist,Feature=beta", + exp: []interface{}{ + "source", + "logvolhist", + "feature", + "beta", + }, + }, + { + name: "empty", + in: "", + exp: []interface{}{}, + }, + { + name: "non-canonical form", + in: "abc", + exp: []interface{}{}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := tagsToKeyValues(c.in) + assert.Equal(t, c.exp, got) + }) + } +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index c11b301c98..f3f8a1488a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -489,6 +489,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { } frontendHandler = middleware.Merge( + serverutil.ExtractQueryTagsMiddleware(), serverutil.RecoveryHTTPMiddleware, t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -500,6 +501,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { // If this process also acts as a Querier we don't do any proxying of tail requests if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) { httpMiddleware := middleware.Merge( + serverutil.ExtractQueryTagsMiddleware(), t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, ) diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 9485b3ce1f..afd2584d44 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util/marshal" marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy" + serverutil "github.com/grafana/loki/pkg/util/server" ) var LokiCodec = &Codec{} @@ -256,6 +257,13 @@ func (Codec) 
DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque } func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { + + header := make(http.Header) + queryTags := getQueryTags(ctx) + if queryTags != "" { + header.Set(string(serverutil.QueryTagsHTTPHeader), queryTags) + } + switch request := r.(type) { case *LokiRequest: params := url.Values{ @@ -281,7 +289,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, Body: http.NoBody, - Header: http.Header{}, + Header: header, } return req.WithContext(ctx), nil @@ -303,7 +311,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, Body: http.NoBody, - Header: http.Header{}, + Header: header, } return req.WithContext(ctx), nil case *LokiLabelNamesRequest: @@ -321,7 +329,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, Body: http.NoBody, - Header: http.Header{}, + Header: header, } return req.WithContext(ctx), nil case *LokiInstantRequest: @@ -344,7 +352,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req RequestURI: u.String(), // This is what the httpgrpc code looks at. 
URL: u, Body: http.NoBody, - Header: http.Header{}, + Header: header, } return req.WithContext(ctx), nil @@ -856,3 +864,8 @@ func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryra return promHeaders } + +func getQueryTags(ctx context.Context) string { + v, _ := ctx.Value(serverutil.QueryTagsHTTPHeader).(string) // it's ok to be empty + return v +} diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index 02154551c8..ead499e875 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -54,6 +54,7 @@ func InitWorkerService( // Create a couple Middlewares used to handle panics, perform auth, parse forms in http request, and set content type in response handlerMiddleware := middleware.Merge( + serverutil.ExtractQueryTagsMiddleware(), serverutil.RecoveryHTTPMiddleware, authMiddleware, serverutil.NewPrepopulateMiddleware(), diff --git a/pkg/util/server/middleware.go b/pkg/util/server/middleware.go index 6131e11aaa..6a9791aae4 100644 --- a/pkg/util/server/middleware.go +++ b/pkg/util/server/middleware.go @@ -1,12 +1,24 @@ package server import ( + "context" "net/http" + "regexp" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" ) +// NOTE(kavi): Why new type? +// Our linter won't allow to use basic types like string to be used as key in context. +type ctxKey string + +var ( + QueryTagsHTTPHeader ctxKey = "X-Query-Tags" + safeQueryTags = regexp.MustCompile("[^a-zA-Z0-9-=, ]+") // only alpha-numeric, ' ', ',', '=' and `-` + +) + // NewPrepopulateMiddleware creates a middleware which will parse incoming http forms. // This is important because some endpoints can POST x-www-form-urlencoded bodies instead of GET w/ query strings. 
func NewPrepopulateMiddleware() middleware.Interface { @@ -31,3 +43,19 @@ func ResponseJSONMiddleware() middleware.Interface { }) }) } + +func ExtractQueryTagsMiddleware() middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + tags := req.Header.Get(string(QueryTagsHTTPHeader)) + tags = safeQueryTags.ReplaceAllString(tags, "") + + if tags != "" { + ctx = context.WithValue(ctx, QueryTagsHTTPHeader, tags) + req = req.WithContext(ctx) + } + next.ServeHTTP(w, req) + }) + }) +} diff --git a/pkg/util/server/middleware_test.go b/pkg/util/server/middleware_test.go index b2267c9199..31af2d552a 100644 --- a/pkg/util/server/middleware_test.go +++ b/pkg/util/server/middleware_test.go @@ -8,9 +8,51 @@ import ( "net/url" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestQueryTags(t *testing.T) { + for _, tc := range []struct { + desc string + in string + exp string + error bool + }{ + { + desc: "single-value", + in: `Source=logvolhist`, + exp: `Source=logvolhist`, + }, + { + desc: "multiple-values", + in: `Source=logvolhist,Statate=beta`, + exp: `Source=logvolhist,Statate=beta`, + }, + { + desc: "remove-invalid-chars", + in: `Source=log+volhi\\st,Statate=be$ta`, + exp: `Source=logvolhist,Statate=beta`, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + req := httptest.NewRequest("GET", "http://testing.com", nil) + req.Header.Set(string(QueryTagsHTTPHeader), tc.in) + + w := httptest.NewRecorder() + checked := false + mware := ExtractQueryTagsMiddleware().Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + require.Equal(t, tc.exp, req.Context().Value(QueryTagsHTTPHeader).(string)) + checked = true + })) + + mware.ServeHTTP(w, req) + + assert.True(t, true, checked) + }) + } +} + func TestPrepopulate(t *testing.T) { success := http.HandlerFunc(func(w http.ResponseWriter, req 
*http.Request) { _, err := w.Write([]byte("ok"))