Add support for `X-Query-Tags` (#4814)

* Add support for `X-Query-Tags`

1. Now client can send any metadata about query via `X-Query-Tags` HTTP header
2. Also record `X-Query-Tags` in `metrics.go` for instant and range queries.

* PR remarks

* Make linter happy

* Fix middleware next

* Make safe parsing of the header

* Add bit more test cases

* Minor suggestions

* Add middleware chain to querier handlers

* Repopulate the HTTP headers before while Encoding
pull/4842/head
Kaviraj 4 years ago committed by GitHub
parent ece671c146
commit 1b63331e3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      pkg/logql/metrics.go
  2. 51
      pkg/logql/metrics_test.go
  3. 2
      pkg/loki/modules.go
  4. 21
      pkg/querier/queryrange/codec.go
  5. 1
      pkg/querier/worker_service.go
  6. 28
      pkg/util/server/middleware.go
  7. 42
      pkg/util/server/middleware_test.go

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/logqlmodel/stats"
serverutil "github.com/grafana/loki/pkg/util/server"
) )
const ( const (
@ -88,20 +89,30 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats stats.Res
returnedLines = int(result.(logqlmodel.Streams).Lines()) returnedLines = int(result.(logqlmodel.Streams).Lines())
} }
// we also log queries, useful for troubleshooting slow queries. queryTags, _ := ctx.Value(serverutil.QueryTagsHTTPHeader).(string) // it's ok to be empty.
level.Info(logger).Log(
logValues := make([]interface{}, 0, 20)
logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines. "latency", latencyType, // this can be used to filter log lines.
"query", p.Query(), "query", p.Query(),
"query_type", queryType, "query_type", queryType,
"range_type", rt, "range_type", rt,
"length", p.End().Sub(p.Start()), "length", p.End().Sub(p.Start()),
"step", p.Step(), "step", p.Step(),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "duration", time.Duration(int64(stats.Summary.ExecTime * float64(time.Second))),
"status", status, "status", status,
"limit", p.Limit(), "limit", p.Limit(),
"returned_lines", returnedLines, "returned_lines", returnedLines,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
}...)
logValues = append(logValues, tagsToKeyValues(queryTags)...)
// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
logValues...,
) )
bytesPerSecond.WithLabelValues(status, queryType, rt, latencyType). bytesPerSecond.WithLabelValues(status, queryType, rt, latencyType).
@ -133,3 +144,34 @@ func QueryType(query string) (string, error) {
return "", nil return "", nil
} }
} }
// tagsToKeyValues converts QueryTags to form that is easy to log.
// e.g: `Source=foo,Feature=beta` -> []interface{}{"source", "foo", "feature", "beta"}
// so that we could log nicely!
// If queryTags is not in canonical form then its completely ignored (e.g: `key1=value1,key2=value`)
func tagsToKeyValues(queryTags string) []interface{} {
toks := strings.FieldsFunc(queryTags, func(r rune) bool {
return r == ','
})
vals := make([]string, 0)
for _, tok := range toks {
val := strings.FieldsFunc(tok, func(r rune) bool {
return r == '='
})
if len(val) != 2 {
continue
}
vals = append(vals, val...)
}
res := make([]interface{}, 0, len(vals))
for _, val := range vals {
res = append(res, strings.ToLower(val))
}
return res
}

@ -10,6 +10,7 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log" util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go"
"github.com/weaveworks/common/user" "github.com/weaveworks/common/user"
@ -17,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/logqlmodel/stats"
serverutil "github.com/grafana/loki/pkg/util/server"
) )
func TestQueryType(t *testing.T) { func TestQueryType(t *testing.T) {
@ -61,6 +63,9 @@ func TestLogSlowQuery(t *testing.T) {
sp := opentracing.StartSpan("") sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp) ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now() now := time.Now()
ctx = context.WithValue(ctx, serverutil.QueryTagsHTTPHeader, "Source=logvolhist,Feature=Beta")
RecordMetrics(ctx, LiteralParams{ RecordMetrics(ctx, LiteralParams{
qs: `{foo="bar"} |= "buzz"`, qs: `{foo="bar"} |= "buzz"`,
direction: logproto.BACKWARD, direction: logproto.BACKWARD,
@ -77,9 +82,53 @@ func TestLogSlowQuery(t *testing.T) {
}, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}}) }, logqlmodel.Streams{logproto.Stream{Entries: make([]logproto.Entry, 10)}})
require.Equal(t, require.Equal(t,
fmt.Sprintf( fmt.Sprintf(
"level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB\n", "level=info org_id=foo traceID=%s latency=slow query=\"{foo=\\\"bar\\\"} |= \\\"buzz\\\"\" query_type=filter range_type=range length=1h0m0s step=1m0s duration=25.25s status=200 limit=1000 returned_lines=10 throughput=100kB total_bytes=100kB source=logvolhist feature=beta\n",
sp.Context().(jaeger.SpanContext).SpanID().String(), sp.Context().(jaeger.SpanContext).SpanID().String(),
), ),
buf.String()) buf.String())
util_log.Logger = log.NewNopLogger() util_log.Logger = log.NewNopLogger()
} }
func Test_testToKeyValues(t *testing.T) {
cases := []struct {
name string
in string
exp []interface{}
}{
{
name: "canonical-form",
in: "Source=logvolhist",
exp: []interface{}{
"source",
"logvolhist",
},
},
{
name: "canonical-form-multiple-values",
in: "Source=logvolhist,Feature=beta",
exp: []interface{}{
"source",
"logvolhist",
"feature",
"beta",
},
},
{
name: "empty",
in: "",
exp: []interface{}{},
},
{
name: "non-canonical form",
in: "abc",
exp: []interface{}{},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := tagsToKeyValues(c.in)
assert.Equal(t, c.exp, got)
})
}
}

@ -489,6 +489,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
} }
frontendHandler = middleware.Merge( frontendHandler = middleware.Merge(
serverutil.ExtractQueryTagsMiddleware(),
serverutil.RecoveryHTTPMiddleware, serverutil.RecoveryHTTPMiddleware,
t.HTTPAuthMiddleware, t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware, queryrange.StatsHTTPMiddleware,
@ -500,6 +501,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
// If this process also acts as a Querier we don't do any proxying of tail requests // If this process also acts as a Querier we don't do any proxying of tail requests
if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) { if t.Cfg.Frontend.TailProxyURL != "" && !t.isModuleActive(Querier) {
httpMiddleware := middleware.Merge( httpMiddleware := middleware.Merge(
serverutil.ExtractQueryTagsMiddleware(),
t.HTTPAuthMiddleware, t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware, queryrange.StatsHTTPMiddleware,
) )

@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util/marshal" "github.com/grafana/loki/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy" marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/pkg/util/server"
) )
var LokiCodec = &Codec{} var LokiCodec = &Codec{}
@ -256,6 +257,13 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
} }
func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) { func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
header := make(http.Header)
queryTags := getQueryTags(ctx)
if queryTags != "" {
header.Set(string(serverutil.QueryTagsHTTPHeader), queryTags)
}
switch request := r.(type) { switch request := r.(type) {
case *LokiRequest: case *LokiRequest:
params := url.Values{ params := url.Values{
@ -281,7 +289,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
RequestURI: u.String(), // This is what the httpgrpc code looks at. RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u, URL: u,
Body: http.NoBody, Body: http.NoBody,
Header: http.Header{}, Header: header,
} }
return req.WithContext(ctx), nil return req.WithContext(ctx), nil
@ -303,7 +311,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
RequestURI: u.String(), // This is what the httpgrpc code looks at. RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u, URL: u,
Body: http.NoBody, Body: http.NoBody,
Header: http.Header{}, Header: header,
} }
return req.WithContext(ctx), nil return req.WithContext(ctx), nil
case *LokiLabelNamesRequest: case *LokiLabelNamesRequest:
@ -321,7 +329,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
RequestURI: u.String(), // This is what the httpgrpc code looks at. RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u, URL: u,
Body: http.NoBody, Body: http.NoBody,
Header: http.Header{}, Header: header,
} }
return req.WithContext(ctx), nil return req.WithContext(ctx), nil
case *LokiInstantRequest: case *LokiInstantRequest:
@ -344,7 +352,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
RequestURI: u.String(), // This is what the httpgrpc code looks at. RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u, URL: u,
Body: http.NoBody, Body: http.NoBody,
Header: http.Header{}, Header: header,
} }
return req.WithContext(ctx), nil return req.WithContext(ctx), nil
@ -856,3 +864,8 @@ func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryra
return promHeaders return promHeaders
} }
func getQueryTags(ctx context.Context) string {
v, _ := ctx.Value(serverutil.QueryTagsHTTPHeader).(string) // it's ok to be empty
return v
}

@ -54,6 +54,7 @@ func InitWorkerService(
// Create a couple Middlewares used to handle panics, perform auth, parse forms in http request, and set content type in response // Create a couple Middlewares used to handle panics, perform auth, parse forms in http request, and set content type in response
handlerMiddleware := middleware.Merge( handlerMiddleware := middleware.Merge(
serverutil.ExtractQueryTagsMiddleware(),
serverutil.RecoveryHTTPMiddleware, serverutil.RecoveryHTTPMiddleware,
authMiddleware, authMiddleware,
serverutil.NewPrepopulateMiddleware(), serverutil.NewPrepopulateMiddleware(),

@ -1,12 +1,24 @@
package server package server
import ( import (
"context"
"net/http" "net/http"
"regexp"
"github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware" "github.com/weaveworks/common/middleware"
) )
// NOTE(kavi): Why new type?
// Our linter won't allow to use basic types like string to be used as key in context.
type ctxKey string
var (
QueryTagsHTTPHeader ctxKey = "X-Query-Tags"
safeQueryTags = regexp.MustCompile("[^a-zA-Z0-9-=, ]+") // only alpha-numeric, ' ', ',', '=' and `-`
)
// NewPrepopulateMiddleware creates a middleware which will parse incoming http forms. // NewPrepopulateMiddleware creates a middleware which will parse incoming http forms.
// This is important because some endpoints can POST x-www-form-urlencoded bodies instead of GET w/ query strings. // This is important because some endpoints can POST x-www-form-urlencoded bodies instead of GET w/ query strings.
func NewPrepopulateMiddleware() middleware.Interface { func NewPrepopulateMiddleware() middleware.Interface {
@ -31,3 +43,19 @@ func ResponseJSONMiddleware() middleware.Interface {
}) })
}) })
} }
func ExtractQueryTagsMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
tags := req.Header.Get(string(QueryTagsHTTPHeader))
tags = safeQueryTags.ReplaceAllString(tags, "")
if tags != "" {
ctx = context.WithValue(ctx, QueryTagsHTTPHeader, tags)
req = req.WithContext(ctx)
}
next.ServeHTTP(w, req)
})
})
}

@ -8,9 +8,51 @@ import (
"net/url" "net/url"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestQueryTags(t *testing.T) {
for _, tc := range []struct {
desc string
in string
exp string
error bool
}{
{
desc: "single-value",
in: `Source=logvolhist`,
exp: `Source=logvolhist`,
},
{
desc: "multiple-values",
in: `Source=logvolhist,Statate=beta`,
exp: `Source=logvolhist,Statate=beta`,
},
{
desc: "remove-invalid-chars",
in: `Source=log+volhi\\st,Statate=be$ta`,
exp: `Source=logvolhist,Statate=beta`,
},
} {
t.Run(tc.desc, func(t *testing.T) {
req := httptest.NewRequest("GET", "http://testing.com", nil)
req.Header.Set(string(QueryTagsHTTPHeader), tc.in)
w := httptest.NewRecorder()
checked := false
mware := ExtractQueryTagsMiddleware().Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
require.Equal(t, tc.exp, req.Context().Value(QueryTagsHTTPHeader).(string))
checked = true
}))
mware.ServeHTTP(w, req)
assert.True(t, true, checked)
})
}
}
func TestPrepopulate(t *testing.T) { func TestPrepopulate(t *testing.T) {
success := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { success := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
_, err := w.Write([]byte("ok")) _, err := w.Write([]byte("ok"))

Loading…
Cancel
Save