SSE: Group data source node execution by data source (#72935)

Execute all queries to the same datasource in a single request.

Uses the query index as the graph node ID, and then a stable dependency graph sort based on the node input index number, in an attempt to keep the original query order intact.
pull/73481/head
Kyle Brandt 2 years ago committed by GitHub
parent 5e61b54fa3
commit 720d716e45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      pkg/expr/graph.go
  2. 2
      pkg/expr/ml.go
  3. 156
      pkg/expr/nodes.go

@ -55,7 +55,24 @@ type DataPipeline []Node
// map of the refId of each command // map of the refId of each command
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) { func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
vars := make(mathexp.Vars) vars := make(mathexp.Vars)
// Execute datasource nodes first, and grouped by datasource.
dsNodes := []*DSNode{}
for _, node := range *dp {
if node.NodeType() != TypeDatasourceNode {
continue
}
dsNodes = append(dsNodes, node.(*DSNode))
}
if err := executeDSNodesGrouped(c, now, vars, s, dsNodes); err != nil {
return nil, err
}
for _, node := range *dp { for _, node := range *dp {
if node.NodeType() == TypeDatasourceNode {
continue // already executed via executeDSNodesGrouped
}
c, span := s.tracer.Start(c, "SSE.ExecuteNode") c, span := s.tracer.Start(c, "SSE.ExecuteNode")
span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID())) span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID()))
if node.NodeType() == TypeCMDNode { if node.NodeType() == TypeCMDNode {
@ -112,8 +129,10 @@ func (s *Service) buildDependencyGraph(req *Request) (*simple.DirectedGraph, err
} }
// buildExecutionOrder returns a sequence of nodes ordered by dependency. // buildExecutionOrder returns a sequence of nodes ordered by dependency.
// Note: During execution, Datasource query nodes for the same datasource will
// be grouped into one request and executed first, as a phase after this call.
func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) { func buildExecutionOrder(graph *simple.DirectedGraph) ([]Node, error) {
sortedNodes, err := topo.Sort(graph) sortedNodes, err := topo.SortStabilized(graph, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -145,7 +164,7 @@ func buildNodeRegistry(g *simple.DirectedGraph) map[string]Node {
func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) { func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
dp := simple.NewDirectedGraph() dp := simple.NewDirectedGraph()
for _, query := range req.Queries { for i, query := range req.Queries {
if query.DataSource == nil || query.DataSource.UID == "" { if query.DataSource == nil || query.DataSource.UID == "" {
return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID) return nil, fmt.Errorf("missing datasource uid in query with refId %v", query.RefID)
} }
@ -169,6 +188,7 @@ func (s *Service) buildGraph(req *Request) (*simple.DirectedGraph, error) {
TimeRange: query.TimeRange, TimeRange: query.TimeRange,
QueryType: query.QueryType, QueryType: query.QueryType,
DataSource: query.DataSource, DataSource: query.DataSource,
idx: int64(i),
} }
var node Node var node Node

@ -141,7 +141,7 @@ func (s *Service) buildMLNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
return &MLNode{ return &MLNode{
baseNode: baseNode{ baseNode: baseNode{
id: dp.NewNode().ID(), id: rn.idx,
refID: rn.RefID, refID: rn.RefID,
}, },
TimeRange: rn.TimeRange, TimeRange: rn.TimeRange,

@ -40,6 +40,9 @@ type rawNode struct {
QueryType string QueryType string
TimeRange TimeRange TimeRange TimeRange
DataSource *datasources.DataSource DataSource *datasources.DataSource
// We use this index as the id of the node in the graph so the original order is retained during the stable sort of the dependency graph execution order.
// Some data sources, such as CloudWatch, have order dependencies between queries.
idx int64
} }
func (rn *rawNode) GetCommandType() (c CommandType, err error) { func (rn *rawNode) GetCommandType() (c CommandType, err error) {
@ -97,7 +100,7 @@ func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) {
node := &CMDNode{ node := &CMDNode{
baseNode: baseNode{ baseNode: baseNode{
id: dp.NewNode().ID(), id: rn.idx,
refID: rn.RefID, refID: rn.RefID,
}, },
CMDType: commandType, CMDType: commandType,
@ -159,7 +162,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
dsNode := &DSNode{ dsNode := &DSNode{
baseNode: baseNode{ baseNode: baseNode{
id: dp.NewNode().ID(), id: rn.idx,
refID: rn.RefID, refID: rn.RefID,
}, },
orgID: req.OrgId, orgID: req.OrgId,
@ -191,69 +194,104 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
return dsNode, nil return dsNode, nil
} }
// Execute runs the node and adds the results to vars. If the node requires // executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
// other nodes they must have already been executed and their results must // in a single request with one or more queries to the datasource.
// already by in vars. func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) (e error) {
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) { type dsKey struct {
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version) uid string // in theory I think this all I need for the key, but rather be safe
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery") id int64
defer span.End() orgID int64
}
byDS := make(map[dsKey][]*DSNode)
for _, node := range nodes {
k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
byDS[k] = append(byDS[k], node)
}
for _, nodeGroup := range byDS {
if err := func() error {
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
defer span.End()
firstNode := nodeGroup[0]
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
if err != nil {
return err
}
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource) logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
if err != nil { "queryRefId", firstNode.refID,
return mathexp.Results{}, err "datasourceUid", firstNode.datasource.UID,
} "datasourceVersion", firstNode.datasource.Version,
span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type)) )
span.SetAttributes("datasource.uid", dn.datasource.UID, attribute.Key("datasource.uid").String(dn.datasource.UID))
span.SetAttributes("datasource.type", firstNode.datasource.Type, attribute.Key("datasource.type").String(firstNode.datasource.Type))
req := &backend.QueryDataRequest{ span.SetAttributes("datasource.uid", firstNode.datasource.UID, attribute.Key("datasource.uid").String(firstNode.datasource.UID))
PluginContext: pCtx,
Queries: []backend.DataQuery{ req := &backend.QueryDataRequest{
{ PluginContext: pCtx,
RefID: dn.refID, Headers: firstNode.request.Headers,
MaxDataPoints: dn.maxDP, }
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
JSON: dn.query, for _, dn := range nodeGroup {
TimeRange: dn.timeRange.AbsoluteTime(now), req.Queries = append(req.Queries, backend.DataQuery{
QueryType: dn.queryType, RefID: dn.refID,
}, MaxDataPoints: dn.maxDP,
}, Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
Headers: dn.request.Headers, JSON: dn.query,
} TimeRange: dn.timeRange.AbsoluteTime(now),
QueryType: dn.queryType,
responseType := "unknown"
respStatus := "success"
defer func() {
if e != nil {
responseType = "error"
respStatus = "failure"
span.AddEvents([]string{"error", "message"},
[]tracing.EventValue{
{Str: fmt.Sprintf("%v", err)},
{Str: "failed to query data source"},
}) })
} }
logger.Debug("Data source queried", "responseType", responseType)
useDataplane := strings.HasPrefix(responseType, "dataplane-")
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
}()
resp, err := s.dataService.QueryData(ctx, req) responseType := "unknown"
if err != nil { respStatus := "success"
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) defer func() {
} if e != nil {
responseType = "error"
respStatus = "failure"
span.AddEvents([]string{"error", "message"},
[]tracing.EventValue{
{Str: fmt.Sprintf("%v", err)},
{Str: "failed to query data source"},
})
}
logger.Debug("Data source queried", "responseType", responseType)
useDataplane := strings.HasPrefix(responseType, "dataplane-")
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
}()
dataFrames, err := getResponseFrame(resp, dn.refID) resp, err := s.dataService.QueryData(ctx, req)
if err != nil { if err != nil {
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) return MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)
} }
var result mathexp.Results for _, dn := range nodeGroup {
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger) dataFrames, err := getResponseFrame(resp, dn.refID)
if err != nil { if err != nil {
err = MakeConversionError(dn.refID, err) return MakeQueryError(dn.refID, dn.datasource.UID, err)
}
var result mathexp.Results
responseType, result, err = convertDataFramesToResults(ctx, dataFrames, dn.datasource.Type, s, logger)
if err != nil {
return MakeConversionError(dn.refID, err)
}
vars[dn.refID] = result
}
return nil
}(); err != nil {
return err
}
} }
return result, err return nil
}
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already be in vars.
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
panic("Execute called on DSNode and should not be")
// Datasource queries are sent as a group to the datasource, see executeDSNodesGrouped.
} }
func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) { func getResponseFrame(resp *backend.QueryDataResponse, refID string) (data.Frames, error) {

Loading…
Cancel
Save