The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/server/module_server.go

217 lines
5.9 KiB

package server
import (
"context"
"fmt"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/grafana/dskit/services"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/modules"
"github.com/grafana/grafana/pkg/services/featuremgmt"
// NOTE(review): a web blame annotation for commit "Storage: Unified Storage based on Entity API (#71977)" was pasted here; it is page residue, not Go source.
storageServer "github.com/grafana/grafana/pkg/services/store/entity/server"
"github.com/grafana/grafana/pkg/setting"
)
// NewModule builds a ModuleServer, the type responsible for managing dskit
// modules (services), and initializes it before handing it back.
//
// A nil server is returned together with the error if either construction or
// initialization fails.
func NewModule(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg) (*ModuleServer, error) {
	srv, err := newModuleServer(opts, apiOpts, features, cfg)
	if err == nil {
		// Only initialize a server that was constructed successfully.
		err = srv.init()
	}
	if err != nil {
		return nil, err
	}
	return srv, nil
}
// newModuleServer constructs a ModuleServer without initializing it. It
// creates the cancellable root context for the server and copies the PID file
// path and build information out of opts. As written it always returns a nil
// error.
func newModuleServer(opts Options, apiOpts api.ServerOptions, features featuremgmt.FeatureToggles, cfg *setting.Cfg) (*ModuleServer, error) {
	ctx, cancel := context.WithCancel(context.Background())

	return &ModuleServer{
		// Inputs kept as-is.
		opts:     opts,
		apiOpts:  apiOpts,
		features: features,
		cfg:      cfg,
		log:      log.New("base-server"),

		// Lifecycle plumbing.
		context:          ctx,
		shutdownFn:       cancel,
		shutdownFinished: make(chan struct{}),

		// Values copied from opts.
		pidFile:     opts.PidFile,
		version:     opts.Version,
		commit:      opts.Commit,
		buildBranch: opts.BuildBranch,
	}, nil
}
// ModuleServer manages the lifecycle of dskit services. It carries only the
// minimal set of dependencies needed to launch dskit services, yet it can be
// used to launch the entire Grafana server.
type ModuleServer struct {
	// Inputs handed to the modules registered in Run.
	opts     Options
	apiOpts  api.ServerOptions
	features featuremgmt.FeatureToggles
	cfg      *setting.Cfg
	log      log.Logger

	// context is passed to the module manager in Run; shutdownFn cancels it.
	context    context.Context
	shutdownFn context.CancelFunc

	// shutdownOnce makes the shutdown sequence run at most once.
	shutdownOnce sync.Once
	// shutdownFinished is closed when Run returns.
	shutdownFinished chan struct{}

	// mtx is held by init while it reads and sets isInitialized.
	mtx           sync.Mutex
	isInitialized bool

	// pidFile is the path the process ID is written to; empty disables it.
	pidFile string

	// Build information copied from Options at construction time.
	version     string
	commit      string
	buildBranch string
}
// init performs one-time server initialization, which currently consists of
// writing the PID file. Calls after the first one return nil without doing
// anything; the mutex serializes concurrent callers.
func (s *ModuleServer) init() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if !s.isInitialized {
		// The flag is set before the PID file is written, so a failed write
		// is not retried by a later call.
		s.isInitialized = true
		return s.writePIDFile()
	}
	return nil
}
// Run initializes and starts services. This will block until all services have
// exited. To initiate shutdown, call the Shutdown method in another goroutine.
func (s *ModuleServer) Run() error {
defer close(s.shutdownFinished)
if err := s.init(); err != nil {
return err
}
s.notifySystemd("READY=1")
s.log.Debug("Waiting on services...")
// Only allow individual dskit modules to run in dev mode.
if s.cfg.Env != setting.Dev {
if len(s.cfg.Target) > 1 || s.cfg.Target[0] != "all" {
s.log.Error("dskit module targeting is only supported in dev mode. Falling back to 'all'")
s.cfg.Target = []string{"all"}
}
}
m := modules.New(s.cfg.Target)
m.RegisterModule(modules.Core, func() (services.Service, error) {
return NewService(s.cfg, s.opts, s.apiOpts)
})
// TODO: uncomment this once the apiserver is ready to be run as a standalone target
//if s.features.IsEnabled(featuremgmt.FlagGrafanaAPIServer) {
// m.RegisterModule(modules.GrafanaAPIServer, func() (services.Service, error) {
// return grafanaapiserver.New(path.Join(s.cfg.DataPath, "k8s"))
// })
//} else {
// s.log.Debug("apiserver feature is disabled")
//}
Storage: Unified Storage based on Entity API (#71977) * first round of entityapi updates - quote column names and clean up insert/update queries - replace grn with guid - streamline table structure fixes streamline entity history move EntitySummary into proto remove EntitySummary add guid to json fix tests change DB_Uuid to DB_NVarchar fix folder test convert interface to any more cleanup start entity store under grafana-apiserver dskit target CRUD working, kind of rough cut of wiring entity api to kube-apiserver fake grafana user in context add key to entity list working revert unnecessary changes move entity storage files to their own package, clean up use accessor to read/write grafana annotations implement separate Create and Update functions * go mod tidy * switch from Kind to resource * basic grpc storage server * basic support for grpc entity store * don't connect to database unless it's needed, pass user identity over grpc * support getting user from k8s context, fix some mysql issues * assign owner to snowflake dependency * switch from ulid to uuid for guids * cleanup, rename Search to List * remove entityListResult * EntityAPI: remove extra user abstraction (#79033) * remove extra user abstraction * add test stub (but * move grpc context setup into client wrapper, fix lint issue * remove unused constants * remove custom json stuff * basic list filtering, add todo * change target to storage-server, allow entityStore flag in prod mode * fix issue with Update * EntityAPI: make test work, need to resolve expected differences (#79123) * make test work, need to resolve expected differences * remove the fields not supported by legacy * sanitize out the bits legacy does not support * sanitize out the bits legacy does not support --------- Co-authored-by: Ryan McKinley <ryantxu@gmail.com> * update feature toggle generated files * remove unused http headers * update feature flag strategy * devmode * update readme * spelling * readme --------- Co-authored-by: Ryan 
McKinley <ryantxu@gmail.com>
2 years ago
m.RegisterModule(modules.StorageServer, func() (services.Service, error) {
return storageServer.ProvideService(s.cfg, s.features)
})
m.RegisterModule(modules.All, nil)
return m.Run(s.context)
}
// Shutdown initiates Grafana graceful shutdown by cancelling the server's
// root context and then waiting for Run to return. Because Run blocks,
// Shutdown is meant to be called from a separate goroutine.
//
// It returns an error if ctx is done before Run has finished. The shutdown
// sequence runs at most once; calls after the first skip it and return nil.
func (s *ModuleServer) Shutdown(ctx context.Context, reason string) error {
	var shutdownErr error

	stopAndWait := func() {
		s.log.Info("Shutdown started", "reason", reason)

		// Cancel the root context to stop background services.
		s.shutdownFn()

		// Block until Run has returned or the caller's context gives up.
		select {
		case <-ctx.Done():
			s.log.Warn("Timed out while waiting for server to shut down")
			shutdownErr = fmt.Errorf("timeout waiting for shutdown")
		case <-s.shutdownFinished:
			s.log.Debug("Finished waiting for server to shut down")
		}
	}
	s.shutdownOnce.Do(stopAndWait)

	return shutdownErr
}
// writePIDFile retrieves the current process ID and writes it to s.pidFile,
// creating the parent directory first if needed. It does nothing and returns
// nil when no PID file path is configured.
//
// Failures are logged and returned wrapped with %w, so callers can inspect
// the underlying error with errors.Is / errors.As.
func (s *ModuleServer) writePIDFile() error {
	if s.pidFile == "" {
		return nil
	}

	// Ensure the required directory structure exists.
	if err := os.MkdirAll(filepath.Dir(s.pidFile), 0700); err != nil {
		s.log.Error("Failed to verify pid directory", "error", err)
		return fmt.Errorf("failed to verify pid directory: %w", err)
	}

	// Retrieve the PID and write it to file.
	pid := strconv.Itoa(os.Getpid())
	if err := os.WriteFile(s.pidFile, []byte(pid), 0644); err != nil {
		s.log.Error("Failed to write pidfile", "error", err)
		return fmt.Errorf("failed to write pidfile: %w", err)
	}

	s.log.Info("Writing PID file", "path", s.pidFile, "pid", pid)
	return nil
}
// notifySystemd sends a state notification to systemd over the datagram
// socket named by the NOTIFY_SOCKET environment variable. Delivery is best
// effort: every failure is logged and nothing is returned to the caller.
func (s *ModuleServer) notifySystemd(state string) {
	sock := os.Getenv("NOTIFY_SOCKET")
	if sock == "" {
		s.log.Debug("NOTIFY_SOCKET environment variable empty or unset, can't send systemd notification")
		return
	}

	addr := &net.UnixAddr{Name: sock, Net: "unixgram"}
	conn, err := net.DialUnix(addr.Net, nil, addr)
	if err != nil {
		s.log.Warn("Failed to connect to systemd", "err", err, "socket", sock)
		return
	}
	// Close the socket on the way out; a close failure is only logged.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			s.log.Warn("Failed to close connection", "err", closeErr)
		}
	}()

	if _, err := conn.Write([]byte(state)); err != nil {
		s.log.Warn("Failed to write notification to systemd", "err", err)
	}
}