From ff6d6659fb33c4f52ab954b965d2b357339ee9d8 Mon Sep 17 00:00:00 2001
From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com>
Date: Thu, 6 Jul 2023 10:15:43 -0400
Subject: [PATCH] Query: Fix concurrency handling for mixed datasource queries
(#70100)
* split queries and merge responses
* increase concurrency again
* update unit test to verify the headers are merged
* fix lint issue
* fix race condition in unit test
* Fix function name and add a bit more documentation about how the func should be used
* update function call after rename
* check for duplicate header vals
* make concurrent query limit configurable
* Update conf/sample.ini
Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>
---------
Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>
---
conf/defaults.ini | 5 ++
conf/sample.ini | 5 ++
.../setup-grafana/configure-grafana/_index.md | 10 ++++
pkg/services/contexthandler/contexthandler.go | 31 +++++++++++++
pkg/services/query/query.go | 46 +++++++++++++++----
pkg/services/query/query_test.go | 28 ++++++++++-
6 files changed, 114 insertions(+), 11 deletions(-)
diff --git a/conf/defaults.ini b/conf/defaults.ini
index 8e365834b9f..79a9ed4e8f0 100644
--- a/conf/defaults.ini
+++ b/conf/defaults.ini
@@ -1262,6 +1262,11 @@ enabled = true
# Enable the news feed section
news_feed_enabled = true
+#################################### Query #############################
+[query]
+# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs.
+concurrent_query_limit =
+
#################################### Query History #############################
[query_history]
# Enable the Query history
diff --git a/conf/sample.ini b/conf/sample.ini
index ea22fdaa6b9..44553d40561 100644
--- a/conf/sample.ini
+++ b/conf/sample.ini
@@ -1188,6 +1188,11 @@
# Enable the news feed section
; news_feed_enabled = true
+#################################### Query #############################
+[query]
+# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs.
+;concurrent_query_limit =
+
#################################### Query History #############################
[query_history]
# Enable the Query history
diff --git a/docs/sources/setup-grafana/configure-grafana/_index.md b/docs/sources/setup-grafana/configure-grafana/_index.md
index 38661540e1d..4af59af6325 100644
--- a/docs/sources/setup-grafana/configure-grafana/_index.md
+++ b/docs/sources/setup-grafana/configure-grafana/_index.md
@@ -1669,6 +1669,14 @@ Enable or disable the Profile section. Default is `enabled`.
Enables the news feed section. Default is `true`
+
+
+## [query]
+
+### concurrent_query_limit
+
+Set the number of queries that can be executed concurrently in a mixed data source panel. Default is the number of CPUs.
+
## [query_history]
Configures Query history in Explore.
@@ -1677,6 +1685,8 @@ Configures Query history in Explore.
Enable or disable the Query history. Default is `enabled`.
+
+
## [metrics]
For detailed instructions, refer to [Internal Grafana metrics]({{< relref "../set-up-grafana-monitoring" >}}).
diff --git a/pkg/services/contexthandler/contexthandler.go b/pkg/services/contexthandler/contexthandler.go
index 1ea23f6357c..5b9ed22e3cb 100644
--- a/pkg/services/contexthandler/contexthandler.go
+++ b/pkg/services/contexthandler/contexthandler.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/sync/singleflight"
+ "github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/components/apikeygen"
"github.com/grafana/grafana/pkg/components/satokengen"
"github.com/grafana/grafana/pkg/infra/db"
@@ -110,6 +111,36 @@ func FromContext(c context.Context) *contextmodel.ReqContext {
return nil
}
+// CopyWithReqContext returns a copy of the parent context with a semi-shallow copy of the ReqContext as a value.
+// The ReqContexts's *web.Context is deep copied so that headers are thread-safe; additional properties are shallow copied and should be treated as read-only.
+func CopyWithReqContext(ctx context.Context) context.Context {
+ origReqCtx := FromContext(ctx)
+ if origReqCtx == nil {
+ return ctx
+ }
+
+ webCtx := &web.Context{
+ Req: origReqCtx.Req.Clone(ctx),
+ Resp: web.NewResponseWriter(origReqCtx.Req.Method, response.CreateNormalResponse(http.Header{}, []byte{}, 0)),
+ }
+ reqCtx := &contextmodel.ReqContext{
+ Context: webCtx,
+ SignedInUser: origReqCtx.SignedInUser,
+ UserToken: origReqCtx.UserToken,
+ IsSignedIn: origReqCtx.IsSignedIn,
+ IsRenderCall: origReqCtx.IsRenderCall,
+ AllowAnonymous: origReqCtx.AllowAnonymous,
+ SkipDSCache: origReqCtx.SkipDSCache,
+ SkipQueryCache: origReqCtx.SkipQueryCache,
+ Logger: origReqCtx.Logger,
+ Error: origReqCtx.Error,
+ RequestNonce: origReqCtx.RequestNonce,
+ IsPublicDashboardView: origReqCtx.IsPublicDashboardView,
+ LookupTokenErr: origReqCtx.LookupTokenErr,
+ }
+ return context.WithValue(ctx, reqContextKey{}, reqCtx)
+}
+
// Middleware provides a middleware to initialize the request context.
func (h *ContextHandler) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/services/query/query.go b/pkg/services/query/query.go
index bd8ae4e30eb..ba005572541 100644
--- a/pkg/services/query/query.go
+++ b/pkg/services/query/query.go
@@ -3,9 +3,12 @@ package query
import (
"context"
"fmt"
+ "net/http"
+ "runtime"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
+ "golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api/dtos"
@@ -13,6 +16,7 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
+ "github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/user"
@@ -48,6 +52,7 @@ func ProvideService(
pluginClient: pluginClient,
pCtxProvider: pCtxProvider,
log: log.New("query_data"),
+ concurrentQueryLimit: cfg.SectionWithEnvOverrides("query").Key("concurrent_query_limit").MustInt(runtime.NumCPU()),
}
g.log.Info("Query Service initialization")
return g
@@ -70,6 +75,7 @@ type ServiceImpl struct {
pluginClient plugins.Client
pCtxProvider *plugincontext.Provider
log log.Logger
+ concurrentQueryLimit int
}
// Run ServiceImpl.
@@ -98,14 +104,17 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, sk
return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries)
}
+// splitResponse contains the results of a concurrent data source query - the response and any headers
+type splitResponse struct {
+ responses backend.Responses
+ header http.Header
+}
+
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) {
g, ctx := errgroup.WithContext(ctx)
- // TODO: Temporarily limiting concurrency here to 1 to avoid concurrent map writes in the plugin middleware that crash the app
- // This is a workaround to mitigate the security issue. We will implement a more thread-safe way of handling concurrent queries as a next step.
- g.SetLimit(1)
- // g.SetLimit(8) // arbitrary limit to prevent too many concurrent requests
- rchan := make(chan backend.Responses, len(queriesbyDs))
+ g.SetLimit(s.concurrentQueryLimit) // prevent too many concurrent requests
+ rchan := make(chan splitResponse, len(queriesbyDs))
// Create panic recovery function for loop below
recoveryFn := func(queries []*simplejson.Json) {
@@ -135,9 +144,14 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S
// Handle panics in the datasource qery
defer recoveryFn(subDTO.Queries)
- subResp, err := s.QueryData(ctx, user, skipDSCache, subDTO)
+ ctxCopy := contexthandler.CopyWithReqContext(ctx)
+ subResp, err := s.QueryData(ctxCopy, user, skipDSCache, subDTO)
if err == nil {
- rchan <- subResp.Responses
+ reqCtx, header := contexthandler.FromContext(ctxCopy), http.Header{}
+ if reqCtx != nil {
+ header = reqCtx.Resp.Header()
+ }
+ rchan <- splitResponse{subResp.Responses, header}
} else {
// If there was an error, return an error response for each query for this datasource
rchan <- buildErrorResponses(err, subDTO.Queries)
@@ -151,24 +165,36 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S
}
close(rchan)
resp := backend.NewQueryDataResponse()
+ reqCtx := contexthandler.FromContext(ctx)
for result := range rchan {
- for refId, dataResponse := range result {
+ for refId, dataResponse := range result.responses {
resp.Responses[refId] = dataResponse
}
+ if reqCtx != nil {
+ for k, v := range result.header {
+ for _, val := range v {
+ if !slices.Contains(reqCtx.Resp.Header().Values(k), val) {
+ reqCtx.Resp.Header().Add(k, val)
+ } else {
+ s.log.Warn("skipped duplicate response header", "header", k, "value", val)
+ }
+ }
+ }
+ }
}
return resp, nil
}
// buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
-func buildErrorResponses(err error, queries []*simplejson.Json) backend.Responses {
+func buildErrorResponses(err error, queries []*simplejson.Json) splitResponse {
er := backend.Responses{}
for _, query := range queries {
er[query.Get("refId").MustString("A")] = backend.DataResponse{
Error: err,
}
}
- return er
+ return splitResponse{er, http.Header{}}
}
// handleExpressions handles POST /api/ds/query when there is an expression.
diff --git a/pkg/services/query/query_test.go b/pkg/services/query/query_test.go
index 22e0cf1264b..36497590bd2 100644
--- a/pkg/services/query/query_test.go
+++ b/pkg/services/query/query_test.go
@@ -6,7 +6,10 @@ import (
"errors"
"fmt"
"net/http"
+ "net/http/httptest"
+ "sync"
"testing"
+ "time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert"
@@ -21,6 +24,7 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/models/roletype"
"github.com/grafana/grafana/pkg/plugins"
+ "github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/contexthandler/ctxkey"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/datasources"
@@ -289,9 +293,23 @@ func TestQueryDataMultipleSources(t *testing.T) {
PublicDashboardAccessToken: "abc123",
}
- _, err = tc.queryService.QueryData(context.Background(), tc.signedInUser, true, reqDTO)
+ req, err := http.NewRequest("POST", "http://localhost:3000", nil)
+ require.NoError(t, err)
+ reqCtx := &contextmodel.ReqContext{
+ SkipQueryCache: false,
+ Context: &web.Context{
+ Resp: web.NewResponseWriter(http.MethodGet, httptest.NewRecorder()),
+ Req: req,
+ },
+ }
+ ctx := ctxkey.Set(context.Background(), reqCtx)
+ _, err = tc.queryService.QueryData(ctx, tc.signedInUser, true, reqDTO)
require.NoError(t, err)
+
+ // response headers should be merged
+ header := contexthandler.FromContext(ctx).Resp.Header()
+ assert.Len(t, header.Values("test"), 2)
})
t.Run("can query multiple datasources with an expression present", func(t *testing.T) {
@@ -527,9 +545,13 @@ func (c *fakeDataSourceCache) GetDatasourceByUID(ctx context.Context, datasource
type fakePluginClient struct {
plugins.Client
req *backend.QueryDataRequest
+ mu sync.Mutex
}
func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
c.req = req
// If an expression query ends up getting directly queried, we want it to return an error in our test.
@@ -541,5 +563,9 @@ func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryData
return nil, errors.New("plugin client failed")
}
+ if reqCtx := contexthandler.FromContext(ctx); reqCtx != nil && reqCtx.Resp != nil {
+ reqCtx.Resp.Header().Add("test", fmt.Sprintf("header-%d", time.Now().Nanosecond()))
+ }
+
return &backend.QueryDataResponse{Responses: make(backend.Responses)}, nil
}