diff --git a/conf/defaults.ini b/conf/defaults.ini index 8e365834b9f..79a9ed4e8f0 100644 --- a/conf/defaults.ini +++ b/conf/defaults.ini @@ -1262,6 +1262,11 @@ enabled = true # Enable the news feed section news_feed_enabled = true +#################################### Query ############################# +[query] +# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs. +concurrent_query_limit = + #################################### Query History ############################# [query_history] # Enable the Query history diff --git a/conf/sample.ini b/conf/sample.ini index ea22fdaa6b9..44553d40561 100644 --- a/conf/sample.ini +++ b/conf/sample.ini @@ -1188,6 +1188,11 @@ # Enable the news feed section ; news_feed_enabled = true +#################################### Query ############################# +[query] +# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs. +;concurrent_query_limit = + #################################### Query History ############################# [query_history] # Enable the Query history diff --git a/docs/sources/setup-grafana/configure-grafana/_index.md b/docs/sources/setup-grafana/configure-grafana/_index.md index 38661540e1d..4af59af6325 100644 --- a/docs/sources/setup-grafana/configure-grafana/_index.md +++ b/docs/sources/setup-grafana/configure-grafana/_index.md @@ -1669,6 +1669,14 @@ Enable or disable the Profile section. Default is `enabled`. Enables the news feed section. Default is `true` +
+ +## [query] + +### concurrent_query_limit + +Set the number of queries that can be executed concurrently in a mixed data source panel. Default is the number of CPUs. + ## [query_history] Configures Query history in Explore. @@ -1677,6 +1685,8 @@ Configures Query history in Explore. Enable or disable the Query history. Default is `enabled`. +
+ ## [metrics] For detailed instructions, refer to [Internal Grafana metrics]({{< relref "../set-up-grafana-monitoring" >}}). diff --git a/pkg/services/contexthandler/contexthandler.go b/pkg/services/contexthandler/contexthandler.go index 1ea23f6357c..5b9ed22e3cb 100644 --- a/pkg/services/contexthandler/contexthandler.go +++ b/pkg/services/contexthandler/contexthandler.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sync/singleflight" + "github.com/grafana/grafana/pkg/api/response" "github.com/grafana/grafana/pkg/components/apikeygen" "github.com/grafana/grafana/pkg/components/satokengen" "github.com/grafana/grafana/pkg/infra/db" @@ -110,6 +111,36 @@ func FromContext(c context.Context) *contextmodel.ReqContext { return nil } +// CopyWithReqContext returns a copy of the parent context with a semi-shallow copy of the ReqContext as a value. +// The ReqContext's *web.Context is deep copied so that headers are thread-safe; additional properties are shallow copied and should be treated as read-only. 
+func CopyWithReqContext(ctx context.Context) context.Context { + origReqCtx := FromContext(ctx) + if origReqCtx == nil { + return ctx + } + + webCtx := &web.Context{ + Req: origReqCtx.Req.Clone(ctx), + Resp: web.NewResponseWriter(origReqCtx.Req.Method, response.CreateNormalResponse(http.Header{}, []byte{}, 0)), + } + reqCtx := &contextmodel.ReqContext{ + Context: webCtx, + SignedInUser: origReqCtx.SignedInUser, + UserToken: origReqCtx.UserToken, + IsSignedIn: origReqCtx.IsSignedIn, + IsRenderCall: origReqCtx.IsRenderCall, + AllowAnonymous: origReqCtx.AllowAnonymous, + SkipDSCache: origReqCtx.SkipDSCache, + SkipQueryCache: origReqCtx.SkipQueryCache, + Logger: origReqCtx.Logger, + Error: origReqCtx.Error, + RequestNonce: origReqCtx.RequestNonce, + IsPublicDashboardView: origReqCtx.IsPublicDashboardView, + LookupTokenErr: origReqCtx.LookupTokenErr, + } + return context.WithValue(ctx, reqContextKey{}, reqCtx) +} + // Middleware provides a middleware to initialize the request context. func (h *ContextHandler) Middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/services/query/query.go b/pkg/services/query/query.go index bd8ae4e30eb..ba005572541 100644 --- a/pkg/services/query/query.go +++ b/pkg/services/query/query.go @@ -3,9 +3,12 @@ package query import ( "context" "fmt" + "net/http" + "runtime" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" "github.com/grafana/grafana/pkg/api/dtos" @@ -13,6 +16,7 @@ import ( "github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext" "github.com/grafana/grafana/pkg/services/user" @@ -48,6 +52,7 @@ func ProvideService( pluginClient: 
pluginClient, pCtxProvider: pCtxProvider, log: log.New("query_data"), + concurrentQueryLimit: cfg.SectionWithEnvOverrides("query").Key("concurrent_query_limit").MustInt(runtime.NumCPU()), } g.log.Info("Query Service initialization") return g @@ -70,6 +75,7 @@ type ServiceImpl struct { pluginClient plugins.Client pCtxProvider *plugincontext.Provider log log.Logger + concurrentQueryLimit int } // Run ServiceImpl. @@ -98,14 +104,17 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, sk return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries) } +// splitResponse contains the results of a concurrent data source query - the response and any headers +type splitResponse struct { + responses backend.Responses + header http.Header +} + // executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result. func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) { g, ctx := errgroup.WithContext(ctx) - // TODO: Temporarily limiting concurrency here to 1 to avoid concurrent map writes in the plugin middleware that crash the app - // This is a workaround to mitigate the security issue. We will implement a more thread-safe way of handling concurrent queries as a next step. 
- g.SetLimit(1) - // g.SetLimit(8) // arbitrary limit to prevent too many concurrent requests - rchan := make(chan backend.Responses, len(queriesbyDs)) + g.SetLimit(s.concurrentQueryLimit) // prevent too many concurrent requests + rchan := make(chan splitResponse, len(queriesbyDs)) // Create panic recovery function for loop below recoveryFn := func(queries []*simplejson.Json) { @@ -135,9 +144,14 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S // Handle panics in the datasource qery defer recoveryFn(subDTO.Queries) - subResp, err := s.QueryData(ctx, user, skipDSCache, subDTO) + ctxCopy := contexthandler.CopyWithReqContext(ctx) + subResp, err := s.QueryData(ctxCopy, user, skipDSCache, subDTO) if err == nil { - rchan <- subResp.Responses + reqCtx, header := contexthandler.FromContext(ctxCopy), http.Header{} + if reqCtx != nil { + header = reqCtx.Resp.Header() + } + rchan <- splitResponse{subResp.Responses, header} } else { // If there was an error, return an error response for each query for this datasource rchan <- buildErrorResponses(err, subDTO.Queries) @@ -151,24 +165,36 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S } close(rchan) resp := backend.NewQueryDataResponse() + reqCtx := contexthandler.FromContext(ctx) for result := range rchan { - for refId, dataResponse := range result { + for refId, dataResponse := range result.responses { resp.Responses[refId] = dataResponse } + if reqCtx != nil { + for k, v := range result.header { + for _, val := range v { + if !slices.Contains(reqCtx.Resp.Header().Values(k), val) { + reqCtx.Resp.Header().Add(k, val) + } else { + s.log.Warn("skipped duplicate response header", "header", k, "value", val) + } + } + } + } } return resp, nil } // buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource. 
-func buildErrorResponses(err error, queries []*simplejson.Json) backend.Responses { +func buildErrorResponses(err error, queries []*simplejson.Json) splitResponse { er := backend.Responses{} for _, query := range queries { er[query.Get("refId").MustString("A")] = backend.DataResponse{ Error: err, } } - return er + return splitResponse{er, http.Header{}} } // handleExpressions handles POST /api/ds/query when there is an expression. diff --git a/pkg/services/query/query_test.go b/pkg/services/query/query_test.go index 22e0cf1264b..36497590bd2 100644 --- a/pkg/services/query/query_test.go +++ b/pkg/services/query/query_test.go @@ -6,7 +6,10 @@ import ( "errors" "fmt" "net/http" + "net/http/httptest" + "sync" "testing" + "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/stretchr/testify/assert" @@ -21,6 +24,7 @@ import ( "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/models/roletype" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/services/contexthandler" "github.com/grafana/grafana/pkg/services/contexthandler/ctxkey" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" "github.com/grafana/grafana/pkg/services/datasources" @@ -289,9 +293,23 @@ func TestQueryDataMultipleSources(t *testing.T) { PublicDashboardAccessToken: "abc123", } - _, err = tc.queryService.QueryData(context.Background(), tc.signedInUser, true, reqDTO) + req, err := http.NewRequest("POST", "http://localhost:3000", nil) + require.NoError(t, err) + reqCtx := &contextmodel.ReqContext{ + SkipQueryCache: false, + Context: &web.Context{ + Resp: web.NewResponseWriter(http.MethodGet, httptest.NewRecorder()), + Req: req, + }, + } + ctx := ctxkey.Set(context.Background(), reqCtx) + _, err = tc.queryService.QueryData(ctx, tc.signedInUser, true, reqDTO) require.NoError(t, err) + + // response headers should be merged + header := contexthandler.FromContext(ctx).Resp.Header() + assert.Len(t, header.Values("test"), 
2) }) t.Run("can query multiple datasources with an expression present", func(t *testing.T) { @@ -527,9 +545,13 @@ func (c *fakeDataSourceCache) GetDatasourceByUID(ctx context.Context, datasource type fakePluginClient struct { plugins.Client req *backend.QueryDataRequest + mu sync.Mutex } func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + c.mu.Lock() + defer c.mu.Unlock() + c.req = req // If an expression query ends up getting directly queried, we want it to return an error in our test. @@ -541,5 +563,9 @@ func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryData return nil, errors.New("plugin client failed") } + if reqCtx := contexthandler.FromContext(ctx); reqCtx != nil && reqCtx.Resp != nil { + reqCtx.Resp.Header().Add("test", fmt.Sprintf("header-%d", time.Now().Nanosecond())) + } + return &backend.QueryDataResponse{Responses: make(backend.Responses)}, nil }