diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3be8ca0e8dc..2de75513073 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -262,6 +262,7 @@ /pkg/services/querylibrary/ @grafana/multitenancy-squad /pkg/infra/filestorage/ @grafana/multitenancy-squad /pkg/util/converter/ @grafana/multitenancy-squad +/pkg/modules/ @grafana/multitenancy-squad # Alerting /pkg/services/ngalert/ @grafana/alerting-squad-backend diff --git a/pkg/modules/listener.go b/pkg/modules/listener.go new file mode 100644 index 00000000000..79e8bee11c2 --- /dev/null +++ b/pkg/modules/listener.go @@ -0,0 +1,50 @@ +package modules + +import ( + "context" + "errors" + + "github.com/grafana/dskit/modules" + "github.com/grafana/dskit/services" + "github.com/grafana/grafana/pkg/infra/log" +) + +var _ services.ManagerListener = (*serviceListener)(nil) + +type serviceListener struct { + log log.Logger + service *service +} + +func newServiceListener(logger log.Logger, s *service) *serviceListener { + return &serviceListener{log: logger, service: s} +} + +func (l *serviceListener) Healthy() { + l.log.Info("All modules healthy") +} + +func (l *serviceListener) Stopped() { + l.log.Info("All modules stopped") +} + +func (l *serviceListener) Failure(service services.Service) { + // if any service fails, stop all services + if err := l.service.Shutdown(context.Background()); err != nil { + l.log.Error("Failed to stop all modules", "err", err) + } + + // log which module failed + for module, s := range l.service.ServiceMap { + if s == service { + if errors.Is(service.FailureCase(), modules.ErrStopProcess) { + l.log.Info("Received stop signal via return error", "module", module, "err", service.FailureCase()) + } else { + l.log.Error("Module failed", "module", module, "err", service.FailureCase()) + } + return + } + } + + l.log.Error("Module failed", "module", "unknown", "err", service.FailureCase()) +} diff --git a/pkg/modules/modules.go b/pkg/modules/modules.go new file mode 100644 index 00000000000..c970c58a966 --- /dev/null +++ b/pkg/modules/modules.go @@ -0,0 +1,155 @@ +package modules + +import ( + "context" + "errors" + + "github.com/grafana/dskit/modules" + "github.com/grafana/dskit/services" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/setting" +) + +// List of available targets. +const ( + All string = "all" +) + +type Engine interface { + Init(context.Context) error + Run(context.Context) error + Shutdown(context.Context) error +} + +type Manager interface { + RegisterModule(name string, initFn func() (services.Service, error), deps ...string) + RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string) +} + +var _ Engine = (*service)(nil) +var _ Manager = (*service)(nil) + +// service manages the registration and lifecycle of modules. +type service struct { + cfg *setting.Cfg + log log.Logger + targets []string + dependencyMap map[string][]string + + ModuleManager *modules.Manager + ServiceManager *services.Manager + ServiceMap map[string]services.Service +} + +func ProvideService(cfg *setting.Cfg) *service { + logger := log.New("modules") + + return &service{ + cfg: cfg, + log: logger, + targets: cfg.Target, + dependencyMap: map[string][]string{}, + + ModuleManager: modules.NewManager(logger), + ServiceMap: map[string]services.Service{}, + } +} + +// Init initializes all registered modules. +func (m *service) Init(_ context.Context) error { + var err error + + // module registration + m.RegisterModule(All, nil) + + for mod, targets := range m.dependencyMap { + if err := m.ModuleManager.AddDependency(mod, targets...); err != nil { + return err + } + } + + m.ServiceMap, err = m.ModuleManager.InitModuleServices(m.targets...) + if err != nil { + return err + } + + // if no modules are registered, we don't need to start the service manager + if len(m.ServiceMap) == 0 { + return nil + } + + var svcs []services.Service + for _, s := range m.ServiceMap { + svcs = append(svcs, s) + } + m.ServiceManager, err = services.NewManager(svcs...) + + return err +} + +// Run starts all registered modules. +func (m *service) Run(ctx context.Context) error { + // we don't need to continue if no modules are registered. + // this behavior may need to change if dskit services replace the + // current background service registry. + if len(m.ServiceMap) == 0 { + m.log.Warn("No modules registered...") + <-ctx.Done() + return nil + } + + listener := newServiceListener(m.log, m) + m.ServiceManager.AddListener(listener) + + // wait until a service fails or stop signal was received + err := m.ServiceManager.StartAsync(ctx) + if err != nil { + return err + } + + err = m.ServiceManager.AwaitStopped(ctx) + if err != nil { + return err + } + + failed := m.ServiceManager.ServicesByState()[services.Failed] + for _, f := range failed { + // the service listener will log error details for all modules that failed, + // so here we return the first error that is not ErrStopProcess + if !errors.Is(f.FailureCase(), modules.ErrStopProcess) { + return f.FailureCase() + } + } + + return nil +} + +// Shutdown stops all modules and waits for them to stop. +func (m *service) Shutdown(ctx context.Context) error { + if m.ServiceManager == nil { + m.log.Debug("No modules registered, nothing to stop...") + return nil + } + m.ServiceManager.StopAsync() + m.log.Info("Awaiting services to be stopped...") + return m.ServiceManager.AwaitStopped(ctx) +} + +// RegisterModule registers a module with the dskit module manager. +func (m *service) RegisterModule(name string, initFn func() (services.Service, error), deps ...string) { + m.ModuleManager.RegisterModule(name, initFn) + m.dependencyMap[name] = deps +} + +// RegisterInvisibleModule registers an invisible module with the dskit module manager. +// Invisible modules are not visible to the user, and are intendent to be used as dependencies. +func (m *service) RegisterInvisibleModule(name string, initFn func() (services.Service, error), deps ...string) { + m.ModuleManager.RegisterModule(name, initFn, modules.UserInvisibleModule) + m.dependencyMap[name] = deps +} + +// IsModuleEnabled returns true if the module is enabled. +func (m *service) IsModuleEnabled(name string) bool { + return stringsContain(m.targets, name) +} diff --git a/pkg/modules/util.go b/pkg/modules/util.go new file mode 100644 index 00000000000..a02479ce9ed --- /dev/null +++ b/pkg/modules/util.go @@ -0,0 +1,11 @@ +package modules + +func stringsContain(values []string, search string) bool { + for _, v := range values { + if search == v { + return true + } + } + + return false +} diff --git a/pkg/modules/wire.go b/pkg/modules/wire.go new file mode 100644 index 00000000000..7bd1c57e124 --- /dev/null +++ b/pkg/modules/wire.go @@ -0,0 +1,9 @@ +package modules + +import "github.com/google/wire" + +var WireSet = wire.NewSet( + ProvideService, + wire.Bind(new(Engine), new(*service)), + wire.Bind(new(Manager), new(*service)), +) diff --git a/pkg/server/server.go b/pkg/server/server.go index 0c06701694c..2cf67e98cfb 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/metrics" "github.com/grafana/grafana/pkg/infra/usagestats/statscollector" + "github.com/grafana/grafana/pkg/modules" "github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/provisioning" @@ -38,9 +39,10 @@ type Options struct { func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry, provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry, usageStatsProvidersRegistry registry.UsageStatsProvidersRegistry, statsCollectorService *statscollector.Service, + moduleService modules.Engine, ) (*Server, error) { statsCollectorService.RegisterProviders(usageStatsProvidersRegistry.GetServices()) - s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider) + s, err := newServer(opts, cfg, httpServer, roleRegistry, provisioningService, backgroundServiceProvider, moduleService) if err != nil { return nil, err } @@ -54,6 +56,7 @@ func New(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistr func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleRegistry accesscontrol.RoleRegistry, provisioningService provisioning.ProvisioningService, backgroundServiceProvider registry.BackgroundServiceRegistry, + moduleService modules.Engine, ) (*Server, error) { rootCtx, shutdownFn := context.WithCancel(context.Background()) childRoutines, childCtx := errgroup.WithContext(rootCtx) @@ -73,6 +76,7 @@ func newServer(opts Options, cfg *setting.Cfg, httpServer *api.HTTPServer, roleR commit: opts.Commit, buildBranch: opts.BuildBranch, backgroundServices: backgroundServiceProvider.GetServices(), + moduleService: moduleService, } return s, nil @@ -99,6 +103,7 @@ type Server struct { HTTPServer *api.HTTPServer roleRegistry accesscontrol.RoleRegistry provisioningService provisioning.ProvisioningService + moduleService modules.Engine } // init initializes the server and its services. @@ -115,6 +120,11 @@ func (s *Server) init() error { return err } + // Initialize dskit modules. + if err := s.moduleService.Init(s.context); err != nil { + return err + } + if err := metrics.SetEnvironmentInformation(s.cfg.MetricsGrafanaEnvironmentInfo); err != nil { return err } @@ -135,6 +145,15 @@ func (s *Server) Run() error { return err } + // Start dskit modules. + s.childRoutines.Go(func() error { + err := s.moduleService.Run(s.context) + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil + }) + services := s.backgroundServices // Start background services. @@ -178,7 +197,10 @@ func (s *Server) Shutdown(ctx context.Context, reason string) error { var err error s.shutdownOnce.Do(func() { s.log.Info("Shutdown started", "reason", reason) - // Call cancel func to stop services. + if err := s.moduleService.Shutdown(ctx); err != nil { + s.log.Error("Failed to shutdown modules", "error", err) + } + // Call cancel func to stop background services. s.shutdownFn() // Wait for server to shut down select { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 6864cbe589c..8327f7ed944 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -48,7 +48,7 @@ func (s *testService) IsDisabled() bool { func testServer(t *testing.T, services ...registry.BackgroundService) *Server { t.Helper() - s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...)) + s, err := newServer(Options{}, setting.NewCfg(), nil, &acimpl.Service{}, nil, backgroundsvcs.NewBackgroundServiceRegistry(services...), &MockModuleService{}) require.NoError(t, err) // Required to skip configuration initialization that causes // DI errors in this test. @@ -90,3 +90,30 @@ func TestServer_Shutdown(t *testing.T) { err = <-ch require.NoError(t, err) } + +type MockModuleService struct { + initFunc func(context.Context) error + runFunc func(context.Context) error + shutdownFunc func(context.Context) error +} + +func (m *MockModuleService) Init(ctx context.Context) error { + if m.initFunc != nil { + return m.initFunc(ctx) + } + return nil +} + +func (m *MockModuleService) Run(ctx context.Context) error { + if m.runFunc != nil { + return m.runFunc(ctx) + } + return nil +} + +func (m *MockModuleService) Shutdown(ctx context.Context) error { + if m.shutdownFunc != nil { + return m.shutdownFunc(ctx) + } + return nil +} diff --git a/pkg/server/wire.go b/pkg/server/wire.go index 65491f0ad62..f98ac36589e 100644 --- a/pkg/server/wire.go +++ b/pkg/server/wire.go @@ -30,6 +30,7 @@ import ( loginpkg "github.com/grafana/grafana/pkg/login" "github.com/grafana/grafana/pkg/login/social" "github.com/grafana/grafana/pkg/middleware/csrf" + "github.com/grafana/grafana/pkg/modules" pluginDashboards "github.com/grafana/grafana/pkg/plugins/manager/dashboards" "github.com/grafana/grafana/pkg/registry/corekind" "github.com/grafana/grafana/pkg/services/accesscontrol" @@ -366,6 +367,7 @@ var wireBasicSet = wire.NewSet( authnimpl.ProvideService, wire.Bind(new(authn.Service), new(*authnimpl.Service)), supportbundlesimpl.ProvideService, + modules.WireSet, ) var wireSet = wire.NewSet( diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index a4aecbe59db..4cbb5863d3e 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -145,6 +145,7 @@ var ( // TODO move all global vars to this struct type Cfg struct { + Target []string Raw *ini.File Logger log.Logger @@ -908,6 +909,7 @@ var skipStaticRootValidation = false func NewCfg() *Cfg { return &Cfg{ + Target: []string{"all"}, Logger: log.New("settings"), Raw: ini.Empty(), Azure: &azsettings.AzureSettings{}, @@ -966,6 +968,8 @@ func (cfg *Cfg) Load(args CommandLineArgs) error { cfg.ErrTemplateName = "error" + Target := valueAsString(iniFile.Section(""), "target", "all") + cfg.Target = strings.Split(Target, " ") Env = valueAsString(iniFile.Section(""), "app_mode", "development") cfg.Env = Env cfg.ForceMigration = iniFile.Section("").Key("force_migration").MustBool(false) @@ -1265,6 +1269,7 @@ func (cfg *Cfg) LogConfigSources() { } } + cfg.Logger.Info("Target", "target", cfg.Target) cfg.Logger.Info("Path Home", "path", HomePath) cfg.Logger.Info("Path Data", "path", cfg.DataPath) cfg.Logger.Info("Path Logs", "path", cfg.LogsPath)