From 42533dc9734b75476739362d8b8f6a711baa8685 Mon Sep 17 00:00:00 2001 From: Will Browne Date: Wed, 30 Mar 2022 12:46:31 +0200 Subject: [PATCH] Plugins: split manager.go into areas of responsibility (#46957) --- pkg/plugins/manager/client.go | 140 ++++++++++++++++++++++++++ pkg/plugins/manager/manager.go | 175 --------------------------------- pkg/plugins/manager/store.go | 44 +++++++++ 3 files changed, 184 insertions(+), 175 deletions(-) create mode 100644 pkg/plugins/manager/client.go diff --git a/pkg/plugins/manager/client.go b/pkg/plugins/manager/client.go new file mode 100644 index 00000000000..7783b95b9be --- /dev/null +++ b/pkg/plugins/manager/client.go @@ -0,0 +1,140 @@ +package manager + +import ( + "context" + "errors" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation" + "github.com/grafana/grafana/pkg/util/errutil" +) + +func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + plugin, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return nil, backendplugin.ErrPluginNotRegistered + } + + var resp *backend.QueryDataResponse + err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) { + resp, innerErr = plugin.QueryData(ctx, req) + return + }) + + if err != nil { + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + return nil, err + } + + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + return nil, err + } + + return nil, errutil.Wrap("failed to query data", err) + } + + for refID, res := range resp.Responses { + // set frame ref ID based on response ref ID + for _, f := range res.Frames { + if f.RefID == "" { + f.RefID = refID + } + } + } + + return resp, err +} + +func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + p, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return backendplugin.ErrPluginNotRegistered + } + + err := instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error { + if err := p.CallResource(ctx, req, sender); err != nil { + return err + } + return nil + }) + + if err != nil { + return err + } + + return nil +} + +func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + p, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return nil, backendplugin.ErrPluginNotRegistered + } + + var resp *backend.CollectMetricsResult + err := instrumentation.InstrumentCollectMetrics(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CollectMetrics(ctx, req) + return + }) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + p, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return nil, backendplugin.ErrPluginNotRegistered + } + + var resp *backend.CheckHealthResult + err := instrumentation.InstrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) { + resp, innerErr = p.CheckHealth(ctx, req) + return + }) + + if err != nil { + if errors.Is(err, backendplugin.ErrMethodNotImplemented) { + return nil, err + } + + if errors.Is(err, backendplugin.ErrPluginUnavailable) { + return nil, err + } + + return nil, errutil.Wrap("failed to check plugin health", backendplugin.ErrHealthCheckFailed) + } + + return resp, nil +} + +func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + plugin, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return nil, backendplugin.ErrPluginNotRegistered + } + + return plugin.SubscribeStream(ctx, req) +} + +func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + plugin, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return nil, backendplugin.ErrPluginNotRegistered + } + + return plugin.PublishStream(ctx, req) +} + +func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + plugin, exists := m.plugin(req.PluginContext.PluginID) + if !exists { + return backendplugin.ErrPluginNotRegistered + } + + return plugin.RunStream(ctx, req, sender) +} diff --git a/pkg/plugins/manager/manager.go b/pkg/plugins/manager/manager.go index d7792f2ff15..729a4962741 100644 --- a/pkg/plugins/manager/manager.go +++ b/pkg/plugins/manager/manager.go @@ -8,14 +8,11 @@ import ( "sync" "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin" - "github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation" "github.com/grafana/grafana/pkg/plugins/manager/installer" "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/util/errutil" ) const ( @@ -82,32 +79,6 @@ func (m *PluginManager) Run(ctx context.Context) error { return ctx.Err() } -func (m *PluginManager) plugin(pluginID string) (*plugins.Plugin, bool) { - m.pluginsMu.RLock() - defer m.pluginsMu.RUnlock() - p, exists := m.store[pluginID] - - if !exists || (p.IsDecommissioned()) { - return nil, false - } - - return p, true -} - -func (m *PluginManager) plugins() []*plugins.Plugin { - m.pluginsMu.RLock() - defer m.pluginsMu.RUnlock() - - res := make([]*plugins.Plugin, 0) - for _, p := range m.store { - if !p.IsDecommissioned() { - res = append(res, p) - } - } - - return res -} - func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, paths ...string) error { if len(paths) == 0 { return nil @@ -135,15 +106,6 @@ func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, pa return nil } -func (m *PluginManager) registeredPlugins() map[string]struct{} { - pluginsByID := make(map[string]struct{}) - for _, p := range m.plugins() { - pluginsByID[p.ID] = struct{}{} - } - - return pluginsByID -} - func (m *PluginManager) Renderer() *plugins.Plugin { for _, p := range m.plugins() { if p.IsRenderer() { @@ -154,143 +116,6 @@ func (m *PluginManager) Renderer() *plugins.Plugin { return nil } -func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - plugin, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return nil, backendplugin.ErrPluginNotRegistered - } - - var resp *backend.QueryDataResponse - err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) { - resp, innerErr = plugin.QueryData(ctx, req) - return - }) - - if err != nil { - if errors.Is(err, backendplugin.ErrMethodNotImplemented) { - return nil, err - } - - if errors.Is(err, backendplugin.ErrPluginUnavailable) { - return nil, err - } - - return nil, errutil.Wrap("failed to query data", err) - } - - for refID, res := range resp.Responses { - // set frame ref ID based on response ref ID - for _, f := range res.Frames { - if f.RefID == "" { - f.RefID = refID - } - } - } - - return resp, err -} - -func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - p, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return backendplugin.ErrPluginNotRegistered - } - - err := instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error { - if err := p.CallResource(ctx, req, sender); err != nil { - return err - } - return nil - }) - - if err != nil { - return err - } - - return nil -} - -func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { - p, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return nil, backendplugin.ErrPluginNotRegistered - } - - var resp *backend.CollectMetricsResult - err := instrumentation.InstrumentCollectMetrics(p.PluginID(), func() (innerErr error) { - resp, innerErr = p.CollectMetrics(ctx, req) - return - }) - if err != nil { - return nil, err - } - - return resp, nil -} - -func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - p, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return nil, backendplugin.ErrPluginNotRegistered - } - - var resp *backend.CheckHealthResult - err := instrumentation.InstrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) { - resp, innerErr = p.CheckHealth(ctx, req) - return - }) - - if err != nil { - if errors.Is(err, backendplugin.ErrMethodNotImplemented) { - return nil, err - } - - if errors.Is(err, backendplugin.ErrPluginUnavailable) { - return nil, err - } - - return nil, errutil.Wrap("failed to check plugin health", backendplugin.ErrHealthCheckFailed) - } - - return resp, nil -} - -func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { - plugin, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return nil, backendplugin.ErrPluginNotRegistered - } - - return plugin.SubscribeStream(ctx, req) -} - -func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { - plugin, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return nil, backendplugin.ErrPluginNotRegistered - } - - return plugin.PublishStream(ctx, req) -} - -func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { - plugin, exists := m.plugin(req.PluginContext.PluginID) - if !exists { - return backendplugin.ErrPluginNotRegistered - } - - return plugin.RunStream(ctx, req, sender) -} - -func (m *PluginManager) isRegistered(pluginID string) bool { - p, exists := m.plugin(pluginID) - if !exists { - return false - } - - return !p.IsDecommissioned() -} - func (m *PluginManager) Routes() []*plugins.StaticRoute { staticRoutes := make([]*plugins.StaticRoute, 0) diff --git a/pkg/plugins/manager/store.go b/pkg/plugins/manager/store.go index ebf50c48755..1656848735c 100644 --- a/pkg/plugins/manager/store.go +++ b/pkg/plugins/manager/store.go @@ -38,6 +38,50 @@ func (m *PluginManager) Plugins(_ context.Context, pluginTypes ...plugins.Type) return pluginsList } +func (m *PluginManager) plugin(pluginID string) (*plugins.Plugin, bool) { + m.pluginsMu.RLock() + defer m.pluginsMu.RUnlock() + p, exists := m.store[pluginID] + + if !exists || (p.IsDecommissioned()) { + return nil, false + } + + return p, true +} + +func (m *PluginManager) plugins() []*plugins.Plugin { + m.pluginsMu.RLock() + defer m.pluginsMu.RUnlock() + + res := make([]*plugins.Plugin, 0) + for _, p := range m.store { + if !p.IsDecommissioned() { + res = append(res, p) + } + } + + return res +} + +func (m *PluginManager) isRegistered(pluginID string) bool { + p, exists := m.plugin(pluginID) + if !exists { + return false + } + + return !p.IsDecommissioned() +} + +func (m *PluginManager) registeredPlugins() map[string]struct{} { + pluginsByID := make(map[string]struct{}) + for _, p := range m.plugins() { + pluginsByID[p.ID] = struct{}{} + } + + return pluginsByID +} + func (m *PluginManager) Add(ctx context.Context, pluginID, version string) error { var pluginZipURL string