diff --git a/pkg/api/fakes.go b/pkg/api/fakes.go index a1576267e00..6a424d90925 100644 --- a/pkg/api/fakes.go +++ b/pkg/api/fakes.go @@ -7,6 +7,8 @@ import ( ) type fakePluginManager struct { + plugins.Manager + plugins map[string]fakePlugin } diff --git a/pkg/cmd/grafana-cli/runner/wire.go b/pkg/cmd/grafana-cli/runner/wire.go index 56e5e497553..149a24ec9ad 100644 --- a/pkg/cmd/grafana-cli/runner/wire.go +++ b/pkg/cmd/grafana-cli/runner/wire.go @@ -34,8 +34,12 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" "github.com/grafana/grafana/pkg/plugins/manager" + "github.com/grafana/grafana/pkg/plugins/manager/client" + pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards" "github.com/grafana/grafana/pkg/plugins/manager/loader" + processManager "github.com/grafana/grafana/pkg/plugins/manager/process" "github.com/grafana/grafana/pkg/plugins/manager/registry" + managerStore "github.com/grafana/grafana/pkg/plugins/manager/store" "github.com/grafana/grafana/pkg/plugins/plugincontext" "github.com/grafana/grafana/pkg/plugins/repo" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -176,12 +180,17 @@ var wireSet = wire.NewSet( wire.Bind(new(repo.Service), new(*repo.Manager)), manager.ProvideService, wire.Bind(new(plugins.Manager), new(*manager.PluginManager)), - wire.Bind(new(plugins.Client), new(*manager.PluginManager)), - wire.Bind(new(plugins.Store), new(*manager.PluginManager)), - wire.Bind(new(plugins.DashboardFileStore), new(*manager.PluginManager)), - wire.Bind(new(plugins.StaticRouteResolver), new(*manager.PluginManager)), - wire.Bind(new(plugins.RendererManager), new(*manager.PluginManager)), - wire.Bind(new(plugins.SecretsPluginManager), new(*manager.PluginManager)), + client.ProvideService, + wire.Bind(new(plugins.Client), new(*client.Service)), + managerStore.ProvideService, + wire.Bind(new(plugins.Store), new(*managerStore.Service)), + wire.Bind(new(plugins.RendererManager), new(*managerStore.Service)), + wire.Bind(new(plugins.SecretsPluginManager), new(*managerStore.Service)), + wire.Bind(new(plugins.StaticRouteResolver), new(*managerStore.Service)), + pluginDashboards.ProvideFileStoreManager, + wire.Bind(new(pluginDashboards.FileStore), new(*pluginDashboards.FileStoreManager)), + processManager.ProvideService, + wire.Bind(new(processManager.Service), new(*processManager.Manager)), coreplugin.ProvideCoreRegistry, loader.ProvideService, wire.Bind(new(loader.Service), new(*loader.Loader)), diff --git a/pkg/plugins/ifaces.go b/pkg/plugins/ifaces.go index 6a1f158777c..c26c4419a3e 100644 --- a/pkg/plugins/ifaces.go +++ b/pkg/plugins/ifaces.go @@ -2,7 +2,6 @@ package plugins import ( "context" - "io" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -24,6 +23,11 @@ type Manager interface { Remove(ctx context.Context, pluginID string) error } +type PluginSource struct { + Class Class + Paths []string +} + type CompatOpts struct { GrafanaVersion string OS string @@ -70,33 +74,3 @@ type PluginLoaderAuthorizer interface { // CanLoadPlugin confirms if a plugin is authorized to load CanLoadPlugin(plugin *Plugin) bool } - -// ListPluginDashboardFilesArgs list plugin dashboard files argument model. -type ListPluginDashboardFilesArgs struct { - PluginID string -} - -// GetPluginDashboardFilesArgs list plugin dashboard files result model. -type ListPluginDashboardFilesResult struct { - FileReferences []string -} - -// GetPluginDashboardFileContentsArgs get plugin dashboard file content argument model. -type GetPluginDashboardFileContentsArgs struct { - PluginID string - FileReference string -} - -// GetPluginDashboardFileContentsResult get plugin dashboard file content result model. -type GetPluginDashboardFileContentsResult struct { - Content io.ReadCloser -} - -// DashboardFileStore is the interface for plugin dashboard file storage. -type DashboardFileStore interface { - // ListPluginDashboardFiles lists plugin dashboard files. - ListPluginDashboardFiles(ctx context.Context, args *ListPluginDashboardFilesArgs) (*ListPluginDashboardFilesResult, error) - - // GetPluginDashboardFileContents gets the referenced plugin dashboard file content. - GetPluginDashboardFileContents(ctx context.Context, args *GetPluginDashboardFileContentsArgs) (*GetPluginDashboardFileContentsResult, error) -} diff --git a/pkg/plugins/manager/client.go b/pkg/plugins/manager/client/client.go similarity index 55% rename from pkg/plugins/manager/client.go rename to pkg/plugins/manager/client/client.go index cd1d853b5b0..4c9aaf4fb0e 100644 --- a/pkg/plugins/manager/client.go +++ b/pkg/plugins/manager/client/client.go @@ -1,4 +1,4 @@ -package manager +package client import ( "context" @@ -7,12 +7,26 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation" + "github.com/grafana/grafana/pkg/plugins/manager/registry" ) -func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - plugin, exists := m.plugin(ctx, req.PluginContext.PluginID) +var _ plugins.Client = (*Service)(nil) + +type Service struct { + pluginRegistry registry.Service +} + +func ProvideService(pluginRegistry registry.Service) *Service { + return &Service{ + pluginRegistry: pluginRegistry, + } +} + +func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + plugin, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return nil, backendplugin.ErrPluginNotRegistered } @@ -47,8 +61,8 @@ func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataReq return resp, err } -func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - p, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + p, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return backendplugin.ErrPluginNotRegistered } @@ -66,8 +80,8 @@ func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResou return nil } -func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { - p, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + p, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return nil, backendplugin.ErrPluginNotRegistered } @@ -84,8 +98,8 @@ func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.Collect return resp, nil } -func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - p, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + p, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return nil, backendplugin.ErrPluginNotRegistered } @@ -111,8 +125,8 @@ func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealt return resp, nil } -func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { - plugin, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + plugin, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return nil, backendplugin.ErrPluginNotRegistered } @@ -120,8 +134,8 @@ func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.Subscr return plugin.SubscribeStream(ctx, req) } -func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { - plugin, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + plugin, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return nil, backendplugin.ErrPluginNotRegistered } @@ -129,11 +143,25 @@ func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishS return plugin.PublishStream(ctx, req) } -func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { - plugin, exists := m.plugin(ctx, req.PluginContext.PluginID) +func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + plugin, exists := s.plugin(ctx, req.PluginContext.PluginID) if !exists { return backendplugin.ErrPluginNotRegistered } return plugin.RunStream(ctx, req, sender) } + +// plugin finds a plugin with `pluginID` from the registry that is not decommissioned +func (s *Service) plugin(ctx context.Context, pluginID string) (*plugins.Plugin, bool) { + p, exists := s.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return nil, false + } + + if p.IsDecommissioned() { + return nil, false + } + + return p, true +} diff --git a/pkg/plugins/manager/dashboard_file_store.go b/pkg/plugins/manager/dashboards/filestore.go similarity index 69% rename from pkg/plugins/manager/dashboard_file_store.go rename to pkg/plugins/manager/dashboards/filestore.go index 009bd0a2aa8..cb37225f043 100644 --- a/pkg/plugins/manager/dashboard_file_store.go +++ b/pkg/plugins/manager/dashboards/filestore.go @@ -1,4 +1,4 @@ -package manager +package dashboards import ( "context" @@ -12,13 +12,25 @@ import ( "github.com/grafana/grafana/pkg/util" ) +var _ FileStore = (*FileStoreManager)(nil) + +type FileStoreManager struct { + pluginStore plugins.Store +} + +func ProvideFileStoreManager(pluginStore plugins.Store) *FileStoreManager { + return &FileStoreManager{ + pluginStore: pluginStore, + } +} + var openDashboardFile = func(name string) (fs.File, error) { // Wrapping in filepath.Clean to properly handle // gosec G304 Potential file inclusion via variable rule. return os.Open(filepath.Clean(name)) } -func (m *PluginManager) ListPluginDashboardFiles(ctx context.Context, args *plugins.ListPluginDashboardFilesArgs) (*plugins.ListPluginDashboardFilesResult, error) { +func (m *FileStoreManager) ListPluginDashboardFiles(ctx context.Context, args *ListPluginDashboardFilesArgs) (*ListPluginDashboardFilesResult, error) { if args == nil { return nil, fmt.Errorf("args cannot be nil") } @@ -27,7 +39,7 @@ func (m *PluginManager) ListPluginDashboardFiles(ctx context.Context, args *plug return nil, fmt.Errorf("args.PluginID cannot be empty") } - plugin, exists := m.Plugin(ctx, args.PluginID) + plugin, exists := m.pluginStore.Plugin(ctx, args.PluginID) if !exists { return nil, plugins.NotFoundError{PluginID: args.PluginID} } @@ -37,12 +49,12 @@ func (m *PluginManager) ListPluginDashboardFiles(ctx context.Context, args *plug references = append(references, include.Path) } - return &plugins.ListPluginDashboardFilesResult{ + return &ListPluginDashboardFilesResult{ FileReferences: references, }, nil } -func (m *PluginManager) GetPluginDashboardFileContents(ctx context.Context, args *plugins.GetPluginDashboardFileContentsArgs) (*plugins.GetPluginDashboardFileContentsResult, error) { +func (m *FileStoreManager) GetPluginDashboardFileContents(ctx context.Context, args *GetPluginDashboardFileContentsArgs) (*GetPluginDashboardFileContentsResult, error) { if args == nil { return nil, fmt.Errorf("args cannot be nil") } @@ -55,7 +67,7 @@ func (m *PluginManager) GetPluginDashboardFileContents(ctx context.Context, args return nil, fmt.Errorf("args.FileReference cannot be empty") } - plugin, exists := m.Plugin(ctx, args.PluginID) + plugin, exists := m.pluginStore.Plugin(ctx, args.PluginID) if !exists { return nil, plugins.NotFoundError{PluginID: args.PluginID} } @@ -84,9 +96,7 @@ func (m *PluginManager) GetPluginDashboardFileContents(ctx context.Context, args return nil, err } - return &plugins.GetPluginDashboardFileContentsResult{ + return &GetPluginDashboardFileContentsResult{ Content: file, }, nil } - -var _ plugins.DashboardFileStore = &PluginManager{} diff --git a/pkg/plugins/manager/dashboard_file_store_test.go b/pkg/plugins/manager/dashboards/filestore_test.go similarity index 81% rename from pkg/plugins/manager/dashboard_file_store_test.go rename to pkg/plugins/manager/dashboards/filestore_test.go index 44b5036290b..ab3834c3b21 100644 --- a/pkg/plugins/manager/dashboard_file_store_test.go +++ b/pkg/plugins/manager/dashboards/filestore_test.go @@ -1,4 +1,4 @@ -package manager +package dashboards import ( "context" @@ -18,18 +18,18 @@ func TestDashboardFileStore(t *testing.T) { t.Run("ListPluginDashboardFiles", func(t *testing.T) { testCases := []struct { name string - args *plugins.ListPluginDashboardFilesArgs + args *ListPluginDashboardFilesArgs }{ { name: "nil args should return error", }, { name: "empty args.PluginID should return error", - args: &plugins.ListPluginDashboardFilesArgs{}, + args: &ListPluginDashboardFilesArgs{}, }, { name: "args.PluginID with only space should return error", - args: &plugins.ListPluginDashboardFilesArgs{PluginID: " \t "}, + args: &ListPluginDashboardFilesArgs{PluginID: " \t "}, }, } @@ -45,28 +45,28 @@ func TestDashboardFileStore(t *testing.T) { t.Run("GetPluginDashboardFileContents", func(t *testing.T) { testCases := []struct { name string - args *plugins.GetPluginDashboardFileContentsArgs + args *GetPluginDashboardFileContentsArgs }{ { name: "nil args should return error", }, { name: "empty args.PluginID should return error", - args: &plugins.GetPluginDashboardFileContentsArgs{}, + args: &GetPluginDashboardFileContentsArgs{}, }, { name: "args.PluginID with only space should return error", - args: &plugins.GetPluginDashboardFileContentsArgs{PluginID: " "}, + args: &GetPluginDashboardFileContentsArgs{PluginID: " "}, }, { name: "empty args.FileReference should return error", - args: &plugins.GetPluginDashboardFileContentsArgs{ + args: &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboards", }, }, { name: "args.FileReference with only space should return error", - args: &plugins.GetPluginDashboardFileContentsArgs{ + args: &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboard", FileReference: " \t", }, @@ -85,7 +85,7 @@ func TestDashboardFileStore(t *testing.T) { t.Run("Plugin without dashboards", func(t *testing.T) { t.Run("Should return zero file references", func(t *testing.T) { - res, err := m.ListPluginDashboardFiles(context.Background(), &plugins.ListPluginDashboardFilesArgs{ + res, err := m.ListPluginDashboardFiles(context.Background(), &ListPluginDashboardFilesArgs{ PluginID: "pluginWithoutDashboards", }) require.NoError(t, err) @@ -94,7 +94,7 @@ func TestDashboardFileStore(t *testing.T) { }) t.Run("Should return file not found error when trying to get non-existing plugin dashboard file content", func(t *testing.T) { - res, err := m.GetPluginDashboardFileContents(context.Background(), &plugins.GetPluginDashboardFileContentsArgs{ + res, err := m.GetPluginDashboardFileContents(context.Background(), &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithoutDashboards", FileReference: "dashboards/dash2.json", }) @@ -106,7 +106,7 @@ func TestDashboardFileStore(t *testing.T) { t.Run("Plugin with dashboards", func(t *testing.T) { t.Run("Should return two file references", func(t *testing.T) { - res, err := m.ListPluginDashboardFiles(context.Background(), &plugins.ListPluginDashboardFilesArgs{ + res, err := m.ListPluginDashboardFiles(context.Background(), &ListPluginDashboardFilesArgs{ PluginID: "pluginWithDashboards", }) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestDashboardFileStore(t *testing.T) { }) t.Run("Should return file not found error when trying to get non-existing plugin dashboard file content", func(t *testing.T) { - res, err := m.GetPluginDashboardFileContents(context.Background(), &plugins.GetPluginDashboardFileContentsArgs{ + res, err := m.GetPluginDashboardFileContents(context.Background(), &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboards", FileReference: "dashboards/dash3.json", }) @@ -146,7 +146,7 @@ func TestDashboardFileStore(t *testing.T) { }) t.Run("Should return file content for dashboards/dash1.json", func(t *testing.T) { - res, err := m.GetPluginDashboardFileContents(context.Background(), &plugins.GetPluginDashboardFileContentsArgs{ + res, err := m.GetPluginDashboardFileContents(context.Background(), &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboards", FileReference: "dashboards/dash1.json", }) @@ -160,7 +160,7 @@ func TestDashboardFileStore(t *testing.T) { }) t.Run("Should return file content for dashboards/dash2.json", func(t *testing.T) { - res, err := m.GetPluginDashboardFileContents(context.Background(), &plugins.GetPluginDashboardFileContentsArgs{ + res, err := m.GetPluginDashboardFileContents(context.Background(), &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboards", FileReference: "dashboards/dash2.json", }) @@ -174,7 +174,7 @@ func TestDashboardFileStore(t *testing.T) { }) t.Run("Should return error when trying to read relative file", func(t *testing.T) { - res, err := m.GetPluginDashboardFileContents(context.Background(), &plugins.GetPluginDashboardFileContentsArgs{ + res, err := m.GetPluginDashboardFileContents(context.Background(), &GetPluginDashboardFileContentsArgs{ PluginID: "pluginWithDashboards", FileReference: "dashboards/../dash2.json", }) @@ -186,12 +186,12 @@ func TestDashboardFileStore(t *testing.T) { }) } -func setupPluginDashboardsForTest(t *testing.T) *PluginManager { +func setupPluginDashboardsForTest(t *testing.T) *FileStoreManager { t.Helper() - return &PluginManager{ - pluginRegistry: &fakePluginRegistry{ - store: map[string]*plugins.Plugin{ + return &FileStoreManager{ + pluginStore: &fakePluginStore{ + plugins: map[string]plugins.PluginDTO{ "pluginWithoutDashboards": { JSONData: plugins.JSONData{ Includes: []*plugins.Includes{ @@ -223,3 +223,20 @@ func setupPluginDashboardsForTest(t *testing.T) *PluginManager { }, } } + +type fakePluginStore struct { + plugins map[string]plugins.PluginDTO +} + +func (pr fakePluginStore) Plugin(_ context.Context, pluginID string) (plugins.PluginDTO, bool) { + p, exists := pr.plugins[pluginID] + return p, exists +} + +func (pr fakePluginStore) Plugins(_ context.Context, _ ...plugins.Type) []plugins.PluginDTO { + var result []plugins.PluginDTO + for _, v := range pr.plugins { + result = append(result, v) + } + return result +} diff --git a/pkg/plugins/manager/dashboards/ifaces.go b/pkg/plugins/manager/dashboards/ifaces.go new file mode 100644 index 00000000000..a8cd5a11278 --- /dev/null +++ b/pkg/plugins/manager/dashboards/ifaces.go @@ -0,0 +1,35 @@ +package dashboards + +import ( + "context" + "io" +) + +// FileStore is the interface for plugin dashboard file storage. +type FileStore interface { + // ListPluginDashboardFiles lists plugin dashboard files. + ListPluginDashboardFiles(ctx context.Context, args *ListPluginDashboardFilesArgs) (*ListPluginDashboardFilesResult, error) + // GetPluginDashboardFileContents gets the referenced plugin dashboard file content. + GetPluginDashboardFileContents(ctx context.Context, args *GetPluginDashboardFileContentsArgs) (*GetPluginDashboardFileContentsResult, error) +} + +// ListPluginDashboardFilesArgs list plugin dashboard files argument model. +type ListPluginDashboardFilesArgs struct { + PluginID string +} + +// ListPluginDashboardFilesResult list plugin dashboard files result model. +type ListPluginDashboardFilesResult struct { + FileReferences []string +} + +// GetPluginDashboardFileContentsArgs get plugin dashboard file content argument model. +type GetPluginDashboardFileContentsArgs struct { + PluginID string + FileReference string +} + +// GetPluginDashboardFileContentsResult get plugin dashboard file content result model. +type GetPluginDashboardFileContentsResult struct { + Content io.ReadCloser +} diff --git a/pkg/plugins/manager/fakes/fakes.go b/pkg/plugins/manager/fakes/fakes.go new file mode 100644 index 00000000000..f79921ae436 --- /dev/null +++ b/pkg/plugins/manager/fakes/fakes.go @@ -0,0 +1,260 @@ +package fakes + +import ( + "archive/zip" + "context" + "sync" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/repo" + "github.com/grafana/grafana/pkg/plugins/storage" +) + +type FakeLoader struct { + LoadFunc func(_ context.Context, _ plugins.Class, paths []string, _ map[string]struct{}) ([]*plugins.Plugin, error) + + LoadedPaths []string +} + +func (l *FakeLoader) Load(ctx context.Context, class plugins.Class, paths []string, ignore map[string]struct{}) ([]*plugins.Plugin, error) { + if l.LoadFunc != nil { + return l.LoadFunc(ctx, class, paths, ignore) + } + + l.LoadedPaths = append(l.LoadedPaths, paths...) + + return nil, nil +} + +type FakePluginClient struct { + ID string + Managed bool + Log log.Logger + + startCount int + stopCount int + exited bool + decommissioned bool + backend.CollectMetricsHandlerFunc + backend.CheckHealthHandlerFunc + backend.QueryDataHandlerFunc + backend.CallResourceHandlerFunc + mutex sync.RWMutex + + backendplugin.Plugin +} + +func (pc *FakePluginClient) PluginID() string { + return pc.ID +} + +func (pc *FakePluginClient) Logger() log.Logger { + return pc.Log +} + +func (pc *FakePluginClient) Start(_ context.Context) error { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.exited = false + pc.startCount++ + return nil +} + +func (pc *FakePluginClient) Stop(_ context.Context) error { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.stopCount++ + pc.exited = true + return nil +} + +func (pc *FakePluginClient) IsManaged() bool { + return pc.Managed +} + +func (pc *FakePluginClient) Exited() bool { + pc.mutex.RLock() + defer pc.mutex.RUnlock() + return pc.exited +} + +func (pc *FakePluginClient) Decommission() error { + pc.mutex.Lock() + defer pc.mutex.Unlock() + pc.decommissioned = true + return nil +} + +func (pc *FakePluginClient) IsDecommissioned() bool { + pc.mutex.RLock() + defer pc.mutex.RUnlock() + return pc.decommissioned +} + +func (pc *FakePluginClient) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + if pc.CollectMetricsHandlerFunc != nil { + return pc.CollectMetricsHandlerFunc(ctx, req) + } + + return nil, backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if pc.CheckHealthHandlerFunc != nil { + return pc.CheckHealthHandlerFunc(ctx, req) + } + + return nil, backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + if pc.QueryDataHandlerFunc != nil { + return pc.QueryDataHandlerFunc(ctx, req) + } + + return nil, backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if pc.CallResourceHandlerFunc != nil { + return pc.CallResourceHandlerFunc(ctx, req, sender) + } + + return backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) SubscribeStream(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + return nil, backendplugin.ErrMethodNotImplemented +} + +func (pc *FakePluginClient) RunStream(_ context.Context, _ *backend.RunStreamRequest, _ *backend.StreamSender) error { + return backendplugin.ErrMethodNotImplemented +} + +type FakePluginRegistry struct { + Store map[string]*plugins.Plugin +} + +func NewFakePluginRegistry() *FakePluginRegistry { + return &FakePluginRegistry{ + Store: make(map[string]*plugins.Plugin), + } +} + +func (f *FakePluginRegistry) Plugin(_ context.Context, id string) (*plugins.Plugin, bool) { + p, exists := f.Store[id] + return p, exists +} + +func (f *FakePluginRegistry) Plugins(_ context.Context) []*plugins.Plugin { + var res []*plugins.Plugin + + for _, p := range f.Store { + res = append(res, p) + } + + return res +} + +func (f *FakePluginRegistry) Add(_ context.Context, p *plugins.Plugin) error { + f.Store[p.ID] = p + return nil +} + +func (f *FakePluginRegistry) Remove(_ context.Context, id string) error { + delete(f.Store, id) + return nil +} + +type FakePluginRepo struct { + GetPluginArchiveFunc func(_ context.Context, pluginID, version string, _ repo.CompatOpts) (*repo.PluginArchive, error) + GetPluginArchiveByURLFunc func(_ context.Context, archiveURL string, _ repo.CompatOpts) (*repo.PluginArchive, error) + GetPluginDownloadOptionsFunc func(_ context.Context, pluginID, version string, _ repo.CompatOpts) (*repo.PluginDownloadOptions, error) +} + +// GetPluginArchive fetches the requested plugin archive. +func (r *FakePluginRepo) GetPluginArchive(ctx context.Context, pluginID, version string, opts repo.CompatOpts) (*repo.PluginArchive, error) { + if r.GetPluginArchiveFunc != nil { + return r.GetPluginArchiveFunc(ctx, pluginID, version, opts) + } + + return &repo.PluginArchive{}, nil +} + +// GetPluginArchiveByURL fetches the requested plugin from the specified URL. +func (r *FakePluginRepo) GetPluginArchiveByURL(ctx context.Context, archiveURL string, opts repo.CompatOpts) (*repo.PluginArchive, error) { + if r.GetPluginArchiveByURLFunc != nil { + return r.GetPluginArchiveByURLFunc(ctx, archiveURL, opts) + } + + return &repo.PluginArchive{}, nil +} + +// GetPluginDownloadOptions fetches information for downloading the requested plugin. +func (r *FakePluginRepo) GetPluginDownloadOptions(ctx context.Context, pluginID, version string, opts repo.CompatOpts) (*repo.PluginDownloadOptions, error) { + if r.GetPluginDownloadOptionsFunc != nil { + return r.GetPluginDownloadOptionsFunc(ctx, pluginID, version, opts) + } + return &repo.PluginDownloadOptions{}, nil +} + +type FakePluginStorage struct { + AddFunc func(_ context.Context, pluginID string, z *zip.ReadCloser) (*storage.ExtractedPluginArchive, error) + RemoveFunc func(_ context.Context, pluginID string) error + Added map[string]string + Removed map[string]int +} + +func (s *FakePluginStorage) Add(ctx context.Context, pluginID string, z *zip.ReadCloser) (*storage.ExtractedPluginArchive, error) { + s.Added[pluginID] = z.File[0].Name + if s.AddFunc != nil { + return s.AddFunc(ctx, pluginID, z) + } + return &storage.ExtractedPluginArchive{}, nil +} + +func (s *FakePluginStorage) Remove(ctx context.Context, pluginID string) error { + s.Removed[pluginID]++ + if s.RemoveFunc != nil { + return s.RemoveFunc(ctx, pluginID) + } + return nil +} + +type FakeProcessManager struct { + StartFunc func(_ context.Context, pluginID string) error + StopFunc func(_ context.Context, pluginID string) error + Started map[string]int + Stopped map[string]int +} + +func NewFakeProcessManager() *FakeProcessManager { + return &FakeProcessManager{ + Started: make(map[string]int), + Stopped: make(map[string]int), + } +} + +func (m *FakeProcessManager) Start(ctx context.Context, pluginID string) error { + m.Started[pluginID]++ + if m.StartFunc != nil { + return m.StartFunc(ctx, pluginID) + } + return nil +} + +func (m *FakeProcessManager) Stop(ctx context.Context, pluginID string) error { + m.Stopped[pluginID]++ + if m.StopFunc != nil { + return m.StopFunc(ctx, pluginID) + } + return nil +} diff --git a/pkg/plugins/manager/manager.go b/pkg/plugins/manager/manager.go index 1365935eda5..ad5e908100f 100644 --- a/pkg/plugins/manager/manager.go +++ b/pkg/plugins/manager/manager.go @@ -2,16 +2,14 @@ package manager import ( "context" - "errors" + "fmt" "path/filepath" - "sync" - "time" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/backendplugin" "github.com/grafana/grafana/pkg/plugins/logger" "github.com/grafana/grafana/pkg/plugins/manager/loader" + "github.com/grafana/grafana/pkg/plugins/manager/process" "github.com/grafana/grafana/pkg/plugins/manager/registry" "github.com/grafana/grafana/pkg/plugins/repo" "github.com/grafana/grafana/pkg/plugins/storage" @@ -19,251 +17,229 @@ import ( ) var _ plugins.Manager = (*PluginManager)(nil) -var _ plugins.Client = (*PluginManager)(nil) -var _ plugins.Store = (*PluginManager)(nil) -var _ plugins.StaticRouteResolver = (*PluginManager)(nil) -var _ plugins.RendererManager = (*PluginManager)(nil) -var _ plugins.SecretsPluginManager = (*PluginManager)(nil) type PluginManager struct { cfg *plugins.Cfg - pluginRegistry registry.Service - pluginLoader loader.Service + pluginSources []plugins.PluginSource pluginRepo repo.Service - pluginSources []PluginSource pluginStorage storage.Manager - pluginsMu sync.RWMutex + processManager process.Service + pluginRegistry registry.Service + pluginLoader loader.Service log log.Logger } -type PluginSource struct { - Class plugins.Class - Paths []string -} - func ProvideService(grafanaCfg *setting.Cfg, pluginRegistry registry.Service, pluginLoader loader.Service, pluginRepo repo.Service) (*PluginManager, error) { - pm := New(plugins.FromGrafanaCfg(grafanaCfg), pluginRegistry, []PluginSource{ - {Class: plugins.Core, Paths: corePluginPaths(grafanaCfg)}, - {Class: plugins.Bundled, Paths: []string{grafanaCfg.BundledPluginsPath}}, - {Class: plugins.External, Paths: append([]string{grafanaCfg.PluginsPath}, pluginSettingPaths(grafanaCfg)...)}, - }, pluginLoader, pluginRepo, storage.FileSystem(logger.NewLogger("plugin.fs"), grafanaCfg.PluginsPath)) - if err := pm.Init(); err != nil { + pm := New(plugins.FromGrafanaCfg(grafanaCfg), pluginRegistry, pluginSources(grafanaCfg), pluginLoader, + pluginRepo, storage.FileSystem(logger.NewLogger("plugin.fs"), grafanaCfg.PluginsPath), + process.NewManager(pluginRegistry), + ) + if err := pm.Init(context.Background()); err != nil { return nil, err } return pm, nil } -func New(cfg *plugins.Cfg, pluginRegistry registry.Service, pluginSources []PluginSource, pluginLoader loader.Service, - pluginRepo repo.Service, pluginFs storage.Manager) *PluginManager { +func New(cfg *plugins.Cfg, pluginRegistry registry.Service, pluginSources []plugins.PluginSource, + pluginLoader loader.Service, pluginRepo repo.Service, pluginStorage storage.Manager, + processManager process.Service) *PluginManager { return &PluginManager{ cfg: cfg, - pluginLoader: pluginLoader, pluginSources: pluginSources, - pluginRegistry: pluginRegistry, pluginRepo: pluginRepo, - pluginStorage: pluginFs, + pluginLoader: pluginLoader, + pluginRegistry: pluginRegistry, + processManager: processManager, + pluginStorage: pluginStorage, log: log.New("plugin.manager"), } } -func (m *PluginManager) Init() error { +func (m *PluginManager) Init(ctx context.Context) error { for _, ps := range m.pluginSources { - err := m.loadPlugins(context.Background(), ps.Class, ps.Paths...) - if err != nil { + if err := m.loadPlugins(ctx, ps.Class, ps.Paths...); err != nil { return err } } - return nil } -func (m *PluginManager) Run(ctx context.Context) error { - <-ctx.Done() - m.shutdown(ctx) - return ctx.Err() -} - -func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, paths ...string) error { - if len(paths) == 0 { - return nil - } +func (m *PluginManager) Add(ctx context.Context, pluginID, version string, opts plugins.CompatOpts) error { + compatOpts := repo.NewCompatOpts(opts.GrafanaVersion, opts.OS, opts.Arch) - var pluginPaths []string - for _, p := range paths { - if p != "" { - pluginPaths = append(pluginPaths, p) + var pluginArchive *repo.PluginArchive + if plugin, exists := m.plugin(ctx, pluginID); exists { + if !plugin.IsExternalPlugin() { + return plugins.ErrInstallCorePlugin } - } - loadedPlugins, err := m.pluginLoader.Load(ctx, class, pluginPaths, m.registeredPlugins(ctx)) - if err != nil { - m.log.Error("Could not load plugins", "paths", pluginPaths, "err", err) - return err - } + if plugin.Info.Version == version { + return plugins.DuplicateError{ + PluginID: plugin.ID, + ExistingPluginDir: plugin.PluginDir, + } + } - for _, p := range loadedPlugins { - if err := m.registerAndStart(context.Background(), p); err != nil { - m.log.Error("Could not start plugin", "pluginId", p.ID, "err", err) + // get plugin update information to confirm if target update is possible + dlOpts, err := m.pluginRepo.GetPluginDownloadOptions(ctx, pluginID, version, compatOpts) + if err != nil { + return err } - } - return nil -} + // if existing plugin version is the same as the target update version + if dlOpts.Version == plugin.Info.Version { + return plugins.DuplicateError{ + PluginID: plugin.ID, + ExistingPluginDir: plugin.PluginDir, + } + } -func (m *PluginManager) Renderer() *plugins.Plugin { - for _, p := range m.availablePlugins(context.TODO()) { - if p.IsRenderer() { - return p + if dlOpts.PluginZipURL == "" && dlOpts.Version == "" { + return fmt.Errorf("could not determine update options for %s", pluginID) } - } - return nil -} + // remove existing installation of plugin + err = m.Remove(ctx, plugin.ID) + if err != nil { + return err + } -func (m *PluginManager) SecretsManager() *plugins.Plugin { - for _, p := range m.availablePlugins(context.TODO()) { - if p.IsSecretsManager() { - return p + if dlOpts.PluginZipURL != "" { + pluginArchive, err = m.pluginRepo.GetPluginArchiveByURL(ctx, dlOpts.PluginZipURL, compatOpts) + if err != nil { + return err + } + } else { + pluginArchive, err = m.pluginRepo.GetPluginArchive(ctx, pluginID, dlOpts.Version, compatOpts) + if err != nil { + return err + } + } + } else { + var err error + pluginArchive, err = m.pluginRepo.GetPluginArchive(ctx, pluginID, version, compatOpts) + if err != nil { + return err } } - return nil -} + extractedArchive, err := m.pluginStorage.Add(ctx, pluginID, pluginArchive.File) + if err != nil { + return err + } -func (m *PluginManager) Routes() []*plugins.StaticRoute { - staticRoutes := make([]*plugins.StaticRoute, 0) + // download dependency plugins + pathsToScan := []string{extractedArchive.Path} + for _, dep := range extractedArchive.Dependencies { + m.log.Info("Fetching %s dependencies...", dep.ID) + d, err := m.pluginRepo.GetPluginArchive(ctx, dep.ID, dep.Version, compatOpts) + if err != nil { + return fmt.Errorf("%v: %w", fmt.Sprintf("failed to download plugin %s from repository", dep.ID), err) + } - for _, p := range m.availablePlugins(context.TODO()) { - if p.StaticRoute() != nil { - staticRoutes = append(staticRoutes, p.StaticRoute()) + depArchive, err := m.pluginStorage.Add(ctx, dep.ID, d.File) + if err != nil { + return err } - } - return staticRoutes -} -func (m *PluginManager) registerAndStart(ctx context.Context, p *plugins.Plugin) error { - if err := m.pluginRegistry.Add(ctx, p); err != nil { - return err + pathsToScan = append(pathsToScan, depArchive.Path) } - if !p.IsCorePlugin() { - m.log.Info("Plugin registered", "pluginId", p.ID) + err = m.loadPlugins(context.Background(), plugins.External, pathsToScan...) + if err != nil { + m.log.Error("Could not load plugins", "paths", pathsToScan, "err", err) + return err } - return m.start(ctx, p) + return nil } -func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin) error { - m.log.Debug("Stopping plugin process", "pluginId", p.ID) - m.pluginsMu.Lock() - defer m.pluginsMu.Unlock() - - if err := p.Decommission(); err != nil { - return err +func (m *PluginManager) Remove(ctx context.Context, pluginID string) error { + plugin, exists := m.plugin(ctx, pluginID) + if !exists { + return plugins.ErrPluginNotInstalled } - if err := p.Stop(ctx); err != nil { - return err + if !plugin.IsExternalPlugin() { + return plugins.ErrUninstallCorePlugin } - if err := m.pluginRegistry.Remove(ctx, p.ID); err != nil { + if err := m.unregisterAndStop(ctx, plugin); err != nil { return err } - m.log.Debug("Plugin unregistered", "pluginId", p.ID) - return nil + return m.pluginStorage.Remove(ctx, plugin.ID) } -// start starts a backend plugin process -func (m *PluginManager) start(ctx context.Context, p *plugins.Plugin) error { - if !p.IsManaged() || !p.Backend || p.SignatureError != nil { - return nil +// plugin finds a plugin with `pluginID` from the registry that is not decommissioned +func (m *PluginManager) plugin(ctx context.Context, pluginID string) (*plugins.Plugin, bool) { + p, exists := m.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return nil, false } - if _, exists := m.pluginRegistry.Plugin(ctx, p.ID); !exists { - return backendplugin.ErrPluginNotRegistered + if p.IsDecommissioned() { + return nil, false } - if p.IsCorePlugin() { - return nil + return p, true +} + +func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, pluginPaths ...string) error { + registeredPlugins := make(map[string]struct{}) + for _, p := range m.pluginRegistry.Plugins(ctx) { + registeredPlugins[p.ID] = struct{}{} } - if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { + loadedPlugins, err := m.pluginLoader.Load(ctx, class, pluginPaths, registeredPlugins) + if err != nil { + m.log.Error("Could not load plugins", "paths", pluginPaths, "err", err) return err } - p.Logger().Debug("Successfully started backend plugin process") + for _, p := range loadedPlugins { + if err = m.registerAndStart(context.Background(), p); err != nil { + m.log.Error("Could not start plugin", "pluginID", p.ID, "err", err) + } + } return nil } -func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error { - if err := p.Start(ctx); err != nil { +func (m *PluginManager) registerAndStart(ctx context.Context, p *plugins.Plugin) error { + if err := m.pluginRegistry.Add(ctx, p); err != nil { return err } - - go func(ctx context.Context, p *plugins.Plugin) { - if err := restartKilledProcess(ctx, p); err != nil { - p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) - } - }(ctx, p) - - return nil + return m.processManager.Start(ctx, p.ID) } -func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error { - ticker := time.NewTicker(time.Second * 1) - - for { - select { - case <-ctx.Done(): - if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { - return err - } - return nil - case <-ticker.C: - if p.IsDecommissioned() { - p.Logger().Debug("Plugin decommissioned") - return nil - } +func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin) error { + m.log.Debug("Stopping plugin process", "pluginID", p.ID) - if !p.Exited() { - continue - } + if err := m.processManager.Stop(ctx, p.ID); err != nil { + return err + } - p.Logger().Debug("Restarting plugin") - if err := p.Start(ctx); err != nil { - p.Logger().Error("Failed to restart plugin", "error", err) - continue - } - p.Logger().Debug("Plugin restarted") - } + if err := m.pluginRegistry.Remove(ctx, p.ID); err != nil { + return err } + m.log.Debug("Plugin unregistered", "pluginID", p.ID) + return nil } -// shutdown stops all backend plugin processes -func (m *PluginManager) shutdown(ctx context.Context) { - var wg sync.WaitGroup - for _, p := range m.availablePlugins(ctx) { - wg.Add(1) - go func(p backendplugin.Plugin, ctx context.Context) { - defer wg.Done() - p.Logger().Debug("Stopping plugin") - if err := p.Stop(ctx); err != nil { - p.Logger().Error("Failed to stop plugin", "error", err) - } - p.Logger().Debug("Plugin stopped") - }(p, ctx) +func pluginSources(cfg *setting.Cfg) []plugins.PluginSource { + return []plugins.PluginSource{ + {Class: plugins.Core, Paths: corePluginPaths(cfg)}, + {Class: plugins.Bundled, Paths: []string{cfg.BundledPluginsPath}}, + {Class: plugins.External, Paths: append([]string{cfg.PluginsPath}, pluginSettingPaths(cfg)...)}, } - wg.Wait() } // corePluginPaths provides a list of the Core plugin paths which need to be scanned on init() func corePluginPaths(cfg *setting.Cfg) []string { datasourcePaths := filepath.Join(cfg.StaticRootPath, "app/plugins/datasource") panelsPath := filepath.Join(cfg.StaticRootPath, "app/plugins/panel") - return []string{datasourcePaths, panelsPath} } @@ -277,6 +253,5 @@ func pluginSettingPaths(cfg *setting.Cfg) []string { } pluginSettingDirs = append(pluginSettingDirs, path) } - return pluginSettingDirs } diff --git a/pkg/plugins/manager/manager_integration_test.go b/pkg/plugins/manager/manager_integration_test.go index da57fbd765b..ed9e68ba074 100644 --- a/pkg/plugins/manager/manager_integration_test.go +++ b/pkg/plugins/manager/manager_integration_test.go @@ -2,24 +2,29 @@ package manager import ( "context" + "encoding/json" "net/http" "path/filepath" "strings" "testing" + "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/trace" "gopkg.in/ini.v1" + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" "github.com/grafana/grafana/pkg/plugins/backendplugin/provider" + "github.com/grafana/grafana/pkg/plugins/manager/client" "github.com/grafana/grafana/pkg/plugins/manager/loader" "github.com/grafana/grafana/pkg/plugins/manager/registry" "github.com/grafana/grafana/pkg/plugins/manager/signature" + "github.com/grafana/grafana/pkg/plugins/manager/store" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/licensing" "github.com/grafana/grafana/pkg/services/searchV2" @@ -42,7 +47,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb/testdatasource" ) -func TestPluginManager_int_init(t *testing.T) { +func TestIntegrationPluginManager_Run(t *testing.T) { t.Helper() staticRootPath, err := filepath.Abs("../../../public/") @@ -59,9 +64,12 @@ func TestPluginManager_int_init(t *testing.T) { BundledPluginsPath: bundledPluginsPath, IsFeatureToggleEnabled: features.IsEnabled, PluginSettings: map[string]map[string]string{ - "plugin.datasource-id": { + "plugin.test-app": { "path": "testdata/test-app", }, + "plugin.test-panel": { + "not": "included", + }, }, } @@ -92,17 +100,46 @@ func TestPluginManager_int_init(t *testing.T) { coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf) pmCfg := plugins.FromGrafanaCfg(cfg) - pm, err := ProvideService(cfg, registry.NewInMemory(), loader.New(pmCfg, license, signature.NewUnsignedAuthorizer(pmCfg), + reg := registry.ProvideService() + pm, err := ProvideService(cfg, reg, loader.New(pmCfg, license, signature.NewUnsignedAuthorizer(pmCfg), provider.ProvideService(coreRegistry)), nil) require.NoError(t, err) + ps := store.ProvideService(reg) ctx := context.Background() - verifyCorePluginCatalogue(t, ctx, pm) - verifyBundledPlugins(t, ctx, pm) - verifyPluginStaticRoutes(t, ctx, pm) + verifyCorePluginCatalogue(t, ctx, ps) + verifyBundledPlugins(t, ctx, ps) + verifyPluginStaticRoutes(t, ctx, ps) + verifyBackendProcesses(t, pm.pluginRegistry.Plugins(ctx)) + verifyPluginQuery(t, ctx, client.ProvideService(reg)) } -func verifyCorePluginCatalogue(t *testing.T, ctx context.Context, pm *PluginManager) { +func verifyPluginQuery(t *testing.T, ctx context.Context, c plugins.Client) { + now := time.Unix(1661420870, 0) + req := &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + PluginID: "testdata", + }, + Queries: []backend.DataQuery{ + { + RefID: "A", + TimeRange: backend.TimeRange{ + From: now.Add(-5 * time.Minute), + To: now, + }, + JSON: json.RawMessage(`{"scenarioId":"csv_metric_values","stringInput":"1,20,90,30,5,0"}`), + }, + }, + } + + resp, err := c.QueryData(ctx, req) + require.NoError(t, err) + payload, err := resp.MarshalJSON() + require.NoError(t, err) + require.JSONEq(t, `{"results":{"A":{"frames":[{"schema":{"refId":"A","fields":[{"name":"time","type":"time","typeInfo":{"frame":"time.Time"}},{"name":"A-series","type":"number","typeInfo":{"frame":"int64","nullable":true}}]},"data":{"values":[[1661420570000,1661420630000,1661420690000,1661420750000,1661420810000,1661420870000],[1,20,90,30,5,0]]}}]}}}`, string(payload)) +} + +func verifyCorePluginCatalogue(t *testing.T, ctx context.Context, ps *store.Service) { t.Helper() expPanels := map[string]struct{}{ @@ -168,78 +205,85 @@ func verifyCorePluginCatalogue(t *testing.T, ctx context.Context, pm *PluginMana "test-app": {}, } - panels := pm.Plugins(ctx, plugins.Panel) - assert.Equal(t, len(expPanels), len(panels)) + panels := ps.Plugins(ctx, plugins.Panel) + require.Equal(t, len(expPanels), len(panels)) for _, p := range panels { - p, exists := pm.Plugin(ctx, p.ID) + p, exists := ps.Plugin(ctx, p.ID) require.NotEqual(t, plugins.PluginDTO{}, p) - assert.True(t, exists) - assert.Contains(t, expPanels, p.ID) - assert.Contains(t, pm.registeredPlugins(ctx), p.ID) + require.True(t, exists) + require.Contains(t, expPanels, p.ID) } - dataSources := pm.Plugins(ctx, plugins.DataSource) - assert.Equal(t, len(expDataSources), len(dataSources)) + dataSources := ps.Plugins(ctx, plugins.DataSource) + require.Equal(t, len(expDataSources), len(dataSources)) for _, ds := range dataSources { - p, exists := pm.Plugin(ctx, ds.ID) + p, exists := ps.Plugin(ctx, ds.ID) require.NotEqual(t, plugins.PluginDTO{}, p) - assert.True(t, exists) - assert.Contains(t, expDataSources, ds.ID) - assert.Contains(t, pm.registeredPlugins(ctx), ds.ID) + require.True(t, exists) + require.Contains(t, expDataSources, ds.ID) } - apps := pm.Plugins(ctx, plugins.App) - assert.Equal(t, len(expApps), len(apps)) + apps := ps.Plugins(ctx, plugins.App) + require.Equal(t, len(expApps), len(apps)) for _, app := range apps { - p, exists := pm.Plugin(ctx, app.ID) - require.NotEqual(t, plugins.PluginDTO{}, p) - assert.True(t, exists) - assert.Contains(t, expApps, app.ID) - assert.Contains(t, pm.registeredPlugins(ctx), app.ID) + p, exists := ps.Plugin(ctx, app.ID) + require.True(t, exists) + require.NotNil(t, p) + require.Contains(t, expApps, app.ID) } - assert.Equal(t, len(expPanels)+len(expDataSources)+len(expApps), len(pm.Plugins(ctx))) + require.Equal(t, len(expPanels)+len(expDataSources)+len(expApps), len(ps.Plugins(ctx))) } -func verifyBundledPlugins(t *testing.T, ctx context.Context, pm *PluginManager) { +func verifyBundledPlugins(t *testing.T, ctx context.Context, ps *store.Service) { t.Helper() dsPlugins := make(map[string]struct{}) - for _, p := range pm.Plugins(ctx, plugins.DataSource) { + for _, p := range ps.Plugins(ctx, plugins.DataSource) { dsPlugins[p.ID] = struct{}{} } + inputPlugin, exists := ps.Plugin(ctx, "input") + require.True(t, exists) + require.NotEqual(t, plugins.PluginDTO{}, inputPlugin) + require.NotNil(t, dsPlugins["input"]) + pluginRoutes := make(map[string]*plugins.StaticRoute) - for _, r := range pm.Routes() { + for _, r := range ps.Routes() { pluginRoutes[r.PluginID] = r } - inputPlugin, exists := pm.Plugin(ctx, "input") - require.NotEqual(t, plugins.PluginDTO{}, inputPlugin) - assert.True(t, exists) - assert.NotNil(t, dsPlugins["input"]) - for _, pluginID := range []string{"input"} { - assert.Contains(t, pluginRoutes, pluginID) - assert.True(t, strings.HasPrefix(pluginRoutes[pluginID].Directory, inputPlugin.PluginDir)) + require.Contains(t, pluginRoutes, pluginID) + require.True(t, strings.HasPrefix(pluginRoutes[pluginID].Directory, inputPlugin.PluginDir)) } } -func verifyPluginStaticRoutes(t *testing.T, ctx context.Context, pm *PluginManager) { +func verifyPluginStaticRoutes(t *testing.T, ctx context.Context, ps *store.Service) { routes := make(map[string]*plugins.StaticRoute) - for _, route := range pm.Routes() { + for _, route := range ps.Routes() { routes[route.PluginID] = route } - assert.Len(t, routes, 2) + require.Len(t, routes, 2) + + inputPlugin, _ := ps.Plugin(ctx, "input") + require.NotNil(t, routes["input"]) + require.Equal(t, routes["input"].Directory, inputPlugin.PluginDir) - inputPlugin, _ := pm.Plugin(ctx, "input") - assert.NotNil(t, routes["input"]) - assert.Equal(t, routes["input"].Directory, inputPlugin.PluginDir) + testAppPlugin, _ := ps.Plugin(ctx, "test-app") + require.Contains(t, routes, "test-app") + require.Equal(t, routes["test-app"].Directory, testAppPlugin.PluginDir) +} - testAppPlugin, _ := pm.Plugin(ctx, "test-app") - assert.Contains(t, routes, "test-app") - assert.Equal(t, routes["test-app"].Directory, testAppPlugin.PluginDir) +func verifyBackendProcesses(t *testing.T, ps []*plugins.Plugin) { + for _, p := range ps { + if p.Backend { + pc, exists := p.Client() + require.True(t, exists) + require.NotNil(t, pc) + } + } } type fakeTracer struct { diff --git a/pkg/plugins/manager/manager_test.go b/pkg/plugins/manager/manager_test.go index c63d9d9008c..ca1fb548810 100644 --- a/pkg/plugins/manager/manager_test.go +++ b/pkg/plugins/manager/manager_test.go @@ -3,891 +3,237 @@ package manager import ( "archive/zip" "context" - "net/http" - "os" - "path/filepath" - "sync" "testing" - "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/grafana/grafana-azure-sdk-go/azsettings" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/backendplugin" - "github.com/grafana/grafana/pkg/plugins/manager/registry" + "github.com/grafana/grafana/pkg/plugins/manager/fakes" "github.com/grafana/grafana/pkg/plugins/repo" "github.com/grafana/grafana/pkg/plugins/storage" ) -const ( - testPluginID = "test-plugin" -) - -func TestPluginManager_Init(t *testing.T) { - t.Run("Plugin sources are loaded in order", func(t *testing.T) { - loader := &fakeLoader{} - pm := New(&plugins.Cfg{}, newFakePluginRegistry(), []PluginSource{ - {Class: plugins.Bundled, Paths: []string{"path1"}}, - {Class: plugins.Core, Paths: []string{"path2"}}, - {Class: plugins.External, Paths: []string{"path3"}}, - }, loader, &fakePluginRepo{}, &fakeFsManager{}) - - err := pm.Init() - require.NoError(t, err) - require.Equal(t, []string{"path1", "path2", "path3"}, loader.loadedPaths) - }) -} - -func TestPluginManager_loadPlugins(t *testing.T) { - t.Run("Managed backend plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.External, true, func(p *plugins.Plugin) { - p.Backend = true - }) +const testPluginID = "test-plugin" - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, - } - - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.External, "test/path") - require.NoError(t, err) - - assert.Equal(t, 1, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - verifyNoPluginErrors(t, pm) - }) - - t.Run("Unmanaged backend plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.External, false, func(p *plugins.Plugin) { - p.Backend = true - }) - - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, - } - - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.External, "test/path") - require.NoError(t, err) - - assert.Equal(t, 0, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - verifyNoPluginErrors(t, pm) - }) +func TestPluginManager_Add_Remove(t *testing.T) { + t.Run("Adding a new plugin", func(t *testing.T) { + const ( + pluginID, v1 = "test-panel", "1.0.0" + zipNameV1 = "test-panel-1.0.0.zip" + ) - t.Run("Managed non-backend plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.External, false, func(p *plugins.Plugin) { - p.Backend = true + // mock a plugin to be returned automatically by the plugin loader + pluginV1 := createPlugin(t, pluginID, plugins.External, true, true, func(plugin *plugins.Plugin) { + plugin.Info.Version = v1 }) + mockZipV1 := &zip.ReadCloser{Reader: zip.Reader{File: []*zip.File{{ + FileHeader: zip.FileHeader{Name: zipNameV1}, + }}}} - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + loader := &fakes.FakeLoader{ + LoadFunc: func(_ context.Context, _ plugins.Class, paths []string, _ map[string]struct{}) ([]*plugins.Plugin, error) { + require.Equal(t, []string{zipNameV1}, paths) + return []*plugins.Plugin{pluginV1}, nil + }, } - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.External, "test/path") - require.NoError(t, err) - - assert.Equal(t, 0, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - verifyNoPluginErrors(t, pm) - }) - - t.Run("Unmanaged non-backend plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.External, false) - - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + pluginRepo := &fakes.FakePluginRepo{ + GetPluginArchiveFunc: func(_ context.Context, pluginID, version string, _ repo.CompatOpts) (*repo.PluginArchive, error) { + require.Equal(t, pluginV1.ID, pluginID) + require.Equal(t, v1, version) + return &repo.PluginArchive{ + File: mockZipV1, + }, nil + }, } - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.External, "test/path") - require.NoError(t, err) - - assert.Equal(t, 0, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - verifyNoPluginErrors(t, pm) - }) -} - -func TestPluginManager_Installer(t *testing.T) { - t.Run("Add new plugin", func(t *testing.T) { - testDir, err := os.CreateTemp(os.TempDir(), "plugin-manager-test-*") - require.NoError(t, err) - t.Cleanup(func() { - err := os.RemoveAll(testDir.Name()) - assert.NoError(t, err) - }) - - p, pc := createPlugin(t, testPluginID, plugins.External, true, func(p *plugins.Plugin) { - p.PluginDir = filepath.Join(testDir.Name(), p.ID) - p.Backend = true - }) - - l := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + fs := &fakes.FakePluginStorage{ + AddFunc: func(_ context.Context, pluginID string, z *zip.ReadCloser) (*storage.ExtractedPluginArchive, error) { + require.Equal(t, pluginV1.ID, pluginID) + require.Equal(t, mockZipV1, z) + return &storage.ExtractedPluginArchive{ + Path: zipNameV1, + }, nil + }, + Added: make(map[string]string), + Removed: make(map[string]int), } - fsm := &fakeFsManager{} - - repository := &fakePluginRepo{} - pm := createManager(t, func(pm *PluginManager) { - pm.cfg.PluginsPath = testDir.Name() - pm.pluginLoader = l - pm.pluginStorage = fsm - pm.pluginRepo = repository - }) + proc := fakes.NewFakeProcessManager() - err = pm.Add(context.Background(), testPluginID, "1.0.0", plugins.CompatOpts{}) + pm := New(&plugins.Cfg{}, fakes.NewFakePluginRegistry(), []plugins.PluginSource{}, loader, pluginRepo, fs, proc) + err := pm.Add(context.Background(), pluginID, v1, plugins.CompatOpts{}) require.NoError(t, err) - assert.Equal(t, 1, repository.downloadCount) - - verifyNoPluginErrors(t, pm) - - assert.Len(t, pm.Routes(), 1) - assert.Equal(t, p.ID, pm.Routes()[0].PluginID) - assert.Equal(t, p.PluginDir, pm.Routes()[0].Directory) - - assert.Equal(t, 1, repository.downloadCount) - assert.Equal(t, 0, fsm.removed) - assert.Equal(t, 1, fsm.added) - - assert.Equal(t, 1, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - t.Run("Won't install if already installed", func(t *testing.T) { - err := pm.Add(context.Background(), testPluginID, "1.0.0", plugins.CompatOpts{}) - assert.Equal(t, plugins.DuplicateError{ - PluginID: p.ID, - ExistingPluginDir: p.PluginDir, + require.Equal(t, zipNameV1, fs.Added[pluginID]) + require.Equal(t, 0, fs.Removed[pluginID]) + require.Equal(t, 1, proc.Started[pluginID]) + require.Equal(t, 0, proc.Stopped[pluginID]) + + regPlugin, exists := pm.pluginRegistry.Plugin(context.Background(), pluginID) + require.True(t, exists) + require.Equal(t, pluginV1, regPlugin) + require.Len(t, pm.pluginRegistry.Plugins(context.Background()), 1) + + t.Run("Won't add if already exists", func(t *testing.T) { + err = pm.Add(context.Background(), pluginID, v1, plugins.CompatOpts{}) + require.Equal(t, plugins.DuplicateError{ + PluginID: pluginV1.ID, + ExistingPluginDir: pluginV1.PluginDir, }, err) }) - t.Run("Update option is the same as installed version", func(t *testing.T) { - repository.downloadOptionsHandler = func(_ context.Context, _, _ string, _ repo.CompatOpts) (*repo.PluginDownloadOptions, error) { - return &repo.PluginDownloadOptions{ - Version: p.Info.Version, - }, nil - } - - err = pm.Add(context.Background(), p.ID, "", plugins.CompatOpts{}) - require.ErrorIs(t, err, plugins.DuplicateError{ - PluginID: p.ID, - ExistingPluginDir: p.PluginDir, - }) - - assert.Equal(t, 1, repository.downloadCount) - assert.Equal(t, 0, fsm.removed) - assert.Equal(t, 1, fsm.added) - assert.Equal(t, 1, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists = pm.Plugin(context.Background(), p.ID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - }) - - t.Run("Update existing plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.External, true, func(p *plugins.Plugin) { - p.Backend = true - p.PluginDir = filepath.Join(testDir.Name(), p.ID) + t.Run("Update plugin to different version", func(t *testing.T) { + const ( + v2 = "2.0.0" + zipNameV2 = "test-panel-2.0.0.zip" + ) + // mock a plugin to be returned automatically by the plugin loader + pluginV2 := createPlugin(t, pluginID, plugins.External, true, true, func(plugin *plugins.Plugin) { + plugin.Info.Version = v2 }) - l := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + mockZipV2 := &zip.ReadCloser{Reader: zip.Reader{File: []*zip.File{{ + FileHeader: zip.FileHeader{Name: zipNameV2}, + }}}} + loader.LoadFunc = func(_ context.Context, class plugins.Class, paths []string, ignore map[string]struct{}) ([]*plugins.Plugin, error) { + require.Equal(t, plugins.External, class) + require.Empty(t, ignore) + require.Equal(t, []string{zipNameV2}, paths) + return []*plugins.Plugin{pluginV2}, nil } - pm.pluginLoader = l - - repository.downloadOptionsHandler = func(_ context.Context, _, _ string, _ repo.CompatOpts) (*repo.PluginDownloadOptions, error) { + pluginRepo.GetPluginDownloadOptionsFunc = func(_ context.Context, pluginID, version string, _ repo.CompatOpts) (*repo.PluginDownloadOptions, error) { return &repo.PluginDownloadOptions{ - Version: "1.2.0", + PluginZipURL: "https://grafanaplugins.com", + }, nil + } + pluginRepo.GetPluginArchiveByURLFunc = func(_ context.Context, pluginZipURL string, _ repo.CompatOpts) (*repo.PluginArchive, error) { + require.Equal(t, "https://grafanaplugins.com", pluginZipURL) + return &repo.PluginArchive{ + File: mockZipV2, + }, nil + } + fs.AddFunc = func(_ context.Context, pluginID string, z *zip.ReadCloser) (*storage.ExtractedPluginArchive, error) { + require.Equal(t, pluginV1.ID, pluginID) + require.Equal(t, mockZipV2, z) + return &storage.ExtractedPluginArchive{ + Path: zipNameV2, }, nil } - err = pm.Add(context.Background(), testPluginID, "1.2.0", plugins.CompatOpts{}) - assert.NoError(t, err) - - assert.Equal(t, 2, repository.downloadCount) - assert.Equal(t, 1, fsm.removed) - assert.Equal(t, 2, fsm.added) - assert.Equal(t, 1, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) + err = pm.Add(context.Background(), pluginID, v2, plugins.CompatOpts{}) + require.NoError(t, err) + + require.Equal(t, zipNameV2, fs.Added[pluginID]) + require.Equal(t, 1, fs.Removed[pluginID]) + require.Equal(t, 2, proc.Started[pluginID]) + require.Equal(t, 1, proc.Stopped[pluginID]) + + regPlugin, exists = pm.pluginRegistry.Plugin(context.Background(), pluginID) + require.True(t, exists) + require.Equal(t, pluginV2, regPlugin) + require.Len(t, pm.pluginRegistry.Plugins(context.Background()), 1) }) - t.Run("Uninstall existing plugin", func(t *testing.T) { - err := pm.Remove(context.Background(), p.ID) + t.Run("Removing an existing plugin", func(t *testing.T) { + err = pm.Remove(context.Background(), pluginID) require.NoError(t, err) - assert.Equal(t, 2, repository.downloadCount) + require.Equal(t, 2, proc.Stopped[pluginID]) + require.Equal(t, 2, fs.Removed[pluginID]) - p, exists := pm.Plugin(context.Background(), p.ID) - assert.False(t, exists) - assert.Equal(t, plugins.PluginDTO{}, p) - assert.Len(t, pm.Routes(), 0) + p, exists := pm.pluginRegistry.Plugin(context.Background(), pluginID) + require.False(t, exists) + require.Nil(t, p) - t.Run("Won't uninstall if not installed", func(t *testing.T) { - err := pm.Remove(context.Background(), p.ID) + t.Run("Won't remove if not exists", func(t *testing.T) { + err := pm.Remove(context.Background(), pluginID) require.Equal(t, plugins.ErrPluginNotInstalled, err) }) }) }) - t.Run("Can't update core plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.Core, true, func(p *plugins.Plugin) { - p.Backend = true - }) - - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + t.Run("Can't update core or bundled plugin", func(t *testing.T) { + tcs := []struct { + class plugins.Class + }{ + {class: plugins.Core}, + {class: plugins.Bundled}, } - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.Core, "test/path") - require.NoError(t, err) + for _, tc := range tcs { + p := createPlugin(t, testPluginID, tc.class, true, true, func(plugin *plugins.Plugin) { + plugin.Info.Version = "1.0.0" + }) - assert.Equal(t, 0, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) + fakes.NewFakePluginRegistry() - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) + reg := &fakes.FakePluginRegistry{ + Store: map[string]*plugins.Plugin{ + testPluginID: p, + }, + } - verifyNoPluginErrors(t, pm) + proc := fakes.NewFakeProcessManager() + pm := New(&plugins.Cfg{}, reg, []plugins.PluginSource{}, &fakes.FakeLoader{}, &fakes.FakePluginRepo{}, &fakes.FakePluginStorage{}, proc) + err := pm.Add(context.Background(), p.ID, "3.2.0", plugins.CompatOpts{}) + require.ErrorIs(t, err, plugins.ErrInstallCorePlugin) - err = pm.Add(context.Background(), testPluginID, "1.0.0", plugins.CompatOpts{}) - assert.Equal(t, plugins.ErrInstallCorePlugin, err) + require.Equal(t, 0, proc.Started[p.ID]) + require.Equal(t, 0, proc.Stopped[p.ID]) - t.Run("Can't uninstall core plugin", func(t *testing.T) { - err := pm.Remove(context.Background(), p.ID) - require.Equal(t, plugins.ErrUninstallCorePlugin, err) - }) - }) + regPlugin, exists := pm.pluginRegistry.Plugin(context.Background(), testPluginID) + require.True(t, exists) + require.Equal(t, p, regPlugin) + require.Len(t, pm.pluginRegistry.Plugins(context.Background()), 1) - t.Run("Can't update bundled plugin", func(t *testing.T) { - p, pc := createPlugin(t, testPluginID, plugins.Bundled, true, func(p *plugins.Plugin) { - p.Backend = true - }) + err = pm.Add(context.Background(), testPluginID, "", plugins.CompatOpts{}) + require.Equal(t, plugins.ErrInstallCorePlugin, err) - loader := &fakeLoader{ - mockedLoadedPlugins: []*plugins.Plugin{p}, + t.Run("Can't uninstall core plugin", func(t *testing.T) { + err = pm.Remove(context.Background(), p.ID) + require.Equal(t, plugins.ErrUninstallCorePlugin, err) + }) } - - pm := createManager(t, func(pm *PluginManager) { - pm.pluginLoader = loader - }) - err := pm.loadPlugins(context.Background(), plugins.Bundled, "test/path") - require.NoError(t, err) - - assert.Equal(t, 1, pc.startCount) - assert.Equal(t, 0, pc.stopCount) - assert.False(t, pc.exited) - assert.False(t, pc.decommissioned) - - testPlugin, exists := pm.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - assert.Equal(t, p.ToDTO(), testPlugin) - assert.Len(t, pm.Plugins(context.Background()), 1) - - verifyNoPluginErrors(t, pm) - - err = pm.Add(context.Background(), testPluginID, "1.0.0", plugins.CompatOpts{}) - assert.Equal(t, plugins.ErrInstallCorePlugin, err) - - t.Run("Can't uninstall bundled plugin", func(t *testing.T) { - err := pm.Remove(context.Background(), p.ID) - require.Equal(t, plugins.ErrUninstallCorePlugin, err) - }) - }) -} - -func TestPluginManager_registeredPlugins(t *testing.T) { - t.Run("Decommissioned plugins are included in registeredPlugins", func(t *testing.T) { - decommissionedPlugin, _ := createPlugin(t, testPluginID, plugins.External, true, func(p *plugins.Plugin) { - p.Backend = true - err := p.Decommission() - require.NoError(t, err) - }) - - pm := New(&plugins.Cfg{}, &fakePluginRegistry{ - store: map[string]*plugins.Plugin{ - testPluginID: decommissionedPlugin, - "test-app": {}, - }, - }, []PluginSource{}, &fakeLoader{}, &fakePluginRepo{}, &fakeFsManager{}) - - require.True(t, decommissionedPlugin.IsDecommissioned()) - - rps := pm.registeredPlugins(context.Background()) - require.Equal(t, 2, len(rps)) - require.NotNil(t, rps[testPluginID]) - require.NotNil(t, rps["test-app"]) }) } -func TestPluginManager_lifecycle_managed(t *testing.T) { - newScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { - t.Run("Managed plugin scenario", func(t *testing.T) { - t.Run("Should be able to register plugin", func(t *testing.T) { - err := ctx.manager.registerAndStart(context.Background(), ctx.plugin) - require.NoError(t, err) - require.NotNil(t, ctx.plugin) - require.Equal(t, testPluginID, ctx.plugin.ID) - require.Equal(t, 1, ctx.pluginClient.startCount) - testPlugin, exists := ctx.manager.Plugin(context.Background(), testPluginID) - require.True(t, exists) - require.NotNil(t, testPlugin) - - t.Run("Should not be able to register an already registered plugin", func(t *testing.T) { - err := ctx.manager.registerAndStart(context.Background(), ctx.plugin) - require.Error(t, err) - require.Equal(t, 1, ctx.pluginClient.startCount) - }) - - t.Run("When manager runs should start and stop plugin", func(t *testing.T) { - pCtx := context.Background() - cCtx, cancel := context.WithCancel(pCtx) - var wg sync.WaitGroup - wg.Add(1) - var runErr error - go func() { - runErr = ctx.manager.Run(cCtx) - wg.Done() - }() - time.Sleep(time.Millisecond) - cancel() - wg.Wait() - require.Equal(t, context.Canceled, runErr) - require.Equal(t, 1, ctx.pluginClient.startCount) - require.Equal(t, 1, ctx.pluginClient.stopCount) - }) - - t.Run("When manager runs should restart plugin process when killed", func(t *testing.T) { - ctx.pluginClient.stopCount = 0 - ctx.pluginClient.startCount = 0 - pCtx := context.Background() - cCtx, cancel := context.WithCancel(pCtx) - var wgRun sync.WaitGroup - wgRun.Add(1) - var runErr error - go func() { - runErr = ctx.manager.Run(cCtx) - wgRun.Done() - }() - - time.Sleep(time.Millisecond) - - var wgKill sync.WaitGroup - wgKill.Add(1) - go func() { - ctx.pluginClient.kill() - for { - if !ctx.plugin.Exited() { - break - } - } - cancel() - wgKill.Done() - }() - wgKill.Wait() - wgRun.Wait() - require.Equal(t, context.Canceled, runErr) - require.Equal(t, 1, ctx.pluginClient.stopCount) - require.Equal(t, 1, ctx.pluginClient.startCount) - }) - - t.Run("Unimplemented handlers", func(t *testing.T) { - t.Run("Collect metrics should return method not implemented error", func(t *testing.T) { - _, err = ctx.manager.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}) - require.Equal(t, backendplugin.ErrMethodNotImplemented, err) - }) - - t.Run("Check health should return method not implemented error", func(t *testing.T) { - _, err = ctx.manager.CheckHealth(context.Background(), &backend.CheckHealthRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}) - require.Equal(t, backendplugin.ErrMethodNotImplemented, err) - }) - }) - - t.Run("Implemented handlers", func(t *testing.T) { - t.Run("Collect metrics should return expected result", func(t *testing.T) { - ctx.pluginClient.CollectMetricsHandlerFunc = func(_ context.Context, _ *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { - return &backend.CollectMetricsResult{ - PrometheusMetrics: []byte("hello"), - }, nil - } - - res, err := ctx.manager.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}) - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, "hello", string(res.PrometheusMetrics)) - }) - - t.Run("Check health should return expected result", func(t *testing.T) { - json := []byte(`{ - "key": "value" - }`) - ctx.pluginClient.CheckHealthHandlerFunc = func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - return &backend.CheckHealthResult{ - Status: backend.HealthStatusOk, - Message: "All good", - JSONDetails: json, - }, nil - } - - res, err := ctx.manager.CheckHealth(context.Background(), &backend.CheckHealthRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}) - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, backend.HealthStatusOk, res.Status) - require.Equal(t, "All good", res.Message) - require.Equal(t, json, res.JSONDetails) - }) - - t.Run("Call resource should return expected response", func(t *testing.T) { - ctx.pluginClient.CallResourceHandlerFunc = func(ctx context.Context, - req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - return sender.Send(&backend.CallResourceResponse{ - Status: http.StatusOK, - }) - } - - sender := &fakeSender{} - err = ctx.manager.CallResource(context.Background(), &backend.CallResourceRequest{PluginContext: backend.PluginContext{PluginID: testPluginID}}, sender) - require.NoError(t, err) - require.NotNil(t, sender.resp) - require.Equal(t, http.StatusOK, sender.resp.Status) - }) - }) - }) - }) - }) - - newScenario(t, true, func(t *testing.T, ctx *managerScenarioCtx) { - t.Run("Backend core plugin is registered but not started", func(t *testing.T) { - ctx.plugin.Class = plugins.Core - err := ctx.manager.registerAndStart(context.Background(), ctx.plugin) - require.NoError(t, err) - require.NotNil(t, ctx.plugin) - require.Equal(t, testPluginID, ctx.plugin.ID) - require.Equal(t, 0, ctx.pluginClient.startCount) - testPlugin, exists := ctx.manager.Plugin(context.Background(), testPluginID) - assert.True(t, exists) - require.NotNil(t, testPlugin) - }) - }) -} +func TestPluginManager_Run(t *testing.T) { + t.Run("Plugin sources are loaded in order", func(t *testing.T) { + loader := &fakes.FakeLoader{} + pm := New(&plugins.Cfg{}, fakes.NewFakePluginRegistry(), []plugins.PluginSource{ + {Class: plugins.Bundled, Paths: []string{"path1"}}, + {Class: plugins.Core, Paths: []string{"path2"}}, + {Class: plugins.External, Paths: []string{"path3"}}, + }, loader, &fakes.FakePluginRepo{}, &fakes.FakePluginStorage{}, &fakes.FakeProcessManager{}) -func TestPluginManager_lifecycle_unmanaged(t *testing.T) { - newScenario(t, false, func(t *testing.T, ctx *managerScenarioCtx) { - t.Run("Unmanaged plugin scenario", func(t *testing.T) { - t.Run("Should be able to register plugin", func(t *testing.T) { - err := ctx.manager.registerAndStart(context.Background(), ctx.plugin) - require.NoError(t, err) - p, exists := ctx.manager.Plugin(context.Background(), testPluginID) - require.True(t, exists) - require.NotNil(t, p) - require.False(t, ctx.pluginClient.managed) - - t.Run("When manager runs should not start plugin", func(t *testing.T) { - pCtx := context.Background() - cCtx, cancel := context.WithCancel(pCtx) - var wg sync.WaitGroup - wg.Add(1) - var runErr error - go func() { - runErr = ctx.manager.Run(cCtx) - wg.Done() - }() - go func() { - cancel() - }() - wg.Wait() - require.Equal(t, context.Canceled, runErr) - require.Equal(t, 0, ctx.pluginClient.startCount) - require.Equal(t, 1, ctx.pluginClient.stopCount) - require.True(t, ctx.plugin.Exited()) - }) - - t.Run("Should be not be able to start unmanaged plugin", func(t *testing.T) { - pCtx := context.Background() - cCtx, cancel := context.WithCancel(pCtx) - defer cancel() - err := ctx.manager.start(cCtx, ctx.plugin) - require.Nil(t, err) - require.Equal(t, 0, ctx.pluginClient.startCount) - require.True(t, ctx.plugin.Exited()) - }) - }) - }) + err := pm.Init(context.Background()) + require.NoError(t, err) + require.Equal(t, []string{"path1", "path2", "path3"}, loader.LoadedPaths) }) } -func createPlugin(t *testing.T, pluginID string, class plugins.Class, managed bool, - cbs ...func(*plugins.Plugin)) (*plugins.Plugin, *fakePluginClient) { +func createPlugin(t *testing.T, pluginID string, class plugins.Class, managed, backend bool, cbs ...func(*plugins.Plugin)) *plugins.Plugin { t.Helper() p := &plugins.Plugin{ Class: class, JSONData: plugins.JSONData{ - ID: pluginID, - Type: plugins.DataSource, - Info: plugins.Info{ - Version: "1.0.0", - }, + ID: pluginID, + Type: plugins.DataSource, + Backend: backend, }, } - - logger := log.NewNopLogger() - - p.SetLogger(logger) - - pc := &fakePluginClient{ - pluginID: pluginID, - logger: logger, - managed: managed, - } - - p.RegisterClient(pc) + p.SetLogger(log.NewNopLogger()) + p.RegisterClient(&fakes.FakePluginClient{ + ID: pluginID, + Managed: managed, + Log: p.Logger(), + }) for _, cb := range cbs { cb(p) } - return p, pc -} - -func createManager(t *testing.T, cbs ...func(*PluginManager)) *PluginManager { - t.Helper() - - cfg := &plugins.Cfg{ - DevMode: false, - } - - pm := New(cfg, newFakePluginRegistry(), nil, &fakeLoader{}, &fakePluginRepo{}, &fakeFsManager{}) - - for _, cb := range cbs { - cb(pm) - } - - return pm -} - -type managerScenarioCtx struct { - manager *PluginManager - plugin *plugins.Plugin - pluginClient *fakePluginClient -} - -func newScenario(t *testing.T, managed bool, fn func(t *testing.T, ctx *managerScenarioCtx)) { - t.Helper() - cfg := &plugins.Cfg{} - cfg.AWSAllowedAuthProviders = []string{"keys", "credentials"} - cfg.AWSAssumeRoleEnabled = true - cfg.Azure = &azsettings.AzureSettings{ - ManagedIdentityEnabled: true, - Cloud: "AzureCloud", - ManagedIdentityClientId: "client-id", - } - - loader := &fakeLoader{} - manager := New(cfg, registry.NewInMemory(), nil, loader, &fakePluginRepo{}, &fakeFsManager{}) - manager.pluginLoader = loader - ctx := &managerScenarioCtx{ - manager: manager, - } - - ctx.plugin, ctx.pluginClient = createPlugin(t, testPluginID, plugins.External, managed, func(p *plugins.Plugin) { - p.Backend = true - }) - - fn(t, ctx) -} - -func verifyNoPluginErrors(t *testing.T, pm *PluginManager) { - for _, plugin := range pm.Plugins(context.Background()) { - assert.Nil(t, plugin.SignatureError) - } -} - -type fakePluginRepo struct { - repo.Service - - downloadOptionsHandler func(_ context.Context, _, _ string, _ repo.CompatOpts) (*repo.PluginDownloadOptions, error) - - downloadOptionsCount int - downloadCount int -} - -func (pr *fakePluginRepo) GetPluginArchive(_ context.Context, _, _ string, _ repo.CompatOpts) (*repo.PluginArchive, error) { - pr.downloadCount++ - return &repo.PluginArchive{}, nil -} - -// DownloadWithURL downloads the requested plugin from the specified URL. -func (pr *fakePluginRepo) GetPluginArchiveByURL(_ context.Context, _ string, _ repo.CompatOpts) (*repo.PluginArchive, error) { - pr.downloadCount++ - return &repo.PluginArchive{}, nil -} - -// GetDownloadOptions provides information for downloading the requested plugin. -func (pr *fakePluginRepo) GetPluginDownloadOptions(ctx context.Context, pluginID, version string, opts repo.CompatOpts) (*repo.PluginDownloadOptions, error) { - pr.downloadOptionsCount++ - if pr.downloadOptionsHandler != nil { - return pr.downloadOptionsHandler(ctx, pluginID, version, opts) - } - return &repo.PluginDownloadOptions{}, nil -} - -type fakeLoader struct { - mockedLoadedPlugins []*plugins.Plugin - - loadedPaths []string -} - -func (l *fakeLoader) Load(_ context.Context, _ plugins.Class, paths []string, _ map[string]struct{}) ([]*plugins.Plugin, error) { - l.loadedPaths = append(l.loadedPaths, paths...) - - return l.mockedLoadedPlugins, nil -} - -type fakePluginClient struct { - pluginID string - logger log.Logger - startCount int - stopCount int - managed bool - exited bool - decommissioned bool - backend.CollectMetricsHandlerFunc - backend.CheckHealthHandlerFunc - backend.QueryDataHandlerFunc - backend.CallResourceHandlerFunc - mutex sync.RWMutex - - backendplugin.Plugin -} - -func (pc *fakePluginClient) PluginID() string { - return pc.pluginID -} - -func (pc *fakePluginClient) Logger() log.Logger { - return pc.logger -} - -func (pc *fakePluginClient) Start(_ context.Context) error { - pc.mutex.Lock() - defer pc.mutex.Unlock() - pc.exited = false - pc.startCount++ - return nil -} - -func (pc *fakePluginClient) Stop(_ context.Context) error { - pc.mutex.Lock() - defer pc.mutex.Unlock() - pc.stopCount++ - pc.exited = true - return nil -} - -func (pc *fakePluginClient) IsManaged() bool { - return pc.managed -} - -func (pc *fakePluginClient) Exited() bool { - pc.mutex.RLock() - defer pc.mutex.RUnlock() - return pc.exited -} - -func (pc *fakePluginClient) Decommission() error { - pc.mutex.Lock() - defer pc.mutex.Unlock() - - pc.decommissioned = true - - return nil -} - -func (pc *fakePluginClient) IsDecommissioned() bool { - pc.mutex.RLock() - defer pc.mutex.RUnlock() - return pc.decommissioned -} - -func (pc *fakePluginClient) kill() { - pc.mutex.Lock() - defer pc.mutex.Unlock() - pc.exited = true -} - -func (pc *fakePluginClient) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { - if pc.CollectMetricsHandlerFunc != nil { - return pc.CollectMetricsHandlerFunc(ctx, req) - } - - return nil, backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - if pc.CheckHealthHandlerFunc != nil { - return pc.CheckHealthHandlerFunc(ctx, req) - } - - return nil, backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - if pc.QueryDataHandlerFunc != nil { - return pc.QueryDataHandlerFunc(ctx, req) - } - - return nil, backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - if pc.CallResourceHandlerFunc != nil { - return pc.CallResourceHandlerFunc(ctx, req, sender) - } - - return backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) SubscribeStream(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { - return nil, backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { - return nil, backendplugin.ErrMethodNotImplemented -} - -func (pc *fakePluginClient) RunStream(_ context.Context, _ *backend.RunStreamRequest, _ *backend.StreamSender) error { - return backendplugin.ErrMethodNotImplemented -} - -type fakeSender struct { - resp *backend.CallResourceResponse -} - -func (s *fakeSender) Send(crr *backend.CallResourceResponse) error { - s.resp = crr - - return nil -} - -type fakePluginRegistry struct { - store map[string]*plugins.Plugin -} - -func newFakePluginRegistry() *fakePluginRegistry { - return &fakePluginRegistry{ - store: make(map[string]*plugins.Plugin), - } -} - -func (f *fakePluginRegistry) Plugin(_ context.Context, id string) (*plugins.Plugin, bool) { - p, exists := f.store[id] - return p, exists -} - -func (f *fakePluginRegistry) Plugins(_ context.Context) []*plugins.Plugin { - var res []*plugins.Plugin - - for _, p := range f.store { - res = append(res, p) - } - - return res -} - -func (f *fakePluginRegistry) Add(_ context.Context, p *plugins.Plugin) error { - f.store[p.ID] = p - return nil -} - -func (f *fakePluginRegistry) Remove(_ context.Context, id string) error { - delete(f.store, id) - return nil -} - -type fakeFsManager struct { - storage.Manager - - added int - removed int -} - -func (fsm *fakeFsManager) Add(_ context.Context, _ string, _ *zip.ReadCloser) (*storage.ExtractedPluginArchive, error) { - fsm.added++ - return &storage.ExtractedPluginArchive{}, nil -} - -func (fsm *fakeFsManager) Remove(_ context.Context, _ string) error { - fsm.removed++ - return nil + return p } diff --git a/pkg/plugins/manager/process/ifaces.go b/pkg/plugins/manager/process/ifaces.go new file mode 100644 index 00000000000..0c8a9d46f96 --- /dev/null +++ b/pkg/plugins/manager/process/ifaces.go @@ -0,0 +1,10 @@ +package process + +import "context" + +type Service interface { + // Start executes a backend plugin process. + Start(ctx context.Context, pluginID string) error + // Stop terminates a backend plugin process. + Stop(ctx context.Context, pluginID string) error +} diff --git a/pkg/plugins/manager/process/process.go b/pkg/plugins/manager/process/process.go new file mode 100644 index 00000000000..6ba85dbdb5f --- /dev/null +++ b/pkg/plugins/manager/process/process.go @@ -0,0 +1,145 @@ +package process + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/grafana/grafana/pkg/plugins/manager/registry" +) + +var _ Service = (*Manager)(nil) + +type Manager struct { + pluginRegistry registry.Service + + mu sync.Mutex + log log.Logger +} + +func ProvideService(pluginRegistry registry.Service) *Manager { + return NewManager(pluginRegistry) +} + +func NewManager(pluginRegistry registry.Service) *Manager { + return &Manager{ + pluginRegistry: pluginRegistry, + log: log.New("plugin.process.manager"), + } +} + +func (m *Manager) Run(ctx context.Context) error { + <-ctx.Done() + m.shutdown(ctx) + return ctx.Err() +} + +func (m *Manager) Start(ctx context.Context, pluginID string) error { + p, exists := m.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return backendplugin.ErrPluginNotRegistered + } + + if !p.IsManaged() || !p.Backend || p.SignatureError != nil { + return nil + } + + if p.IsCorePlugin() { + return nil + } + + m.log.Info("Plugin registered", "pluginID", p.ID) + m.mu.Lock() + if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil { + return err + } + + p.Logger().Debug("Successfully started backend plugin process") + m.mu.Unlock() + return nil +} + +func (m *Manager) Stop(ctx context.Context, pluginID string) error { + p, exists := m.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return backendplugin.ErrPluginNotRegistered + } + m.log.Debug("Stopping plugin process", "pluginID", p.ID) + m.mu.Lock() + defer m.mu.Unlock() + + if err := p.Decommission(); err != nil { + return err + } + + if err := p.Stop(ctx); err != nil { + return err + } + + return nil +} + +// shutdown stops all backend plugin processes +func (m *Manager) shutdown(ctx context.Context) { + var wg sync.WaitGroup + for _, p := range m.pluginRegistry.Plugins(ctx) { + wg.Add(1) + go func(p backendplugin.Plugin, ctx context.Context) { + defer wg.Done() + p.Logger().Debug("Stopping plugin") + if err := p.Stop(ctx); err != nil { + p.Logger().Error("Failed to stop plugin", "error", err) + } + p.Logger().Debug("Plugin stopped") + }(p, ctx) + } + wg.Wait() +} + +func startPluginAndRestartKilledProcesses(ctx context.Context, p *plugins.Plugin) error { + if err := p.Start(ctx); err != nil { + return err + } + + go func(ctx context.Context, p *plugins.Plugin) { + if err := restartKilledProcess(ctx, p); err != nil { + p.Logger().Error("Attempt to restart killed plugin process failed", "error", err) + } + }(ctx, p) + + return nil +} + +func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error { + ticker := time.NewTicker(time.Second * 1) + + for { + select { + case <-ctx.Done(): + if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil + case <-ticker.C: + if p.IsDecommissioned() { + p.Logger().Debug("Plugin decommissioned") + return nil + } + + if !p.Exited() { + continue + } + + p.Logger().Debug("Restarting plugin") + if err := p.Start(ctx); err != nil { + p.Logger().Error("Failed to restart plugin", "error", err) + continue + } + p.Logger().Debug("Plugin restarted") + } + } +} diff --git a/pkg/plugins/manager/process/process_test.go b/pkg/plugins/manager/process/process_test.go new file mode 100644 index 00000000000..805aa14668e --- /dev/null +++ b/pkg/plugins/manager/process/process_test.go @@ -0,0 +1,300 @@ +package process + +import ( + "context" + "sync" + "testing" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" + "github.com/stretchr/testify/require" +) + +func TestProcessManager_Start(t *testing.T) { + t.Run("Plugin not found in registry", func(t *testing.T) { + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{})) + err := m.Start(context.Background(), "non-existing-datasource") + require.ErrorIs(t, err, backendplugin.ErrPluginNotRegistered) + }) + + t.Run("Cannot start a core plugin", func(t *testing.T) { + pluginID := "core-datasource" + + bp := newFakeBackendPlugin(true) + p := createPlugin(t, bp, func(plugin *plugins.Plugin) { + plugin.ID = pluginID + plugin.Class = plugins.Core + plugin.Backend = true + }) + + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{ + pluginID: p, + })) + err := m.Start(context.Background(), pluginID) + require.NoError(t, err) + require.True(t, p.Exited()) + require.Zero(t, bp.startCount) + }) + + t.Run("Plugin state determines process start", func(t *testing.T) { + tcs := []struct { + name string + managed bool + backend bool + signatureError *plugins.SignatureError + expectedStartCount int + }{ + { + name: "Unmanaged backend plugin will not be started", + managed: false, + backend: true, + expectedStartCount: 0, + }, + { + name: "Managed non-backend plugin will not be started", + managed: false, + backend: true, + expectedStartCount: 0, + }, + { + name: "Managed backend plugin with signature error will not be started", + managed: true, + backend: true, + signatureError: &plugins.SignatureError{ + SignatureStatus: plugins.SignatureUnsigned, + }, + expectedStartCount: 0, + }, + { + name: "Managed backend plugin with no signature errors will be started", + managed: true, + backend: true, + expectedStartCount: 1, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + bp := newFakeBackendPlugin(tc.managed) + p := createPlugin(t, bp, func(plugin *plugins.Plugin) { + plugin.Backend = tc.backend + plugin.SignatureError = tc.signatureError + }) + + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{ + p.ID: p, + })) + + err := m.Start(context.Background(), p.ID) + require.NoError(t, err) + require.Equal(t, tc.expectedStartCount, bp.startCount) + + if tc.expectedStartCount > 0 { + require.True(t, !p.Exited()) + } else { + require.True(t, p.Exited()) + } + }) + } + }) +} + +func TestProcessManager_Stop(t *testing.T) { + t.Run("Plugin not found in registry", func(t *testing.T) { + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{})) + err := m.Stop(context.Background(), "non-existing-datasource") + require.ErrorIs(t, err, backendplugin.ErrPluginNotRegistered) + }) + + t.Run("Can stop a running plugin", func(t *testing.T) { + pluginID := "test-datasource" + + bp := newFakeBackendPlugin(true) + p := createPlugin(t, bp, func(plugin *plugins.Plugin) { + plugin.ID = pluginID + plugin.Backend = true + }) + + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{ + pluginID: p, + })) + err := m.Stop(context.Background(), pluginID) + require.NoError(t, err) + + require.True(t, p.IsDecommissioned()) + require.True(t, bp.decommissioned) + require.True(t, p.Exited()) + require.Equal(t, 1, bp.stopCount) + }) +} + +func TestProcessManager_ManagedBackendPluginLifecycle(t *testing.T) { + bp := newFakeBackendPlugin(true) + p := createPlugin(t, bp, func(plugin *plugins.Plugin) { + plugin.Backend = true + }) + + m := NewManager(newFakePluginRegistry(map[string]*plugins.Plugin{ + p.ID: p, + })) + + err := m.Start(context.Background(), p.ID) + require.NoError(t, err) + require.Equal(t, 1, bp.startCount) + + t.Run("When plugin process is killed, the process is restarted", func(t *testing.T) { + pCtx := context.Background() + cCtx, cancel := context.WithCancel(pCtx) + var wgRun sync.WaitGroup + wgRun.Add(1) + var runErr error + go func() { + runErr = m.Run(cCtx) + wgRun.Done() + }() + + var wgKill sync.WaitGroup + wgKill.Add(1) + go func() { + bp.kill() // manually kill process + for { + if !bp.Exited() { + break + } + } + wgKill.Done() + }() + wgKill.Wait() + require.True(t, !p.Exited()) + require.Equal(t, 2, bp.startCount) + require.Equal(t, 0, bp.stopCount) + + t.Run("When context is cancelled the plugin is stopped", func(t *testing.T) { + cancel() + wgRun.Wait() + require.ErrorIs(t, runErr, context.Canceled) + require.True(t, p.Exited()) + require.Equal(t, 2, bp.startCount) + require.Equal(t, 1, bp.stopCount) + }) + }) +} + +type fakePluginRegistry struct { + store map[string]*plugins.Plugin +} + +func newFakePluginRegistry(m map[string]*plugins.Plugin) *fakePluginRegistry { + return &fakePluginRegistry{ + store: m, + } +} + +func (f *fakePluginRegistry) Plugin(_ context.Context, id string) (*plugins.Plugin, bool) { + p, exists := f.store[id] + return p, exists +} + +func (f *fakePluginRegistry) Plugins(_ context.Context) []*plugins.Plugin { + var res []*plugins.Plugin + + for _, p := range f.store { + res = append(res, p) + } + return res +} + +func (f *fakePluginRegistry) Add(_ context.Context, p *plugins.Plugin) error { + f.store[p.ID] = p + return nil +} + +func (f *fakePluginRegistry) Remove(_ context.Context, id string) error { + delete(f.store, id) + return nil +} + +type fakeBackendPlugin struct { + managed bool + + startCount int + stopCount int + decommissioned bool + running bool + + mutex sync.RWMutex + backendplugin.Plugin +} + +func newFakeBackendPlugin(managed bool) *fakeBackendPlugin { + return &fakeBackendPlugin{ + managed: managed, + } +} + +func (p *fakeBackendPlugin) Start(_ context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + p.running = true + p.startCount++ + return nil +} + +func (p *fakeBackendPlugin) Stop(_ context.Context) error { + p.mutex.Lock() + defer p.mutex.Unlock() + p.running = false + p.stopCount++ + return nil +} + +func (p *fakeBackendPlugin) Decommission() error { + p.mutex.Lock() + defer p.mutex.Unlock() + p.decommissioned = true + return nil +} + +func (p *fakeBackendPlugin) IsDecommissioned() bool { + p.mutex.RLock() + defer p.mutex.RUnlock() + return p.decommissioned +} + +func (p *fakeBackendPlugin) IsManaged() bool { + p.mutex.RLock() + defer p.mutex.RUnlock() + return p.managed +} + +func (p *fakeBackendPlugin) Exited() bool { + p.mutex.RLock() + defer p.mutex.RUnlock() + return !p.running +} + +func (p *fakeBackendPlugin) kill() { + p.mutex.Lock() + defer p.mutex.Unlock() + p.running = false +} + +func createPlugin(t *testing.T, bp backendplugin.Plugin, cbs ...func(p *plugins.Plugin)) *plugins.Plugin { + t.Helper() + + p := &plugins.Plugin{ + Class: plugins.External, + JSONData: plugins.JSONData{ + ID: "test-datasource", + }, + } + + p.SetLogger(log.NewNopLogger()) + p.RegisterClient(bp) + + for _, cb := range cbs { + cb(p) + } + + return p +} diff --git a/pkg/plugins/manager/store.go b/pkg/plugins/manager/store.go deleted file mode 100644 index 304409c6f02..00000000000 --- a/pkg/plugins/manager/store.go +++ /dev/null @@ -1,176 +0,0 @@ -package manager - -import ( - "context" - "fmt" - - "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/plugins/repo" -) - -func (m *PluginManager) Plugin(ctx context.Context, pluginID string) (plugins.PluginDTO, bool) { - p, exists := m.plugin(ctx, pluginID) - if !exists { - return plugins.PluginDTO{}, false - } - - return p.ToDTO(), true -} - -func (m *PluginManager) Plugins(ctx context.Context, pluginTypes ...plugins.Type) []plugins.PluginDTO { - // if no types passed, assume all - if len(pluginTypes) == 0 { - pluginTypes = plugins.PluginTypes - } - - var requestedTypes = make(map[plugins.Type]struct{}) - for _, pt := range pluginTypes { - requestedTypes[pt] = struct{}{} - } - - pluginsList := make([]plugins.PluginDTO, 0) - for _, p := range m.availablePlugins(ctx) { - if _, exists := requestedTypes[p.Type]; exists { - pluginsList = append(pluginsList, p.ToDTO()) - } - } - return pluginsList -} - -// plugin finds a plugin with `pluginID` from the registry that is not decommissioned -func (m *PluginManager) plugin(ctx context.Context, pluginID string) (*plugins.Plugin, bool) { - p, exists := m.pluginRegistry.Plugin(ctx, pluginID) - if !exists || p.IsDecommissioned() { - return nil, false - } - - return p, true -} - -// availablePlugins returns all non-decommissioned plugins from the registry -func (m *PluginManager) availablePlugins(ctx context.Context) []*plugins.Plugin { - var res []*plugins.Plugin - for _, p := range m.pluginRegistry.Plugins(ctx) { - if !p.IsDecommissioned() { - res = append(res, p) - } - } - return res -} - -// registeredPlugins returns all registered plugins from the registry -func (m *PluginManager) registeredPlugins(ctx context.Context) map[string]struct{} { - pluginsByID := make(map[string]struct{}) - for _, p := range m.pluginRegistry.Plugins(ctx) { - pluginsByID[p.ID] = struct{}{} - } - - return pluginsByID -} - -func (m *PluginManager) Add(ctx context.Context, pluginID, version string, opts plugins.CompatOpts) error { - compatOpts := repo.NewCompatOpts(opts.GrafanaVersion, opts.OS, opts.Arch) - - var pluginArchive *repo.PluginArchive - if plugin, exists := m.plugin(ctx, pluginID); exists { - if !plugin.IsExternalPlugin() { - return plugins.ErrInstallCorePlugin - } - - if plugin.Info.Version == version { - return plugins.DuplicateError{ - PluginID: plugin.ID, - ExistingPluginDir: plugin.PluginDir, - } - } - - // get plugin update information to confirm if target update is possible - dlOpts, err := m.pluginRepo.GetPluginDownloadOptions(ctx, pluginID, version, compatOpts) - if err != nil { - return err - } - - // if existing plugin version is the same as the target update version - if dlOpts.Version == plugin.Info.Version { - return plugins.DuplicateError{ - PluginID: plugin.ID, - ExistingPluginDir: plugin.PluginDir, - } - } - - if dlOpts.PluginZipURL == "" && dlOpts.Version == "" { - return fmt.Errorf("could not determine update options for %s", pluginID) - } - - // remove existing installation of plugin - err = m.Remove(ctx, plugin.ID) - if err != nil { - return err - } - - if dlOpts.PluginZipURL != "" { - pluginArchive, err = m.pluginRepo.GetPluginArchiveByURL(ctx, dlOpts.PluginZipURL, compatOpts) - if err != nil { - return err - } - } else { - pluginArchive, err = m.pluginRepo.GetPluginArchive(ctx, pluginID, dlOpts.Version, compatOpts) - if err != nil { - return err - } - } - } else { - var err error - pluginArchive, err = m.pluginRepo.GetPluginArchive(ctx, pluginID, version, compatOpts) - if err != nil { - return err - } - } - - extractedArchive, err := m.pluginStorage.Add(ctx, pluginID, pluginArchive.File) - if err != nil { - return err - } - - // download dependency plugins - pathsToScan := []string{extractedArchive.Path} - for _, dep := range extractedArchive.Dependencies { - m.log.Info("Fetching %s dependencies...", dep.ID) - d, err := m.pluginRepo.GetPluginArchive(ctx, dep.ID, dep.Version, compatOpts) - if err != nil { - return fmt.Errorf("%v: %w", fmt.Sprintf("failed to download plugin %s from repository", dep.ID), err) - } - - depArchive, err := m.pluginStorage.Add(ctx, dep.ID, d.File) - if err != nil { - return err - } - - pathsToScan = append(pathsToScan, depArchive.Path) - } - - err = m.loadPlugins(context.Background(), plugins.External, pathsToScan...) - if err != nil { - m.log.Error("Could not load plugins", "paths", pathsToScan, "err", err) - return err - } - - return nil -} - -func (m *PluginManager) Remove(ctx context.Context, pluginID string) error { - plugin, exists := m.plugin(ctx, pluginID) - if !exists { - return plugins.ErrPluginNotInstalled - } - - if !plugin.IsExternalPlugin() { - return plugins.ErrUninstallCorePlugin - } - - if err := m.unregisterAndStop(ctx, plugin); err != nil { - return err - } - - return m.pluginStorage.Remove(ctx, plugin.ID) -} diff --git a/pkg/plugins/manager/store/store.go b/pkg/plugins/manager/store/store.go new file mode 100644 index 00000000000..c80696bea7d --- /dev/null +++ b/pkg/plugins/manager/store/store.go @@ -0,0 +1,111 @@ +package store + +import ( + "context" + "sort" + + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/manager/registry" +) + +var _ plugins.Store = (*Service)(nil) +var _ plugins.RendererManager = (*Service)(nil) +var _ plugins.SecretsPluginManager = (*Service)(nil) + +type Service struct { + pluginRegistry registry.Service +} + +func ProvideService(pluginRegistry registry.Service) *Service { + return &Service{ + pluginRegistry: pluginRegistry, + } +} + +func (s *Service) Plugin(ctx context.Context, pluginID string) (plugins.PluginDTO, bool) { + p, exists := s.plugin(ctx, pluginID) + if !exists { + return plugins.PluginDTO{}, false + } + + return p.ToDTO(), true +} + +func (s *Service) Plugins(ctx context.Context, pluginTypes ...plugins.Type) []plugins.PluginDTO { + // if no types passed, assume all + if len(pluginTypes) == 0 { + pluginTypes = plugins.PluginTypes + } + + var requestedTypes = make(map[plugins.Type]struct{}) + for _, pt := range pluginTypes { + requestedTypes[pt] = struct{}{} + } + + pluginsList := make([]plugins.PluginDTO, 0) + for _, p := range s.availablePlugins(ctx) { + if _, exists := requestedTypes[p.Type]; exists { + pluginsList = append(pluginsList, p.ToDTO()) + } + } + return pluginsList +} + +func (s *Service) Renderer() *plugins.Plugin { + for _, p := range s.availablePlugins(context.TODO()) { + if p.IsRenderer() { + return p + } + } + + return nil +} + +func (s *Service) SecretsManager() *plugins.Plugin { + for _, p := range s.availablePlugins(context.TODO()) { + if p.IsSecretsManager() { + return p + } + } + + return nil +} + +// plugin finds a plugin with `pluginID` from the registry that is not decommissioned +func (s *Service) plugin(ctx context.Context, pluginID string) (*plugins.Plugin, bool) { + p, exists := s.pluginRegistry.Plugin(ctx, pluginID) + if !exists { + return nil, false + } + + if p.IsDecommissioned() { + return nil, false + } + + return p, true +} + +// availablePlugins returns all non-decommissioned plugins from the registry sorted by alphabetic order on `plugin.ID` +func (s *Service) availablePlugins(ctx context.Context) []*plugins.Plugin { + var res []*plugins.Plugin + for _, p := range s.pluginRegistry.Plugins(ctx) { + if !p.IsDecommissioned() { + res = append(res, p) + } + } + sort.SliceStable(res, func(i, j int) bool { + return res[i].ID < res[j].ID + }) + return res +} + +func (s *Service) Routes() []*plugins.StaticRoute { + staticRoutes := make([]*plugins.StaticRoute, 0) + + for _, p := range s.availablePlugins(context.TODO()) { + if p.StaticRoute() != nil { + staticRoutes = append(staticRoutes, p.StaticRoute()) + } + } + return staticRoutes +} diff --git a/pkg/plugins/manager/store/store_test.go b/pkg/plugins/manager/store/store_test.go new file mode 100644 index 00000000000..f11f102128e --- /dev/null +++ b/pkg/plugins/manager/store/store_test.go @@ -0,0 +1,207 @@ +package store + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/backendplugin" +) + +func TestStore_Plugin(t *testing.T) { + t.Run("Plugin returns all non-decommissioned plugins", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource"}} + p1.RegisterClient(&DecommissionedPlugin{}) + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-panel"}} + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + }), + ) + + p, exists := ps.Plugin(context.Background(), p1.ID) + require.False(t, exists) + require.Equal(t, plugins.PluginDTO{}, p) + + p, exists = ps.Plugin(context.Background(), p2.ID) + require.True(t, exists) + require.Equal(t, p, p2.ToDTO()) + }) +} + +func TestStore_Plugins(t *testing.T) { + t.Run("Plugin returns all non-decommissioned plugins by type", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "a-test-datasource", Type: plugins.DataSource}} + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "b-test-panel", Type: plugins.Panel}} + p3 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "c-test-panel", Type: plugins.Panel}} + p4 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "d-test-app", Type: plugins.App}} + p5 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "e-test-panel", Type: plugins.Panel}} + p5.RegisterClient(&DecommissionedPlugin{}) + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + p3.ID: p3, + p4.ID: p4, + p5.ID: p5, + }), + ) + + pss := ps.Plugins(context.Background()) + require.Equal(t, pss, []plugins.PluginDTO{p1.ToDTO(), p2.ToDTO(), p3.ToDTO(), p4.ToDTO()}) + + pss = ps.Plugins(context.Background(), plugins.App) + require.Equal(t, pss, []plugins.PluginDTO{p4.ToDTO()}) + + pss = ps.Plugins(context.Background(), plugins.Panel) + require.Equal(t, pss, []plugins.PluginDTO{p2.ToDTO(), p3.ToDTO()}) + + pss = ps.Plugins(context.Background(), plugins.DataSource) + require.Equal(t, pss, []plugins.PluginDTO{p1.ToDTO()}) + + pss = ps.Plugins(context.Background(), plugins.DataSource, plugins.App, plugins.Panel) + require.Equal(t, pss, []plugins.PluginDTO{p1.ToDTO(), p2.ToDTO(), p3.ToDTO(), p4.ToDTO()}) + }) +} + +func TestStore_Renderer(t *testing.T) { + t.Run("Renderer returns a single (non-decommissioned) renderer plugin", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-renderer", Type: plugins.Renderer}} + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-panel", Type: plugins.Panel}} + p3 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-app", Type: plugins.App}} + p4 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource", Type: plugins.DataSource}} + p4.RegisterClient(&DecommissionedPlugin{}) + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + p3.ID: p3, + p4.ID: p4, + }), + ) + + r := ps.Renderer() + require.Equal(t, p1, r) + }) +} + +func TestStore_SecretsManager(t *testing.T) { + t.Run("Renderer returns a single (non-decommissioned) secrets manager plugin", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-renderer", Type: plugins.Renderer}} + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-panel", Type: plugins.Panel}} + p3 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-secrets", Type: plugins.SecretsManager}} + p4 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource", Type: plugins.DataSource}} + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + p3.ID: p3, + p4.ID: p4, + }), + ) + + r := ps.SecretsManager() + require.Equal(t, p3, r) + }) +} + +func TestStore_Routes(t *testing.T) { + t.Run("Routes returns all static routes for non-decommissioned plugins", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "a-test-renderer", Type: plugins.Renderer}, PluginDir: "/some/dir"} + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "b-test-panel", Type: plugins.Panel}, PluginDir: "/grafana/"} + p3 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "c-test-secrets", Type: plugins.SecretsManager}, PluginDir: "./secrets", Class: plugins.Core} + p4 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "d-test-datasource", Type: plugins.DataSource}, PluginDir: "../test"} + p5 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "e-test-app", Type: plugins.App}} + p6 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "f-test-app", Type: plugins.App}} + p6.RegisterClient(&DecommissionedPlugin{}) + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + p3.ID: p3, + p4.ID: p4, + p5.ID: p5, + p6.ID: p6, + }), + ) + + sr := func(p *plugins.Plugin) *plugins.StaticRoute { + return &plugins.StaticRoute{PluginID: p.ID, Directory: p.PluginDir} + } + + rs := ps.Routes() + require.Equal(t, []*plugins.StaticRoute{sr(p1), sr(p2), sr(p4), sr(p5)}, rs) + }) +} + +func TestStore_availablePlugins(t *testing.T) { + t.Run("Decommissioned plugins are excluded from availablePlugins", func(t *testing.T) { + p1 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-datasource"}} + p1.RegisterClient(&DecommissionedPlugin{}) + p2 := &plugins.Plugin{JSONData: plugins.JSONData{ID: "test-app"}} + + ps := ProvideService( + newFakePluginRegistry(map[string]*plugins.Plugin{ + p1.ID: p1, + p2.ID: p2, + }), + ) + + aps := ps.availablePlugins(context.Background()) + require.Len(t, aps, 1) + require.Equal(t, p2, aps[0]) + }) +} + +type DecommissionedPlugin struct { + backendplugin.Plugin +} + +func (p *DecommissionedPlugin) Decommission() error { + return nil +} + +func (p *DecommissionedPlugin) IsDecommissioned() bool { + return true +} + +type fakePluginRegistry struct { + store map[string]*plugins.Plugin +} + +func newFakePluginRegistry(m map[string]*plugins.Plugin) *fakePluginRegistry { + return &fakePluginRegistry{ + store: m, + } +} + +func (f *fakePluginRegistry) Plugin(_ context.Context, id string) (*plugins.Plugin, bool) { + p, exists := f.store[id] + return p, exists +} + +func (f *fakePluginRegistry) Plugins(_ context.Context) []*plugins.Plugin { + var res []*plugins.Plugin + for _, p := range f.store { + res = append(res, p) + } + return res +} + +func (f *fakePluginRegistry) Add(_ context.Context, p *plugins.Plugin) error { + f.store[p.ID] = p + return nil +} + +func (f *fakePluginRegistry) Remove(_ context.Context, id string) error { + delete(f.store, id) + return nil +} diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index d04a11ceefb..ef70a7e03e8 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -206,7 +206,6 @@ func (p *Plugin) Start(ctx context.Context) error { if p.client == nil { return fmt.Errorf("could not start plugin %s as no plugin client exists", p.ID) } - return p.client.Start(ctx) } diff --git a/pkg/server/backgroundsvcs/background_services.go b/pkg/server/backgroundsvcs/background_services.go index f51f29f6620..515210493cd 100644 --- a/pkg/server/backgroundsvcs/background_services.go +++ b/pkg/server/backgroundsvcs/background_services.go @@ -8,7 +8,6 @@ import ( uss "github.com/grafana/grafana/pkg/infra/usagestats/service" "github.com/grafana/grafana/pkg/infra/usagestats/statscollector" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins/manager" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/cleanup" @@ -34,7 +33,7 @@ import ( func ProvideBackgroundServiceRegistry( httpServer *api.HTTPServer, ng *ngalert.AlertNG, cleanup *cleanup.CleanUpService, live *live.GrafanaLive, - pushGateway *pushhttp.Gateway, notifications *notifications.NotificationService, pm *manager.PluginManager, + pushGateway *pushhttp.Gateway, notifications *notifications.NotificationService, rendering *rendering.RenderingService, tokenService models.UserTokenBackgroundService, tracing tracing.Tracer, provisioning *provisioning.ProvisioningServiceImpl, alerting *alerting.AlertEngine, usageStats *uss.UsageStats, statsCollector *statscollector.Service, grafanaUpdateChecker *updatechecker.GrafanaService, @@ -58,7 +57,6 @@ func ProvideBackgroundServiceRegistry( tokenService, provisioning, alerting, - pm, grafanaUpdateChecker, pluginsUpdateChecker, metrics, diff --git a/pkg/server/wire.go b/pkg/server/wire.go index 8cde320c7fd..aff0949c27a 100644 --- a/pkg/server/wire.go +++ b/pkg/server/wire.go @@ -35,8 +35,12 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin" "github.com/grafana/grafana/pkg/plugins/manager" + "github.com/grafana/grafana/pkg/plugins/manager/client" + pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards" "github.com/grafana/grafana/pkg/plugins/manager/loader" + processManager "github.com/grafana/grafana/pkg/plugins/manager/process" "github.com/grafana/grafana/pkg/plugins/manager/registry" + managerStore "github.com/grafana/grafana/pkg/plugins/manager/store" "github.com/grafana/grafana/pkg/plugins/plugincontext" "github.com/grafana/grafana/pkg/plugins/repo" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -171,12 +175,17 @@ var wireBasicSet = wire.NewSet( wire.Bind(new(repo.Service), new(*repo.Manager)), manager.ProvideService, wire.Bind(new(plugins.Manager), new(*manager.PluginManager)), - wire.Bind(new(plugins.Client), new(*manager.PluginManager)), - wire.Bind(new(plugins.Store), new(*manager.PluginManager)), - wire.Bind(new(plugins.DashboardFileStore), new(*manager.PluginManager)), - wire.Bind(new(plugins.StaticRouteResolver), new(*manager.PluginManager)), - wire.Bind(new(plugins.RendererManager), new(*manager.PluginManager)), - wire.Bind(new(plugins.SecretsPluginManager), new(*manager.PluginManager)), + client.ProvideService, + wire.Bind(new(plugins.Client), new(*client.Service)), + managerStore.ProvideService, + wire.Bind(new(plugins.Store), new(*managerStore.Service)), + wire.Bind(new(plugins.RendererManager), new(*managerStore.Service)), + wire.Bind(new(plugins.SecretsPluginManager), new(*managerStore.Service)), + wire.Bind(new(plugins.StaticRouteResolver), new(*managerStore.Service)), + pluginDashboards.ProvideFileStoreManager, + wire.Bind(new(pluginDashboards.FileStore), new(*pluginDashboards.FileStoreManager)), + processManager.ProvideService, + wire.Bind(new(processManager.Service), new(*processManager.Manager)), coreplugin.ProvideCoreRegistry, loader.ProvideService, wire.Bind(new(loader.Service), new(*loader.Loader)), diff --git a/pkg/services/plugindashboards/service/service.go b/pkg/services/plugindashboards/service/service.go index 620910887a9..a2934ce6ea9 100644 --- a/pkg/services/plugindashboards/service/service.go +++ b/pkg/services/plugindashboards/service/service.go @@ -7,12 +7,12 @@ import ( "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/plugins" + pluginDashboardsManager "github.com/grafana/grafana/pkg/plugins/manager/dashboards" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/plugindashboards" ) -func ProvideService(pluginDashboardStore plugins.DashboardFileStore, dashboardPluginService dashboards.PluginService) *Service { +func ProvideService(pluginDashboardStore pluginDashboardsManager.FileStore, dashboardPluginService dashboards.PluginService) *Service { return &Service{ pluginDashboardStore: pluginDashboardStore, dashboardPluginService: dashboardPluginService, @@ -21,7 +21,7 @@ func ProvideService(pluginDashboardStore plugins.DashboardFileStore, dashboardPl } type Service struct { - pluginDashboardStore plugins.DashboardFileStore + pluginDashboardStore pluginDashboardsManager.FileStore dashboardPluginService dashboards.PluginService logger log.Logger } @@ -31,7 +31,7 @@ func (s Service) ListPluginDashboards(ctx context.Context, req *plugindashboards return nil, fmt.Errorf("req cannot be nil") } - listArgs := &plugins.ListPluginDashboardFilesArgs{ + listArgs := &pluginDashboardsManager.ListPluginDashboardFilesArgs{ PluginID: req.PluginID, } listResp, err := s.pluginDashboardStore.ListPluginDashboardFiles(ctx, listArgs) @@ -106,7 +106,7 @@ func (s Service) LoadPluginDashboard(ctx context.Context, req *plugindashboards. return nil, fmt.Errorf("req cannot be nil") } - args := &plugins.GetPluginDashboardFileContentsArgs{ + args := &pluginDashboardsManager.GetPluginDashboardFileContentsArgs{ PluginID: req.PluginID, FileReference: req.Reference, } diff --git a/pkg/services/plugindashboards/service/service_test.go b/pkg/services/plugindashboards/service/service_test.go index e0f5404feb7..f46af1556f3 100644 --- a/pkg/services/plugindashboards/service/service_test.go +++ b/pkg/services/plugindashboards/service/service_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/plugins/manager/dashboards" "github.com/grafana/grafana/pkg/services/plugindashboards" "github.com/stretchr/testify/require" ) @@ -169,7 +170,7 @@ type pluginDashboardStoreMock struct { pluginDashboardFiles map[string]map[string][]byte } -func (m pluginDashboardStoreMock) ListPluginDashboardFiles(ctx context.Context, args *plugins.ListPluginDashboardFilesArgs) (*plugins.ListPluginDashboardFilesResult, error) { +func (m pluginDashboardStoreMock) ListPluginDashboardFiles(ctx context.Context, args *dashboards.ListPluginDashboardFilesArgs) (*dashboards.ListPluginDashboardFilesResult, error) { if dashboardFiles, exists := m.pluginDashboardFiles[args.PluginID]; exists { references := []string{} @@ -179,7 +180,7 @@ func (m pluginDashboardStoreMock) ListPluginDashboardFiles(ctx context.Context, sort.Strings(references) - return &plugins.ListPluginDashboardFilesResult{ + return &dashboards.ListPluginDashboardFilesResult{ FileReferences: references, }, nil } @@ -187,11 +188,11 @@ func (m pluginDashboardStoreMock) ListPluginDashboardFiles(ctx context.Context, return nil, plugins.NotFoundError{PluginID: args.PluginID} } -func (m pluginDashboardStoreMock) GetPluginDashboardFileContents(ctx context.Context, args *plugins.GetPluginDashboardFileContentsArgs) (*plugins.GetPluginDashboardFileContentsResult, error) { +func (m pluginDashboardStoreMock) GetPluginDashboardFileContents(ctx context.Context, args *dashboards.GetPluginDashboardFileContentsArgs) (*dashboards.GetPluginDashboardFileContentsResult, error) { if dashboardFiles, exists := m.pluginDashboardFiles[args.PluginID]; exists { if content, exists := dashboardFiles[args.FileReference]; exists { r := bytes.NewReader(content) - return &plugins.GetPluginDashboardFileContentsResult{ + return &dashboards.GetPluginDashboardFileContentsResult{ Content: io.NopCloser(r), }, nil }