The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
grafana/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go

190 lines
4.8 KiB

package grpcplugin
import (
"context"
"errors"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/process"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/hashicorp/go-plugin"
)
type pluginClient interface {
backend.CollectMetricsHandler
backend.CheckHealthHandler
backend.QueryDataHandler
backend.CallResourceHandler
backend.StreamHandler
}
type grpcPlugin struct {
Plugins: Enable plugin runtime install/uninstall capabilities (#33836) * add uninstall flow * add install flow * small cleanup * smaller-footprint solution * cleanup + make bp start auto * fix interface contract * improve naming * accept version arg * ensure use of shared logger * make installer a field * add plugin decommissioning * add basic error checking * fix api docs * making initialization idempotent * add mutex * fix comment * fix test * add test for decommission * improve existing test * add more test coverage * more tests * change test func to use read lock * refactoring + adding test asserts * improve purging old install flow * improve dupe checking * change log name * skip over dupe scanned * make test assertion more flexible * remove trailing line * fix pointer receiver name * update comment * add context to API * add config flag * add base http api test + fix update functionality * simplify existing check * clean up test * refactor tests based on feedback * add single quotes to errs * use gcmp in tests + fix logo issue * make plugin list testing more flexible * address feedback * fix API test * fix linter * undo preallocate * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * fix linting issue in test * add docs placeholder * update install notes * Update docs/sources/plugins/marketplace.md Co-authored-by: Marcus Olsson <marcus.olsson@hey.com> * update access wording * add more placeholder docs * add link to more info * PR feedback - improved errors, refactor, lock fix * improve err details * propagate plugin version errors * don't autostart renderer * add H1 * fix imports Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> Co-authored-by: Marcus Olsson <marcus.olsson@hey.com>
4 years ago
descriptor PluginDescriptor
clientFactory func() *plugin.Client
client *plugin.Client
pluginClient pluginClient
logger log.Logger
mutex sync.RWMutex
decommissioned bool
}
// newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin.
func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc {
return func(pluginID string, logger log.Logger, env []string) (backendplugin.Plugin, error) {
return &grpcPlugin{
descriptor: descriptor,
logger: logger,
clientFactory: func() *plugin.Client {
return plugin.NewClient(newClientConfig(descriptor.executablePath, env, logger, descriptor.versionedPlugins))
},
}, nil
}
}
func (p *grpcPlugin) PluginID() string {
return p.descriptor.pluginID
}
func (p *grpcPlugin) Logger() log.Logger {
return p.logger
}
func (p *grpcPlugin) Start(ctx context.Context) error {
p.mutex.Lock()
defer p.mutex.Unlock()
p.client = p.clientFactory()
rpcClient, err := p.client.Client()
if err != nil {
return err
}
if p.client.NegotiatedVersion() < 2 {
return errors.New("plugin protocol version not supported")
}
p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient)
if err != nil {
return err
}
if p.pluginClient == nil {
return errors.New("no compatible plugin implementation found")
}
elevated, err := process.IsRunningWithElevatedPrivileges()
if err != nil {
p.logger.Debug("Error checking plugin process execution privilege", "err", err)
}
if elevated {
p.logger.Warn("Plugin process is running with elevated privileges. This is not recommended")
}
return nil
}
func (p *grpcPlugin) Stop(ctx context.Context) error {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.client != nil {
p.client.Kill()
}
return nil
}
func (p *grpcPlugin) IsManaged() bool {
return p.descriptor.managed
}
func (p *grpcPlugin) Exited() bool {
p.mutex.RLock()
defer p.mutex.RUnlock()
if p.client != nil {
return p.client.Exited()
}
return true
}
Plugins: Enable plugin runtime install/uninstall capabilities (#33836) * add uninstall flow * add install flow * small cleanup * smaller-footprint solution * cleanup + make bp start auto * fix interface contract * improve naming * accept version arg * ensure use of shared logger * make installer a field * add plugin decommissioning * add basic error checking * fix api docs * making initialization idempotent * add mutex * fix comment * fix test * add test for decommission * improve existing test * add more test coverage * more tests * change test func to use read lock * refactoring + adding test asserts * improve purging old install flow * improve dupe checking * change log name * skip over dupe scanned * make test assertion more flexible * remove trailing line * fix pointer receiver name * update comment * add context to API * add config flag * add base http api test + fix update functionality * simplify existing check * clean up test * refactor tests based on feedback * add single quotes to errs * use gcmp in tests + fix logo issue * make plugin list testing more flexible * address feedback * fix API test * fix linter * undo preallocate * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * Update docs/sources/administration/configuration.md Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> * fix linting issue in test * add docs placeholder * update install notes * Update docs/sources/plugins/marketplace.md Co-authored-by: Marcus Olsson <marcus.olsson@hey.com> * update access wording * add more placeholder docs * add link to more info * PR feedback - improved errors, refactor, lock fix * improve err details * propagate plugin version errors * don't autostart renderer * add H1 * fix imports Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com> Co-authored-by: Marcus Olsson <marcus.olsson@hey.com>
4 years ago
func (p *grpcPlugin) Decommission() error {
p.mutex.RLock()
defer p.mutex.RUnlock()
p.decommissioned = true
return nil
}
func (p *grpcPlugin) IsDecommissioned() bool {
return p.decommissioned
}
func (p *grpcPlugin) getPluginClient() (pluginClient, bool) {
p.mutex.RLock()
if p.client == nil || p.client.Exited() || p.pluginClient == nil {
p.mutex.RUnlock()
return nil, false
}
pluginClient := p.pluginClient
p.mutex.RUnlock()
return pluginClient, true
}
func (p *grpcPlugin) CollectMetrics(ctx context.Context) (*backend.CollectMetricsResult, error) {
pluginClient, ok := p.getPluginClient()
if !ok {
return nil, backendplugin.ErrPluginUnavailable
}
return pluginClient.CollectMetrics(ctx)
}
func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
pluginClient, ok := p.getPluginClient()
if !ok {
return nil, backendplugin.ErrPluginUnavailable
}
return pluginClient.CheckHealth(ctx, req)
}
func (p *grpcPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
pluginClient, ok := p.getPluginClient()
if !ok {
return nil, backendplugin.ErrPluginUnavailable
}
return pluginClient.QueryData(ctx, req)
}
func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
pluginClient, ok := p.getPluginClient()
if !ok {
return backendplugin.ErrPluginUnavailable
}
return pluginClient.CallResource(ctx, req, sender)
}
func (p *grpcPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
pluginClient, ok := p.getPluginClient()
if !ok {
return nil, backendplugin.ErrPluginUnavailable
}
return pluginClient.SubscribeStream(ctx, request)
}
func (p *grpcPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
pluginClient, ok := p.getPluginClient()
if !ok {
return nil, backendplugin.ErrPluginUnavailable
}
return pluginClient.PublishStream(ctx, request)
}
func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
pluginClient, ok := p.getPluginClient()
if !ok {
return backendplugin.ErrPluginUnavailable
}
return pluginClient.RunStream(ctx, req, sender)
}