grafana/pkg/plugins/backendplugin/backend_plugin.go

package backendplugin

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/expfmt"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
	rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/plugins/backendplugin/collector"
	"github.com/grafana/grafana/pkg/util/errutil"
	plugin "github.com/hashicorp/go-plugin"
	dto "github.com/prometheus/client_model/go"
)

// BackendPlugin is a registered backend plugin.
type BackendPlugin struct {
	id             string
	executablePath string
	managed        bool
	clientFactory  func() *plugin.Client
	client         *plugin.Client
	logger         log.Logger
	startFns       PluginStartFuncs
	diagnostics    DiagnosticsPlugin
	resource       ResourcePlugin
}
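
// start launches the backend plugin process via the configured plugin.Client
// factory and dispenses the implementations it exposes. Plugins negotiating
// protocol version 2 are wired into a Client (diagnostics, resource, data,
// transform); older plugins fall back to the legacy datasource/renderer
// interfaces in a LegacyClient.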
func (p *BackendPlugin) start(ctx context.Context) error {
	p.client = p.clientFactory()
	rpcClient, err := p.client.Client()
	if err != nil {
		return err
	}

	var legacyClient *LegacyClient
	var client *Client

	if p.client.NegotiatedVersion() > 1 {
		rawDiagnostics, err := rpcClient.Dispense("diagnostics")
		if err != nil {
			return err
		}

		rawResource, err := rpcClient.Dispense("resource")
		if err != nil {
			return err
		}

		rawData, err := rpcClient.Dispense("data")
		if err != nil {
			return err
		}

		rawTransform, err := rpcClient.Dispense("transform")
		if err != nil {
			return err
		}

		if rawDiagnostics != nil {
			if plugin, ok := rawDiagnostics.(DiagnosticsPlugin); ok {
				p.diagnostics = plugin
			}
		}

		client = &Client{}
		if rawResource != nil {
			if plugin, ok := rawResource.(ResourcePlugin); ok {
				p.resource = plugin
				client.ResourcePlugin = plugin
			}
		}

		if rawData != nil {
			if plugin, ok := rawData.(DataPlugin); ok {
				client.DataPlugin = plugin
			}
		}

		if rawTransform != nil {
			if plugin, ok := rawTransform.(TransformPlugin); ok {
				client.TransformPlugin = plugin
			}
		}
	} else {
		raw, err := rpcClient.Dispense(p.id)
		if err != nil {
			return err
		}

		legacyClient = &LegacyClient{}
		if plugin, ok := raw.(datasourceV1.DatasourcePlugin); ok {
			legacyClient.DatasourcePlugin = plugin
		}

		if plugin, ok := raw.(rendererV1.RendererPlugin); ok {
			legacyClient.RendererPlugin = plugin
		}
	}

	if legacyClient == nil && client == nil {
		return errors.New("no compatible plugin implementation found")
	}

	if legacyClient != nil && p.startFns.OnLegacyStart != nil {
		if err := p.startFns.OnLegacyStart(p.id, legacyClient, p.logger); err != nil {
			return err
		}
	}

	if client != nil && p.startFns.OnStart != nil {
		if err := p.startFns.OnStart(p.id, client, p.logger); err != nil {
			return err
		}
	}

	return nil
}
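
// stop kills the backend plugin process, if one is running.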
func (p *BackendPlugin) stop() error {
	if p.client != nil {
		p.client.Kill()
	}
	return nil
}

// supportsDiagnostics returns whether the backend plugin supports diagnostics
// such as metrics and health checks.
func (p *BackendPlugin) supportsDiagnostics() bool {
	return p.diagnostics != nil
}

// CollectMetrics implements the collector.Collector interface.
func (p *BackendPlugin) CollectMetrics(ctx context.Context, ch chan<- prometheus.Metric) error {
	if p.diagnostics == nil {
		return nil
	}

	if p.client == nil || p.client.Exited() {
		return nil
	}

	res, err := p.diagnostics.CollectMetrics(ctx, &pluginv2.CollectMetricsRequest{})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unimplemented {
				return nil
			}
		}
		return err
	}

	if res == nil || res.Metrics == nil || res.Metrics.Prometheus == nil {
		return nil
	}

	reader := bytes.NewReader(res.Metrics.Prometheus)
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(reader)
	if err != nil {
		return errutil.Wrap("failed to parse collected metrics", err)
	}

	for _, mf := range families {
		if mf.Help == nil {
			help := fmt.Sprintf("Metric read from %s plugin", p.id)
			mf.Help = &help
		}
	}

	for _, mf := range families {
		convertMetricFamily(p.id, mf, ch, p.logger)
	}

	return nil
}
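
// checkHealth forwards a health check to the plugin. When the plugin is not
// running or does not implement diagnostics, an UNKNOWN status is returned
// instead of an error.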
func (p *BackendPlugin) checkHealth(ctx context.Context, config *PluginConfig) (*pluginv2.CheckHealthResponse, error) {
	if p.diagnostics == nil || p.client == nil || p.client.Exited() {
		return &pluginv2.CheckHealthResponse{
			Status: pluginv2.CheckHealthResponse_UNKNOWN,
		}, nil
	}

	jsonDataBytes, err := config.JSONData.ToDB()
	if err != nil {
		return nil, err
	}

	pconfig := &pluginv2.PluginConfig{
		OrgId:                   config.OrgID,
		PluginId:                config.PluginID,
		JsonData:                jsonDataBytes,
		DecryptedSecureJsonData: config.DecryptedSecureJSONData,
		LastUpdatedMS:           config.Updated.UnixNano() / int64(time.Millisecond),
	}

	if config.DataSourceConfig != nil {
		datasourceJSONData, err := config.DataSourceConfig.JSONData.ToDB()
		if err != nil {
			return nil, err
		}

		pconfig.DatasourceConfig = &pluginv2.DataSourceConfig{
			Id:                      config.DataSourceConfig.ID,
			Name:                    config.DataSourceConfig.Name,
			Url:                     config.DataSourceConfig.URL,
			User:                    config.DataSourceConfig.User,
			Database:                config.DataSourceConfig.Database,
			BasicAuthEnabled:        config.DataSourceConfig.BasicAuthEnabled,
			BasicAuthUser:           config.DataSourceConfig.BasicAuthUser,
			JsonData:                datasourceJSONData,
			DecryptedSecureJsonData: config.DataSourceConfig.DecryptedSecureJSONData,
			LastUpdatedMS:           config.DataSourceConfig.Updated.UnixNano() / int64(time.Millisecond),
		}
	}

	res, err := p.diagnostics.CheckHealth(ctx, &pluginv2.CheckHealthRequest{Config: pconfig})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unimplemented {
				return &pluginv2.CheckHealthResponse{
					Status:  pluginv2.CheckHealthResponse_UNKNOWN,
					Message: "Health check not implemented",
				}, nil
			}
		}
		return nil, err
	}

	return res, nil
}
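
// callResource forwards a resource request (path, method, headers, body) to
// the plugin and returns a stream of results. If the plugin does not
// implement resource calls, a single 501 Not Implemented result is returned.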
func (p *BackendPlugin) callResource(ctx context.Context, req CallResourceRequest) (callResourceResultStream, error) {
	p.logger.Debug("Calling resource", "path", req.Path, "method", req.Method)

	if p.resource == nil || p.client == nil || p.client.Exited() {
		return nil, errors.New("plugin not running, cannot call resource")
	}

	reqHeaders := map[string]*pluginv2.StringList{}
	for k, v := range req.Headers {
		reqHeaders[k] = &pluginv2.StringList{Values: v}
	}

	jsonDataBytes, err := req.Config.JSONData.ToDB()
	if err != nil {
		return nil, err
	}

	protoReq := &pluginv2.CallResourceRequest{
		Config: &pluginv2.PluginConfig{
			OrgId:                   req.Config.OrgID,
			PluginId:                req.Config.PluginID,
			JsonData:                jsonDataBytes,
			DecryptedSecureJsonData: req.Config.DecryptedSecureJSONData,
			LastUpdatedMS:           req.Config.Updated.UnixNano() / int64(time.Millisecond),
		},
		Path:    req.Path,
		Method:  req.Method,
		Url:     req.URL,
		Headers: reqHeaders,
		Body:    req.Body,
	}

	if req.User != nil {
		protoReq.User = &pluginv2.User{
			Name:  req.User.Name,
			Login: req.User.Login,
			Email: req.User.Email,
			Role:  string(req.User.OrgRole),
		}
	}

	if req.Config.DataSourceConfig != nil {
		datasourceJSONData, err := req.Config.DataSourceConfig.JSONData.ToDB()
		if err != nil {
			return nil, err
		}

		protoReq.Config.DatasourceConfig = &pluginv2.DataSourceConfig{
			Id:                      req.Config.DataSourceConfig.ID,
			Name:                    req.Config.DataSourceConfig.Name,
			Url:                     req.Config.DataSourceConfig.URL,
			Database:                req.Config.DataSourceConfig.Database,
			User:                    req.Config.DataSourceConfig.User,
			BasicAuthEnabled:        req.Config.DataSourceConfig.BasicAuthEnabled,
			BasicAuthUser:           req.Config.DataSourceConfig.BasicAuthUser,
			JsonData:                datasourceJSONData,
			DecryptedSecureJsonData: req.Config.DataSourceConfig.DecryptedSecureJSONData,
			LastUpdatedMS:           req.Config.DataSourceConfig.Updated.UnixNano() / int64(time.Millisecond),
		}
	}

	protoStream, err := p.resource.CallResource(ctx, protoReq)
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.Unimplemented {
				return &singleCallResourceResult{
					result: &CallResourceResult{
						Status: http.StatusNotImplemented,
					},
				}, nil
			}
		}
		return nil, errutil.Wrap("Failed to call resource", err)
	}

	return &callResourceResultStreamImpl{
		stream: protoStream,
	}, nil
}

// convertMetricFamily converts a metric family to prometheus.Metric.
// Copied from https://github.com/prometheus/node_exporter/blob/3ddc82c2d8d11eec53ed5faa8db969a1bb81f8bb/collector/textfile.go#L66-L165
func convertMetricFamily(pluginID string, metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric, logger log.Logger) {
	var valType prometheus.ValueType
	var val float64

	allLabelNames := map[string]struct{}{}
	for _, metric := range metricFamily.Metric {
		labels := metric.GetLabel()
		for _, label := range labels {
			if _, ok := allLabelNames[label.GetName()]; !ok {
				allLabelNames[label.GetName()] = struct{}{}
			}
		}
	}

	for _, metric := range metricFamily.Metric {
		if metric.TimestampMs != nil {
			logger.Warn("Ignoring unsupported custom timestamp on metric", "metric", metric)
		}

		labels := metric.GetLabel()
		var names []string
		var values []string
		for _, label := range labels {
			names = append(names, label.GetName())
			values = append(values, label.GetValue())
		}
		names = append(names, "plugin_id")
		values = append(values, pluginID)

		for k := range allLabelNames {
			present := false
			for _, name := range names {
				if k == name {
					present = true
					break
				}
			}
			if !present {
				names = append(names, k)
				values = append(values, "")
			}
		}

		metricName := prometheus.BuildFQName(collector.Namespace, "", *metricFamily.Name)

		metricType := metricFamily.GetType()
		switch metricType {
		case dto.MetricType_COUNTER:
			valType = prometheus.CounterValue
			val = metric.Counter.GetValue()
		case dto.MetricType_GAUGE:
			valType = prometheus.GaugeValue
			val = metric.Gauge.GetValue()
		case dto.MetricType_UNTYPED:
			valType = prometheus.UntypedValue
			val = metric.Untyped.GetValue()
		case dto.MetricType_SUMMARY:
			quantiles := map[float64]float64{}
			for _, q := range metric.Summary.Quantile {
				quantiles[q.GetQuantile()] = q.GetValue()
			}
			ch <- prometheus.MustNewConstSummary(
				prometheus.NewDesc(
					metricName,
					metricFamily.GetHelp(),
					names, nil,
				),
				metric.Summary.GetSampleCount(),
				metric.Summary.GetSampleSum(),
				quantiles, values...,
			)
		case dto.MetricType_HISTOGRAM:
			buckets := map[float64]uint64{}
			for _, b := range metric.Histogram.Bucket {
				buckets[b.GetUpperBound()] = b.GetCumulativeCount()
			}
			ch <- prometheus.MustNewConstHistogram(
				prometheus.NewDesc(
					metricName,
					metricFamily.GetHelp(),
					names, nil,
				),
				metric.Histogram.GetSampleCount(),
				metric.Histogram.GetSampleSum(),
				buckets, values...,
			)
		default:
			logger.Error("unknown metric type", "type", metricType)
			continue
		}

		if metricType == dto.MetricType_GAUGE || metricType == dto.MetricType_COUNTER || metricType == dto.MetricType_UNTYPED {
			ch <- prometheus.MustNewConstMetric(
				prometheus.NewDesc(
					metricName,
					metricFamily.GetHelp(),
					names, nil,
				),
				valType, val, values...,
			)
		}
	}
}
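
The PluginStartFuncs callbacks checked at the end of start are the hook points for the rest of the plugin infrastructure. Below is a minimal sketch of such a hook, not part of this file, assuming PluginStartFuncs exposes an OnStart field with the signature implied by the call p.startFns.OnStart(p.id, client, p.logger) above (the actual field types are defined elsewhere in the package):

// Sketch only: the OnStart signature below is inferred from how start()
// invokes it and may differ from the definition elsewhere in the package.
func exampleStartHook() PluginStartFuncs {
	return PluginStartFuncs{
		OnStart: func(pluginID string, client *Client, logger log.Logger) error {
			// Log which capabilities the newly started plugin exposes.
			if client.DataPlugin != nil {
				logger.Info("Data plugin ready", "pluginId", pluginID)
			}
			if client.ResourcePlugin != nil {
				logger.Info("Resource plugin ready", "pluginId", pluginID)
			}
			return nil
		},
	}
}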