The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/plugins/backendplugin/manager.go

321 lines
8.3 KiB

package backendplugin
import (
	"context"
	"errors"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/registry"
	"github.com/grafana/grafana/pkg/util/proxyutil"
	plugin "github.com/hashicorp/go-plugin"
	"golang.org/x/xerrors"
)
// ErrPluginNotRegistered is the error returned when the requested plugin has
// not been registered.
var ErrPluginNotRegistered = errors.New("Plugin not registered")

// ErrDiagnosticsNotSupported is the error returned when the plugin does not
// support diagnostics.
var ErrDiagnosticsNotSupported = errors.New("Plugin diagnostics not supported")

// ErrHealthCheckFailed is the error returned when a plugin health check fails.
var ErrHealthCheckFailed = errors.New("Health check failed")
// init registers a zero-value manager with the registry under the name
// "BackendPluginManager", using the low init priority.
func init() {
	descriptor := &registry.Descriptor{
		Name:         "BackendPluginManager",
		Instance:     &manager{},
		InitPriority: registry.Low,
	}
	registry.Register(descriptor)
}
// Manager manages backend plugins.
type Manager interface {
// Register registers a backend plugin described by descriptor.
Register(descriptor PluginDescriptor) error
// StartPlugin starts a non-managed backend plugin identified by pluginID.
StartPlugin(ctx context.Context, pluginID string) error
// CollectMetrics collects metrics from a registered backend plugin.
CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error)
// CheckHealth checks the health of a registered backend plugin.
CheckHealth(ctx context.Context, pluginConfig *PluginConfig) (*CheckHealthResult, error)
// CallResource calls a plugin resource. It has no return value; the
// outcome is delivered through the request context ctx.
CallResource(pluginConfig PluginConfig, ctx *models.ReqContext, path string)
}
// manager implements Manager and keeps the registered backend plugins in
// memory.
type manager struct {
// pluginsMu guards plugins.
pluginsMu sync.RWMutex
// plugins holds the registered backend plugins keyed by plugin ID.
plugins map[string]*BackendPlugin
// logger is the manager's logger; it is created by Init.
logger log.Logger
}
// Init prepares the manager for use by creating its "plugins.backend" logger
// and an empty plugin map. It always returns nil.
func (m *manager) Init() error {
	m.logger = log.New("plugins.backend")
	m.plugins = map[string]*BackendPlugin{}

	return nil
}
// Run starts the managed plugins, blocks until ctx is done, then stops the
// plugins and returns the context's error.
func (m *manager) Run(ctx context.Context) error {
	m.start(ctx)

	// Park here until shutdown is requested.
	done := ctx.Done()
	<-done

	m.stop()

	return ctx.Err()
}
// Register adds the backend plugin described by descriptor to the manager.
// It returns an error when a plugin with the same ID is already registered.
func (m *manager) Register(descriptor PluginDescriptor) error {
	id, execPath := descriptor.pluginID, descriptor.executablePath
	m.logger.Debug("Registering backend plugin", "pluginId", id, "executablePath", execPath)

	m.pluginsMu.Lock()
	defer m.pluginsMu.Unlock()

	if _, found := m.plugins[id]; found {
		return errors.New("Backend plugin already registered")
	}

	scopedLogger := m.logger.New("pluginId", id)

	// The factory builds a fresh go-plugin client on every call. It is kept in
	// a variable not named "plugin" so the go-plugin import is not shadowed.
	newClient := func() *plugin.Client {
		cfg := newClientConfig(execPath, scopedLogger, descriptor.versionedPlugins)
		return plugin.NewClient(cfg)
	}

	m.plugins[id] = &BackendPlugin{
		id:             id,
		executablePath: execPath,
		managed:        descriptor.managed,
		clientFactory:  newClient,
		startFns:       descriptor.startFns,
		logger:         scopedLogger,
	}

	m.logger.Debug("Backend plugin registered", "pluginId", id, "executablePath", execPath)
	return nil
}
// start launches every registered plugin that is flagged as managed. A plugin
// that fails to start is logged and does not prevent the others from starting.
func (m *manager) start(ctx context.Context) {
	m.pluginsMu.RLock()
	defer m.pluginsMu.RUnlock()

	for _, bp := range m.plugins {
		if bp.managed {
			err := startPluginAndRestartKilledProcesses(ctx, bp)
			if err != nil {
				bp.logger.Error("Failed to start plugin", "error", err)
			}
		}
	}
}
// StartPlugin starts a non-managed backend plugin.
//
// It returns ErrPluginNotRegistered when no plugin with the given pluginID has
// been registered, matching what CollectMetrics and CheckHealth return for the
// same condition. It returns an error when the plugin is managed, since such
// plugins are started by the manager itself. Otherwise it returns the result
// of starting the plugin.
func (m *manager) StartPlugin(ctx context.Context, pluginID string) error {
	// Only the map lookup needs the lock; starting the plugin happens outside it.
	m.pluginsMu.RLock()
	p, registered := m.plugins[pluginID]
	m.pluginsMu.RUnlock()

	if !registered {
		return ErrPluginNotRegistered
	}

	if p.managed {
		return errors.New("Backend plugin is managed and cannot be manually started")
	}

	return startPluginAndRestartKilledProcesses(ctx, p)
}
// stop stops every registered backend plugin (managed or not) and blocks until
// all of them have finished stopping.
//
// Plugins are stopped concurrently, one goroutine per plugin. A failure to
// stop one plugin is logged and does not affect the others.
func (m *manager) stop() {
	m.pluginsMu.RLock()
	defer m.pluginsMu.RUnlock()

	var wg sync.WaitGroup
	for _, p := range m.plugins {
		wg.Add(1)
		go func(p *BackendPlugin) {
			defer wg.Done()

			p.logger.Debug("Stopping plugin")
			if err := p.stop(); err != nil {
				p.logger.Error("Failed to stop plugin", "error", err)
				// Do not report the plugin as stopped when stopping failed.
				return
			}
			p.logger.Debug("Plugin stopped")
		}(p)
	}

	// Wait for the stop goroutines; otherwise the caller could return before
	// any plugin has actually been stopped.
	wg.Wait()
}
// CollectMetrics gathers metrics from the registered backend plugin with the
// given pluginID. It returns ErrPluginNotRegistered for an unknown plugin and
// ErrDiagnosticsNotSupported when the plugin does not support diagnostics;
// errors from the plugin call itself are returned unchanged.
func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*CollectMetricsResult, error) {
	m.pluginsMu.RLock()
	target, found := m.plugins[pluginID]
	m.pluginsMu.RUnlock()

	// Cases are evaluated in order, so target is only used once it is known
	// to exist.
	switch {
	case !found:
		return nil, ErrPluginNotRegistered
	case !target.supportsDiagnostics():
		return nil, ErrDiagnosticsNotSupported
	}

	protoRes, err := target.CollectMetrics(ctx)
	if err == nil {
		return collectMetricsResultFromProto(protoRes), nil
	}
	return nil, err
}
// CheckHealth runs a health check against the registered backend plugin named
// by pluginConfig.PluginID. It returns ErrPluginNotRegistered for an unknown
// plugin and ErrDiagnosticsNotSupported when the plugin does not support
// diagnostics. When the check itself fails, the underlying error is logged and
// ErrHealthCheckFailed is returned in its place.
func (m *manager) CheckHealth(ctx context.Context, pluginConfig *PluginConfig) (*CheckHealthResult, error) {
	m.pluginsMu.RLock()
	target, found := m.plugins[pluginConfig.PluginID]
	m.pluginsMu.RUnlock()

	// Cases are evaluated in order, so target is only used once it is known
	// to exist.
	switch {
	case !found:
		return nil, ErrPluginNotRegistered
	case !target.supportsDiagnostics():
		return nil, ErrDiagnosticsNotSupported
	}

	protoRes, err := target.checkHealth(ctx, pluginConfig)
	if err == nil {
		return checkHealthResultFromProto(protoRes), nil
	}

	target.logger.Error("Failed to check plugin health", "error", err)
	return nil, ErrHealthCheckFailed
}
// CallResource calls a plugin resource.
//
// It forwards the incoming HTTP request held by c to the plugin identified by
// config.PluginID and streams the plugin's response messages back through c.
// Failures that occur before anything has been written are reported to the
// client as JSON API errors (404 when the plugin is not registered, 500
// otherwise); failures after the first message has been written are only
// logged, because the status and headers have already been sent.
func (m *manager) CallResource(config PluginConfig, c *models.ReqContext, path string) {
	m.pluginsMu.RLock()
	p, registered := m.plugins[config.PluginID]
	m.pluginsMu.RUnlock()

	if !registered {
		c.JsonApiErr(404, "Plugin not registered", nil)
		return
	}

	// Work on a clone so that the cookie/header clean-up below does not alter
	// the original request.
	clonedReq := c.Req.Clone(c.Req.Context())
	keepCookieNames := []string{}
	if config.JSONData != nil {
		if keepCookies := config.JSONData.Get("keepCookies"); keepCookies != nil {
			keepCookieNames = keepCookies.MustStringArray()
		}
	}

	proxyutil.ClearCookieHeader(clonedReq, keepCookieNames)
	proxyutil.PrepareProxyRequest(clonedReq)

	body, err := c.Req.Body().Bytes()
	if err != nil {
		c.JsonApiErr(500, "Failed to read request body", err)
		return
	}

	req := CallResourceRequest{
		Config:  config,
		Path:    path,
		Method:  clonedReq.Method,
		URL:     clonedReq.URL.String(),
		Headers: clonedReq.Header,
		Body:    body,
		User:    c.SignedInUser,
	}

	stream, err := p.callResource(clonedReq.Context(), req)
	if err != nil {
		c.JsonApiErr(500, "Failed to call resource", err)
		return
	}

	processedStreams := 0

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			if processedStreams == 0 {
				c.JsonApiErr(500, "Received empty resource response ", nil)
			}
			return
		}
		if err != nil {
			if processedStreams == 0 {
				c.JsonApiErr(500, "Failed to receive response from resource call", err)
			} else {
				p.logger.Error("Failed to receive response from resource call", "error", err)
			}
			return
		}

		// Expected that headers and status are only part of first stream
		if processedStreams == 0 {
			hasContentType := false

			// Ranging over a nil map is a no-op, so a response without any
			// headers is handled without touching resp.Headers.
			for k, values := range resp.Headers {
				// Header names are case-insensitive and Header().Add
				// canonicalizes them, so compare on the canonical form;
				// otherwise e.g. "set-cookie" would slip past the filter.
				key := http.CanonicalHeaderKey(k)

				// Due to security reasons we don't want to forward
				// cookies from a backend plugin to clients/browsers.
				if key == "Set-Cookie" {
					continue
				}

				if key == "Content-Type" {
					hasContentType = true
				}

				for _, v := range values {
					c.Resp.Header().Add(key, v)
				}
			}

			// Make sure a content type always is returned in response
			if !hasContentType {
				c.Resp.Header().Add("Content-Type", "application/json")
			}

			c.WriteHeader(resp.Status)
		}

		if _, err := c.Write(resp.Body); err != nil {
			p.logger.Error("Failed to write resource response", "error", err)
		}
		// Flush after each message so streamed chunks are sent as they arrive.
		c.Resp.Flush()

		processedStreams++
	}
}
// startPluginAndRestartKilledProcesses starts p and, if that succeeds, spawns
// a background goroutine running restartKilledProcess for it. Only the initial
// start error is returned; an error from the background goroutine is logged.
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
	err := p.start(ctx)
	if err != nil {
		return err
	}

	go func() {
		restartErr := restartKilledProcess(ctx, p)
		if restartErr != nil {
			p.logger.Error("Attempt to restart killed plugin process failed", "error", restartErr)
		}
	}()

	return nil
}
// restartKilledProcess checks once per second whether the plugin's client has
// exited and, if so, starts the plugin again. A failed restart is logged and
// retried on the next tick.
//
// It returns when ctx is done: nil if the context was canceled, otherwise the
// context's error.
func restartKilledProcess(ctx context.Context, p *BackendPlugin) error {
	ticker := time.NewTicker(time.Second * 1)
	// Release the ticker's resources when the loop ends.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Plain cancellation is the normal shutdown path, not an error.
			if err := ctx.Err(); err != nil && !xerrors.Is(err, context.Canceled) {
				return err
			}
			return nil
		case <-ticker.C:
			if !p.client.Exited() {
				continue
			}

			p.logger.Debug("Restarting plugin")
			if err := p.start(ctx); err != nil {
				p.logger.Error("Failed to restart plugin", "error", err)
				continue
			}
			p.logger.Debug("Plugin restarted")
		}
	}
}