Query: Fix concurrency handling for mixed datasource queries (#70100)

* split queries and merge responses

* increase concurrency again

* update unit test to verify the headers are merged

* fix lint issue

* fix race condition in unit test

* Fix function name and add a bit more documentation about how the func should be used

* update function call after rename

* check for duplicate header vals

* make concurrent query limit configurable

* Update conf/sample.ini

Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>

---------

Co-authored-by: Sofia Papagiannaki <1632407+papagian@users.noreply.github.com>
pull/70693/head^2
Michael Mandrus 2 years ago committed by GitHub
parent d6c468c1c2
commit ff6d6659fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      conf/defaults.ini
  2. 5
      conf/sample.ini
  3. 10
      docs/sources/setup-grafana/configure-grafana/_index.md
  4. 31
      pkg/services/contexthandler/contexthandler.go
  5. 46
      pkg/services/query/query.go
  6. 28
      pkg/services/query/query_test.go

@ -1262,6 +1262,11 @@ enabled = true
# Enable the news feed section # Enable the news feed section
news_feed_enabled = true news_feed_enabled = true
#################################### Query #############################
[query]
# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs.
concurrent_query_limit =
#################################### Query History ############################# #################################### Query History #############################
[query_history] [query_history]
# Enable the Query history # Enable the Query history

@ -1188,6 +1188,11 @@
# Enable the news feed section # Enable the news feed section
; news_feed_enabled = true ; news_feed_enabled = true
#################################### Query #############################
[query]
# Set the number of data source queries that can be executed concurrently in mixed queries. Default is the number of CPUs.
;concurrent_query_limit =
#################################### Query History ############################# #################################### Query History #############################
[query_history] [query_history]
# Enable the Query history # Enable the Query history

@ -1669,6 +1669,14 @@ Enable or disable the Profile section. Default is `enabled`.
Enables the news feed section. Default is `true` Enables the news feed section. Default is `true`
<hr>
## [query]
### concurrent_query_limit
Set the number of queries that can be executed concurrently in a mixed data source panel. Default is the number of CPUs.
## [query_history] ## [query_history]
Configures Query history in Explore. Configures Query history in Explore.
@ -1677,6 +1685,8 @@ Configures Query history in Explore.
Enable or disable the Query history. Default is `enabled`. Enable or disable the Query history. Default is `enabled`.
<hr>
## [metrics] ## [metrics]
For detailed instructions, refer to [Internal Grafana metrics]({{< relref "../set-up-grafana-monitoring" >}}). For detailed instructions, refer to [Internal Grafana metrics]({{< relref "../set-up-grafana-monitoring" >}}).

@ -13,6 +13,7 @@ import (
"golang.org/x/sync/singleflight" "golang.org/x/sync/singleflight"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/components/apikeygen" "github.com/grafana/grafana/pkg/components/apikeygen"
"github.com/grafana/grafana/pkg/components/satokengen" "github.com/grafana/grafana/pkg/components/satokengen"
"github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/db"
@ -110,6 +111,36 @@ func FromContext(c context.Context) *contextmodel.ReqContext {
return nil return nil
} }
// CopyWithReqContext returns a copy of the parent context with a semi-shallow copy of the ReqContext as a value.
// The ReqContexts's *web.Context is deep copied so that headers are thread-safe; additional properties are shallow copied and should be treated as read-only.
func CopyWithReqContext(ctx context.Context) context.Context {
origReqCtx := FromContext(ctx)
if origReqCtx == nil {
return ctx
}
webCtx := &web.Context{
Req: origReqCtx.Req.Clone(ctx),
Resp: web.NewResponseWriter(origReqCtx.Req.Method, response.CreateNormalResponse(http.Header{}, []byte{}, 0)),
}
reqCtx := &contextmodel.ReqContext{
Context: webCtx,
SignedInUser: origReqCtx.SignedInUser,
UserToken: origReqCtx.UserToken,
IsSignedIn: origReqCtx.IsSignedIn,
IsRenderCall: origReqCtx.IsRenderCall,
AllowAnonymous: origReqCtx.AllowAnonymous,
SkipDSCache: origReqCtx.SkipDSCache,
SkipQueryCache: origReqCtx.SkipQueryCache,
Logger: origReqCtx.Logger,
Error: origReqCtx.Error,
RequestNonce: origReqCtx.RequestNonce,
IsPublicDashboardView: origReqCtx.IsPublicDashboardView,
LookupTokenErr: origReqCtx.LookupTokenErr,
}
return context.WithValue(ctx, reqContextKey{}, reqCtx)
}
// Middleware provides a middleware to initialize the request context. // Middleware provides a middleware to initialize the request context.
func (h *ContextHandler) Middleware(next http.Handler) http.Handler { func (h *ContextHandler) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

@ -3,9 +3,12 @@ package query
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"runtime"
"time" "time"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/dtos"
@ -13,6 +16,7 @@ import (
"github.com/grafana/grafana/pkg/expr" "github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext" "github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/user" "github.com/grafana/grafana/pkg/services/user"
@ -48,6 +52,7 @@ func ProvideService(
pluginClient: pluginClient, pluginClient: pluginClient,
pCtxProvider: pCtxProvider, pCtxProvider: pCtxProvider,
log: log.New("query_data"), log: log.New("query_data"),
concurrentQueryLimit: cfg.SectionWithEnvOverrides("query").Key("concurrent_query_limit").MustInt(runtime.NumCPU()),
} }
g.log.Info("Query Service initialization") g.log.Info("Query Service initialization")
return g return g
@ -70,6 +75,7 @@ type ServiceImpl struct {
pluginClient plugins.Client pluginClient plugins.Client
pCtxProvider *plugincontext.Provider pCtxProvider *plugincontext.Provider
log log.Logger log log.Logger
concurrentQueryLimit int
} }
// Run ServiceImpl. // Run ServiceImpl.
@ -98,14 +104,17 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user *user.SignedInUser, sk
return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries) return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries)
} }
// splitResponse contains the results of a concurrent data source query - the response and any headers
type splitResponse struct {
responses backend.Responses
header http.Header
}
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result. // executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) { func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.SignedInUser, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) {
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
// TODO: Temporarily limiting concurrency here to 1 to avoid concurrent map writes in the plugin middleware that crash the app g.SetLimit(s.concurrentQueryLimit) // prevent too many concurrent requests
// This is a workaround to mitigate the security issue. We will implement a more thread-safe way of handling concurrent queries as a next step. rchan := make(chan splitResponse, len(queriesbyDs))
g.SetLimit(1)
// g.SetLimit(8) // arbitrary limit to prevent too many concurrent requests
rchan := make(chan backend.Responses, len(queriesbyDs))
// Create panic recovery function for loop below // Create panic recovery function for loop below
recoveryFn := func(queries []*simplejson.Json) { recoveryFn := func(queries []*simplejson.Json) {
@ -135,9 +144,14 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S
// Handle panics in the datasource qery // Handle panics in the datasource qery
defer recoveryFn(subDTO.Queries) defer recoveryFn(subDTO.Queries)
subResp, err := s.QueryData(ctx, user, skipDSCache, subDTO) ctxCopy := contexthandler.CopyWithReqContext(ctx)
subResp, err := s.QueryData(ctxCopy, user, skipDSCache, subDTO)
if err == nil { if err == nil {
rchan <- subResp.Responses reqCtx, header := contexthandler.FromContext(ctxCopy), http.Header{}
if reqCtx != nil {
header = reqCtx.Resp.Header()
}
rchan <- splitResponse{subResp.Responses, header}
} else { } else {
// If there was an error, return an error response for each query for this datasource // If there was an error, return an error response for each query for this datasource
rchan <- buildErrorResponses(err, subDTO.Queries) rchan <- buildErrorResponses(err, subDTO.Queries)
@ -151,24 +165,36 @@ func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user *user.S
} }
close(rchan) close(rchan)
resp := backend.NewQueryDataResponse() resp := backend.NewQueryDataResponse()
reqCtx := contexthandler.FromContext(ctx)
for result := range rchan { for result := range rchan {
for refId, dataResponse := range result { for refId, dataResponse := range result.responses {
resp.Responses[refId] = dataResponse resp.Responses[refId] = dataResponse
} }
if reqCtx != nil {
for k, v := range result.header {
for _, val := range v {
if !slices.Contains(reqCtx.Resp.Header().Values(k), val) {
reqCtx.Resp.Header().Add(k, val)
} else {
s.log.Warn("skipped duplicate response header", "header", k, "value", val)
}
}
}
}
} }
return resp, nil return resp, nil
} }
// buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource. // buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
func buildErrorResponses(err error, queries []*simplejson.Json) backend.Responses { func buildErrorResponses(err error, queries []*simplejson.Json) splitResponse {
er := backend.Responses{} er := backend.Responses{}
for _, query := range queries { for _, query := range queries {
er[query.Get("refId").MustString("A")] = backend.DataResponse{ er[query.Get("refId").MustString("A")] = backend.DataResponse{
Error: err, Error: err,
} }
} }
return er return splitResponse{er, http.Header{}}
} }
// handleExpressions handles POST /api/ds/query when there is an expression. // handleExpressions handles POST /api/ds/query when there is an expression.

@ -6,7 +6,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest"
"sync"
"testing" "testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -21,6 +24,7 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/models/roletype" "github.com/grafana/grafana/pkg/models/roletype"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/contexthandler/ctxkey" "github.com/grafana/grafana/pkg/services/contexthandler/ctxkey"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model" contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/datasources"
@ -289,9 +293,23 @@ func TestQueryDataMultipleSources(t *testing.T) {
PublicDashboardAccessToken: "abc123", PublicDashboardAccessToken: "abc123",
} }
_, err = tc.queryService.QueryData(context.Background(), tc.signedInUser, true, reqDTO) req, err := http.NewRequest("POST", "http://localhost:3000", nil)
require.NoError(t, err)
reqCtx := &contextmodel.ReqContext{
SkipQueryCache: false,
Context: &web.Context{
Resp: web.NewResponseWriter(http.MethodGet, httptest.NewRecorder()),
Req: req,
},
}
ctx := ctxkey.Set(context.Background(), reqCtx)
_, err = tc.queryService.QueryData(ctx, tc.signedInUser, true, reqDTO)
require.NoError(t, err) require.NoError(t, err)
// response headers should be merged
header := contexthandler.FromContext(ctx).Resp.Header()
assert.Len(t, header.Values("test"), 2)
}) })
t.Run("can query multiple datasources with an expression present", func(t *testing.T) { t.Run("can query multiple datasources with an expression present", func(t *testing.T) {
@ -527,9 +545,13 @@ func (c *fakeDataSourceCache) GetDatasourceByUID(ctx context.Context, datasource
type fakePluginClient struct { type fakePluginClient struct {
plugins.Client plugins.Client
req *backend.QueryDataRequest req *backend.QueryDataRequest
mu sync.Mutex
} }
func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.req = req c.req = req
// If an expression query ends up getting directly queried, we want it to return an error in our test. // If an expression query ends up getting directly queried, we want it to return an error in our test.
@ -541,5 +563,9 @@ func (c *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryData
return nil, errors.New("plugin client failed") return nil, errors.New("plugin client failed")
} }
if reqCtx := contexthandler.FromContext(ctx); reqCtx != nil && reqCtx.Resp != nil {
reqCtx.Resp.Header().Add("test", fmt.Sprintf("header-%d", time.Now().Nanosecond()))
}
return &backend.QueryDataResponse{Responses: make(backend.Responses)}, nil return &backend.QueryDataResponse{Responses: make(backend.Responses)}, nil
} }

Loading…
Cancel
Save