From c1bccac1418ff8abc1b0ad8136c113a9b095cd31 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 17 Oct 2022 16:24:23 -0600 Subject: [PATCH] Results cache fix improvements (#7444) Move middleware to someplace more sensible and incorporate feedback missed in the review for this. --- pkg/logql/downstream.go | 5 ++- pkg/logql/engine_test.go | 4 +-- pkg/logqlmodel/metadata/context.go | 21 +++++++++--- pkg/logqlmodel/metadata/context_test.go | 22 +++++++++++-- pkg/loki/modules.go | 4 ++- .../queryrange/queryrangebase/middleware.go | 30 +++++++++++++++++ .../queryrangebase/middleware_test.go | 33 +++++++++++++++++++ .../queryrangebase/results_cache.go | 3 -- pkg/util/server/middleware.go | 21 ------------ 9 files changed, 107 insertions(+), 36 deletions(-) create mode 100644 pkg/querier/queryrange/queryrangebase/middleware.go create mode 100644 pkg/querier/queryrange/queryrangebase/middleware_test.go diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 34ae8af63f..ab1d7021a0 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -221,7 +221,10 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre } for _, res := range results { - metadata.JoinHeaders(ctx, res.Headers) + if err := metadata.JoinHeaders(ctx, res.Headers); err != nil { + level.Warn(util_log.Logger).Log("msg", "unable to add headers to results context", "error", err) + break + } } return results, nil diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 9abfb0755a..044afb0fec 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2199,7 +2199,7 @@ func TestEngine_Stats(t *testing.T) { type metaQuerier struct{} func (metaQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { - metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{ + _ = metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{ { Name: "Header", Values: []string{"value"}, @@ -2209,7 +2209,7 @@ func (metaQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.Entr } func (metaQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { - metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{ + _ = metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{ {Name: "Header", Values: []string{"value"}}, }) return iter.NoopIterator, nil diff --git a/pkg/logqlmodel/metadata/context.go b/pkg/logqlmodel/metadata/context.go index 673ec795ec..45ba90b9b5 100644 --- a/pkg/logqlmodel/metadata/context.go +++ b/pkg/logqlmodel/metadata/context.go @@ -6,6 +6,7 @@ package metadata import ( "context" + "errors" "sort" "sync" @@ -20,9 +21,13 @@ const ( metadataKey ctxKeyType = "metadata" ) +var ( + ErrNoCtxData = errors.New("unable to add headers to context: no existing context data") +) + // Context is the metadata context. It is passed through the query path and accumulates metadata. type Context struct { - mtx sync.RWMutex + mtx sync.Mutex headers map[string][]string } @@ -48,8 +53,8 @@ func FromContext(ctx context.Context) *Context { // Headers returns the cache headers accumulated in the context so far. func (c *Context) Headers() []*definitions.PrometheusResponseHeader { - c.mtx.RLock() - defer c.mtx.RUnlock() + c.mtx.Lock() + defer c.mtx.Unlock() headers := make([]*definitions.PrometheusResponseHeader, 0, len(c.headers)) for k, vs := range c.headers { @@ -70,8 +75,12 @@ func (c *Context) Headers() []*definitions.PrometheusResponseHeader { // JoinHeaders merges a Headers with the embedded Headers in a context in a concurrency-safe manner. // JoinHeaders will consolidate all distinct headers but will override same-named headers in an // undefined way -func JoinHeaders(ctx context.Context, headers []*definitions.PrometheusResponseHeader) { - context := FromContext(ctx) +func JoinHeaders(ctx context.Context, headers []*definitions.PrometheusResponseHeader) error { + context, ok := ctx.Value(metadataKey).(*Context) + if !ok { + return ErrNoCtxData + } + context.mtx.Lock() defer context.mtx.Unlock() @@ -79,4 +88,6 @@ func JoinHeaders(ctx context.Context, headers []*definitions.PrometheusResponseH header := headers[i] context.headers[header.Name] = header.Values } + + return nil } diff --git a/pkg/logqlmodel/metadata/context_test.go b/pkg/logqlmodel/metadata/context_test.go index 6c5fda040c..459fda2afd 100644 --- a/pkg/logqlmodel/metadata/context_test.go +++ b/pkg/logqlmodel/metadata/context_test.go @@ -2,6 +2,7 @@ package metadata import ( "context" + "errors" "testing" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" @@ -29,9 +30,12 @@ func TestHeaders(t *testing.T) { } metadata, ctx := NewContext(context.Background()) - JoinHeaders(ctx, h1) - JoinHeaders(ctx, h2) - JoinHeaders(ctx, h3) + err := JoinHeaders(ctx, h1) + require.Nil(t, err) + err = JoinHeaders(ctx, h2) + require.Nil(t, err) + err = JoinHeaders(ctx, h3) + require.Nil(t, err) require.Equal(t, []*definitions.PrometheusResponseHeader{ {Name: "Header1", Values: []string{"value"}}, @@ -39,3 +43,15 @@ func TestHeaders(t *testing.T) { {Name: "Header3", Values: []string{"value2"}}, }, metadata.Headers()) } + +func TestHeadersNoKey(t *testing.T) { + ctx := context.Background() + err := JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{ + { + Name: "Header1", + Values: []string{"value"}, + }, + }) + + require.True(t, errors.Is(err, ErrNoCtxData)) +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index afbe67dc19..89fc1e0f05 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -388,7 +390,7 @@ func (t *Loki) initQuerier() (services.Service, error) { if t.supportIndexDeleteRequest() { toMerge = append( toMerge, - serverutil.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader), + queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader), ) } httpMiddleware := middleware.Merge(toMerge...) diff --git a/pkg/querier/queryrange/queryrangebase/middleware.go b/pkg/querier/queryrange/queryrangebase/middleware.go new file mode 100644 index 0000000000..bccb6feead --- /dev/null +++ b/pkg/querier/queryrange/queryrangebase/middleware.go @@ -0,0 +1,30 @@ +package queryrangebase + +import ( + "net/http" + + "github.com/grafana/dskit/tenant" + "github.com/weaveworks/common/middleware" +) + +const ( + // ResultsCacheGenNumberHeaderName holds name of the header we want to set in http response + ResultsCacheGenNumberHeaderName = "Results-Cache-Gen-Number" +) + +func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLoader) middleware.Interface { + return middleware.Func(func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + userIDs, err := tenant.TenantIDs(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs) + + w.Header().Set(ResultsCacheGenNumberHeaderName, cacheGenNumber) + next.ServeHTTP(w, r) + }) + }) +} diff --git a/pkg/querier/queryrange/queryrangebase/middleware_test.go b/pkg/querier/queryrange/queryrangebase/middleware_test.go new file mode 100644 index 0000000000..6b3b5e64b9 --- /dev/null +++ b/pkg/querier/queryrange/queryrangebase/middleware_test.go @@ -0,0 +1,33 @@ +package queryrangebase + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/weaveworks/common/user" + + "github.com/stretchr/testify/assert" +) + +func TestCacheGenNumberHeaderSetterMiddleware(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "fake") + req, _ := http.NewRequestWithContext(ctx, "GET", "http://testing.com", nil) + w := httptest.NewRecorder() + loader := &fakeGenNumberLoader{genNumber: "test-header-value"} + + mware := CacheGenNumberHeaderSetterMiddleware(loader). + Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {})) + mware.ServeHTTP(w, req) + + assert.Equal(t, w.Header().Get(ResultsCacheGenNumberHeaderName), "test-header-value") +} + +type fakeGenNumberLoader struct { + genNumber string +} + +func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string { + return l.genNumber +} diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 081eec52c6..3d59a5ce03 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -39,9 +39,6 @@ import ( var ( // Value that cacheControlHeader has if the response indicates that the results should not be cached. noStoreValue = "no-store" - - // ResultsCacheGenNumberHeaderName holds name of the header we want to set in http response - ResultsCacheGenNumberHeaderName = "Results-Cache-Gen-Number" ) const ( diff --git a/pkg/util/server/middleware.go b/pkg/util/server/middleware.go index def74997aa..6131e11aaa 100644 --- a/pkg/util/server/middleware.go +++ b/pkg/util/server/middleware.go @@ -3,11 +3,8 @@ package server import ( "net/http" - "github.com/grafana/dskit/tenant" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" - - "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" ) // NewPrepopulateMiddleware creates a middleware which will parse incoming http forms. @@ -34,21 +31,3 @@ func ResponseJSONMiddleware() middleware.Interface { }) }) } - -// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation -func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader queryrangebase.CacheGenNumberLoader) middleware.Interface { - return middleware.Func(func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - userIDs, err := tenant.TenantIDs(r.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - - cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs) - - w.Header().Set(queryrangebase.ResultsCacheGenNumberHeaderName, cacheGenNumber) - next.ServeHTTP(w, r) - }) - }) -}