The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/live/liveplugin/plugin.go

85 lines
2.6 KiB

package liveplugin
import (
"context"
"fmt"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/grafana/grafana/pkg/services/live/pipeline"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
)
type ChannelLocalPublisher struct {
node *centrifuge.Node
pipeline *pipeline.Pipeline
}
func NewChannelLocalPublisher(node *centrifuge.Node, pipeline *pipeline.Pipeline) *ChannelLocalPublisher {
return &ChannelLocalPublisher{node: node, pipeline: pipeline}
}
func (p *ChannelLocalPublisher) PublishLocal(channel string, data []byte) error {
if p.pipeline != nil {
orgID, channelID, err := orgchannel.StripOrgID(channel)
if err != nil {
return err
}
ok, err := p.pipeline.ProcessInput(context.Background(), orgID, channelID, data)
if err != nil {
return err
}
if ok {
// if rule found – we are done here. If not - fall through and process as usual.
return nil
}
}
pub := &centrifuge.Publication{
Data: data,
}
err := p.node.Hub().BroadcastPublication(channel, pub, centrifuge.StreamPosition{})
if err != nil {
return fmt.Errorf("error publishing %s: %w", string(data), err)
}
return nil
}
type NumLocalSubscribersGetter struct {
node *centrifuge.Node
}
func NewNumLocalSubscribersGetter(node *centrifuge.Node) *NumLocalSubscribersGetter {
return &NumLocalSubscribersGetter{node: node}
}
func (p *NumLocalSubscribersGetter) GetNumLocalSubscribers(channelID string) (int, error) {
return p.node.Hub().NumSubscribers(channelID), nil
}
type ContextGetter struct {
pluginContextProvider *plugincontext.Provider
dataSourceCache datasources.CacheService
}
func NewContextGetter(pluginContextProvider *plugincontext.Provider, dataSourceCache datasources.CacheService) *ContextGetter {
return &ContextGetter{
pluginContextProvider: pluginContextProvider,
dataSourceCache: dataSourceCache,
}
}
func (g *ContextGetter) GetPluginContext(ctx context.Context, user identity.Requester, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, error) {
if datasourceUID == "" {
return g.pluginContextProvider.Get(ctx, pluginID, user, user.GetOrgID())
}
ds, err := g.dataSourceCache.GetDatasourceByUID(ctx, datasourceUID, user, skipCache)
if err != nil {
return backend.PluginContext{}, fmt.Errorf("%v: %w", "Failed to get datasource", err)
}
return g.pluginContextProvider.GetWithDataSource(ctx, pluginID, user, ds)
}