@ -40,6 +40,9 @@ type rawNode struct {
QueryType string
TimeRange TimeRange
DataSource * datasources . DataSource
// We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order.
// Some data sources, such as cloud watch, have order dependencies between queries.
idx int64
}
func ( rn * rawNode ) GetCommandType ( ) ( c CommandType , err error ) {
@ -97,7 +100,7 @@ func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) {
node := & CMDNode {
baseNode : baseNode {
id : dp . NewNode ( ) . ID ( ) ,
id : rn . idx ,
refID : rn . RefID ,
} ,
CMDType : commandType ,
@ -159,7 +162,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
dsNode := & DSNode {
baseNode : baseNode {
id : dp . NewNode ( ) . ID ( ) ,
id : rn . idx ,
refID : rn . RefID ,
} ,
orgID : req . OrgId ,
@ -191,69 +194,104 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
return dsNode , nil
}
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already by in vars.
func ( dn * DSNode ) Execute ( ctx context . Context , now time . Time , _ mathexp . Vars , s * Service ) ( r mathexp . Results , e error ) {
logger := logger . FromContext ( ctx ) . New ( "datasourceType" , dn . datasource . Type , "queryRefId" , dn . refID , "datasourceUid" , dn . datasource . UID , "datasourceVersion" , dn . datasource . Version )
ctx , span := s . tracer . Start ( ctx , "SSE.ExecuteDatasourceQuery" )
defer span . End ( )
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
// in a single request with one or more queries to the datasource.
func executeDSNodesGrouped ( ctx context . Context , now time . Time , vars mathexp . Vars , s * Service , nodes [ ] * DSNode ) ( e error ) {
type dsKey struct {
uid string // in theory I think this all I need for the key, but rather be safe
id int64
orgID int64
}
byDS := make ( map [ dsKey ] [ ] * DSNode )
for _ , node := range nodes {
k := dsKey { id : node . datasource . ID , uid : node . datasource . UID , orgID : node . orgID }
byDS [ k ] = append ( byDS [ k ] , node )
}
for _ , nodeGroup := range byDS {
if err := func ( ) error {
ctx , span := s . tracer . Start ( ctx , "SSE.ExecuteDatasourceQuery" )
defer span . End ( )
firstNode := nodeGroup [ 0 ]
pCtx , err := s . pCtxProvider . GetWithDataSource ( ctx , firstNode . datasource . Type , firstNode . request . User , firstNode . datasource )
if err != nil {
return err
}
pCtx , err := s . pCtxProvider . GetWithDataSource ( ctx , dn . datasource . Type , dn . request . User , dn . datasource )
if err != nil {
return mathexp . Results { } , err
}
span . SetAttributes ( "datasource.type" , dn . datasource . Type , attribute . Key ( "datasource.type" ) . String ( dn . datasource . Type ) )
span . SetAttributes ( "datasource.uid" , dn . datasource . UID , attribute . Key ( "datasource.uid" ) . String ( dn . datasource . UID ) )
req := & backend . QueryDataRequest {
PluginContext : pCtx ,
Queries : [ ] backend . DataQuery {
{
RefID : dn . refID ,
MaxDataPoints : dn . maxDP ,
Interval : time . Duration ( int64 ( time . Millisecond ) * dn . intervalMS ) ,
JSON : dn . query ,
TimeRange : dn . timeRange . AbsoluteTime ( now ) ,
QueryType : dn . queryType ,
} ,
} ,
Headers : dn . request . Headers ,
}
responseType := "unknown"
respStatus := "success"
defer func ( ) {
if e != nil {
responseType = "error"
respStatus = "failure"
span . AddEvents ( [ ] string { "error" , "message" } ,
[ ] tracing . EventValue {
{ Str : fmt . Sprintf ( "%v" , err ) } ,
{ Str : "failed to query data source" } ,
logger := logger . FromContext ( ctx ) . New ( "datasourceType" , firstNode . datasource . Type ,
"queryRefId" , firstNode . refID ,
"datasourceUid" , firstNode . datasource . UID ,
"datasourceVersion" , firstNode . datasource . Version ,
)
span . SetAttributes ( "datasource.type" , firstNode . datasource . Type , attribute . Key ( "datasource.type" ) . String ( firstNode . datasource . Type ) )
span . SetAttributes ( "datasource.uid" , firstNode . datasource . UID , attribute . Key ( "datasource.uid" ) . String ( firstNode . datasource . UID ) )
req := & backend . QueryDataRequest {
PluginContext : pCtx ,
Headers : firstNode . request . Headers ,
}
for _ , dn := range nodeGroup {
req . Queries = append ( req . Queries , backend . DataQuery {
RefID : dn . refID ,
MaxDataPoints : dn . maxDP ,
Interval : time . Duration ( int64 ( time . Millisecond ) * dn . intervalMS ) ,
JSON : dn . query ,
TimeRange : dn . timeRange . AbsoluteTime ( now ) ,
QueryType : dn . queryType ,
} )
}
logger . Debug ( "Data source queried" , "responseType" , responseType )
useDataplane := strings . HasPrefix ( responseType , "dataplane-" )
s . metrics . dsRequests . WithLabelValues ( respStatus , fmt . Sprintf ( "%t" , useDataplane ) , dn . datasource . Type ) . Inc ( )
} ( )
}
resp , err := s . dataService . QueryData ( ctx , req )
if err != nil {
return mathexp . Results { } , MakeQueryError ( dn . refID , dn . datasource . UID , err )
}
responseType := "unknown"
respStatus := "success"
defer func ( ) {
if e != nil {
responseType = "error"
respStatus = "failure"
span . AddEvents ( [ ] string { "error" , "message" } ,
[ ] tracing . EventValue {
{ Str : fmt . Sprintf ( "%v" , err ) } ,
{ Str : "failed to query data source" } ,
} )
}
logger . Debug ( "Data source queried" , "responseType" , responseType )
useDataplane := strings . HasPrefix ( responseType , "dataplane-" )
s . metrics . dsRequests . WithLabelValues ( respStatus , fmt . Sprintf ( "%t" , useDataplane ) , firstNode . datasource . Type ) . Inc ( )
} ( )
dataFrames , err := getResponseFrame ( resp , dn . refID )
if err != nil {
return mathexp . Results { } , MakeQueryError ( dn . refID , dn . datasource . UID , err )
}
resp , err := s . dataService . QueryData ( ctx , req )
if err != nil {
return MakeQueryError ( firstNode . refID , firstNode . datasource . UID , err )
}
var result mathexp . Results
responseType , result , err = convertDataFramesToResults ( ctx , dataFrames , dn . datasource . Type , s , logger )
if err != nil {
err = MakeConversionError ( dn . refID , err )
for _ , dn := range nodeGroup {
dataFrames , err := getResponseFrame ( resp , dn . refID )
if err != nil {
return MakeQueryError ( dn . refID , dn . datasource . UID , err )
}
var result mathexp . Results
responseType , result , err = convertDataFramesToResults ( ctx , dataFrames , dn . datasource . Type , s , logger )
if err != nil {
return MakeConversionError ( dn . refID , err )
}
vars [ dn . refID ] = result
}
return nil
} ( ) ; err != nil {
return err
}
}
return result , err
return nil
}
// Execute runs the node and adds the results to vars. If the node requires
// other nodes they must have already been executed and their results must
// already by in vars.
func ( dn * DSNode ) Execute ( ctx context . Context , now time . Time , _ mathexp . Vars , s * Service ) ( r mathexp . Results , e error ) {
panic ( "Execute called on DSNode and should not be" )
// Datasource queries are sent as a group to the datasource, see executeDSNodesGrouped.
}
func getResponseFrame ( resp * backend . QueryDataResponse , refID string ) ( data . Frames , error ) {