|
|
|
@ -10,6 +10,7 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend" |
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" |
|
|
|
|
"github.com/grafana/grafana/pkg/registry/apis/query/clientapi" |
|
|
|
|
"github.com/grafana/grafana/pkg/services/datasources" |
|
|
|
|
"github.com/grafana/grafana/pkg/services/ngalert/models" |
|
|
|
|
"go.opentelemetry.io/otel/attribute" |
|
|
|
@ -160,8 +161,14 @@ func (r *queryREST) Connect(connectCtx context.Context, name string, _ runtime.O |
|
|
|
|
req.Requests[i].Headers = ExtractKnownHeaders(httpreq.Header) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Actually run the query
|
|
|
|
|
rsp, err := b.execute(ctx, req) |
|
|
|
|
// Fetch information on the grafana instance (e.g. feature toggles)
|
|
|
|
|
instanceConfig, err := b.clientSupplier.GetInstanceConfigurationSettings(ctx) |
|
|
|
|
if err != nil { |
|
|
|
|
b.log.Error("failed to get instance configuration settings", "err", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Actually run the query (includes expressions)
|
|
|
|
|
rsp, err := b.execute(ctx, req, instanceConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
b.log.Error("execute error", "http code", query.GetResponseCode(rsp), "err", err) |
|
|
|
|
if rsp != nil { // if we have a response, we assume the err is set in the response
|
|
|
|
@ -183,14 +190,14 @@ func (r *queryREST) Connect(connectCtx context.Context, name string, _ runtime.O |
|
|
|
|
}), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) { |
|
|
|
|
func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo, instanceConfig clientapi.InstanceConfigurationSettings) (qdr *backend.QueryDataResponse, err error) { |
|
|
|
|
switch len(req.Requests) { |
|
|
|
|
case 0: |
|
|
|
|
b.log.Debug("executing empty query") |
|
|
|
|
qdr = &backend.QueryDataResponse{} |
|
|
|
|
case 1: |
|
|
|
|
b.log.Debug("executing single query") |
|
|
|
|
qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0]) |
|
|
|
|
qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0], instanceConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
b.log.Debug("handleQuerySingleDatasource failed", err) |
|
|
|
|
} |
|
|
|
@ -203,7 +210,7 @@ func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (q |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
b.log.Debug("executing concurrent queries") |
|
|
|
|
qdr, err = b.executeConcurrentQueries(ctx, req.Requests) |
|
|
|
|
qdr, err = b.executeConcurrentQueries(ctx, req.Requests, instanceConfig) |
|
|
|
|
if err != nil { |
|
|
|
|
b.log.Debug("error in executeConcurrentQueries", "err", err) |
|
|
|
|
} |
|
|
|
@ -235,7 +242,7 @@ func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (q |
|
|
|
|
|
|
|
|
|
// Process a single request
|
|
|
|
|
// See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
|
|
|
|
|
func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest) (*backend.QueryDataResponse, error) { |
|
|
|
|
func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest, instanceConfig clientapi.InstanceConfigurationSettings) (*backend.QueryDataResponse, error) { |
|
|
|
|
ctx, span := b.tracer.Start(ctx, "Query.handleQuerySingleDatasource") |
|
|
|
|
defer span.End() |
|
|
|
|
span.SetAttributes( |
|
|
|
@ -261,6 +268,7 @@ func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req d |
|
|
|
|
UID: req.UID, |
|
|
|
|
}, |
|
|
|
|
req.Headers, |
|
|
|
|
instanceConfig, |
|
|
|
|
) |
|
|
|
|
if err != nil { |
|
|
|
|
b.log.Debug("error getting single datasource client", "error", err, "reqUid", req.UID) |
|
|
|
@ -269,6 +277,7 @@ func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req d |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rsp, err := client.QueryData(ctx, *req.Request) |
|
|
|
|
|
|
|
|
|
if err == nil && rsp != nil { |
|
|
|
|
for _, q := range req.Request.Queries { |
|
|
|
|
if q.ResultAssertions != nil { |
|
|
|
@ -306,7 +315,7 @@ func buildErrorResponse(err error, req datasourceRequest) *backend.QueryDataResp |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
|
|
|
|
|
func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest) (*backend.QueryDataResponse, error) { |
|
|
|
|
func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest, instanceConfig clientapi.InstanceConfigurationSettings) (*backend.QueryDataResponse, error) { |
|
|
|
|
ctx, span := b.tracer.Start(ctx, "Query.executeConcurrentQueries") |
|
|
|
|
defer span.End() |
|
|
|
|
|
|
|
|
@ -337,7 +346,7 @@ func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests |
|
|
|
|
g.Go(func() error { |
|
|
|
|
defer recoveryFn(req) |
|
|
|
|
|
|
|
|
|
dqr, err := b.handleQuerySingleDatasource(ctx, req) |
|
|
|
|
dqr, err := b.handleQuerySingleDatasource(ctx, req, instanceConfig) |
|
|
|
|
if err == nil { |
|
|
|
|
rchan <- dqr |
|
|
|
|
} else { |
|
|
|
|