mirror of https://github.com/grafana/grafana
Basic streaming plugin support (#31940)
This pull request migrates testdata to coreplugin streaming capabilities, this is mostly a working concept of streaming plugins at the moment, the work will continue in the following pull requests.pull/32259/head^2
parent
1cd8981be4
commit
336bc559a3
@ -0,0 +1,118 @@ |
||||
package plugincontext |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"errors" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
|
||||
"github.com/grafana/grafana/pkg/bus" |
||||
"github.com/grafana/grafana/pkg/infra/localcache" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
"github.com/grafana/grafana/pkg/plugins" |
||||
"github.com/grafana/grafana/pkg/plugins/adapters" |
||||
"github.com/grafana/grafana/pkg/registry" |
||||
"github.com/grafana/grafana/pkg/services/datasources" |
||||
"github.com/grafana/grafana/pkg/util/errutil" |
||||
) |
||||
|
||||
func init() { |
||||
registry.Register(®istry.Descriptor{ |
||||
Name: "PluginContextProvider", |
||||
Instance: newProvider(), |
||||
}) |
||||
} |
||||
|
||||
func newProvider() *Provider { |
||||
return &Provider{} |
||||
} |
||||
|
||||
type Provider struct { |
||||
Bus bus.Bus `inject:""` |
||||
CacheService *localcache.CacheService `inject:""` |
||||
PluginManager plugins.Manager `inject:""` |
||||
DatasourceCache datasources.CacheService `inject:""` |
||||
} |
||||
|
||||
func (p *Provider) Init() error { |
||||
return nil |
||||
} |
||||
|
||||
// Get allows getting plugin context by its id. If datasourceUID is not empty string
|
||||
// then PluginContext.DataSourceInstanceSettings will be resolved and appended to
|
||||
// returned context.
|
||||
func (p *Provider) Get(pluginID string, datasourceUID string, user *models.SignedInUser) (backend.PluginContext, bool, error) { |
||||
pc := backend.PluginContext{} |
||||
plugin := p.PluginManager.GetPlugin(pluginID) |
||||
if plugin == nil { |
||||
return pc, false, nil |
||||
} |
||||
|
||||
jsonData := json.RawMessage{} |
||||
decryptedSecureJSONData := map[string]string{} |
||||
var updated time.Time |
||||
|
||||
ps, err := p.getCachedPluginSettings(pluginID, user) |
||||
if err != nil { |
||||
// models.ErrPluginSettingNotFound is expected if there's no row found for plugin setting in database (if non-app plugin).
|
||||
// If it's not this expected error something is wrong with cache or database and we return the error to the client.
|
||||
if !errors.Is(err, models.ErrPluginSettingNotFound) { |
||||
return pc, false, errutil.Wrap("Failed to get plugin settings", err) |
||||
} |
||||
} else { |
||||
jsonData, err = json.Marshal(ps.JsonData) |
||||
if err != nil { |
||||
return pc, false, errutil.Wrap("Failed to unmarshal plugin json data", err) |
||||
} |
||||
decryptedSecureJSONData = ps.DecryptedValues() |
||||
updated = ps.Updated |
||||
} |
||||
|
||||
pCtx := backend.PluginContext{ |
||||
OrgID: user.OrgId, |
||||
PluginID: plugin.Id, |
||||
User: adapters.BackendUserFromSignedInUser(user), |
||||
AppInstanceSettings: &backend.AppInstanceSettings{ |
||||
JSONData: jsonData, |
||||
DecryptedSecureJSONData: decryptedSecureJSONData, |
||||
Updated: updated, |
||||
}, |
||||
} |
||||
|
||||
if datasourceUID != "" { |
||||
ds, err := p.DatasourceCache.GetDatasourceByUID(datasourceUID, user, false) |
||||
if err != nil { |
||||
return pc, false, errutil.Wrap("Failed to get datasource", err) |
||||
} |
||||
datasourceSettings, err := adapters.ModelToInstanceSettings(ds) |
||||
if err != nil { |
||||
return pc, false, errutil.Wrap("Failed to convert datasource", err) |
||||
} |
||||
pCtx.DataSourceInstanceSettings = datasourceSettings |
||||
} |
||||
|
||||
return pCtx, true, nil |
||||
} |
||||
|
||||
const pluginSettingsCacheTTL = 5 * time.Second |
||||
const pluginSettingsCachePrefix = "plugin-setting-" |
||||
|
||||
func (p *Provider) getCachedPluginSettings(pluginID string, user *models.SignedInUser) (*models.PluginSetting, error) { |
||||
cacheKey := pluginSettingsCachePrefix + pluginID |
||||
|
||||
if cached, found := p.CacheService.Get(cacheKey); found { |
||||
ps := cached.(*models.PluginSetting) |
||||
if ps.OrgId == user.OrgId { |
||||
return ps, nil |
||||
} |
||||
} |
||||
|
||||
query := models.GetPluginSettingByIdQuery{PluginId: pluginID, OrgId: user.OrgId} |
||||
if err := p.Bus.Dispatch(&query); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
p.CacheService.Set(cacheKey, query.Result, pluginSettingsCacheTTL) |
||||
return query.Result, nil |
||||
} |
||||
@ -0,0 +1,24 @@ |
||||
package live |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/grafana/grafana/pkg/models" |
||||
) |
||||
|
||||
type signedUserContextKeyType int |
||||
|
||||
var signedUserContextKey signedUserContextKeyType |
||||
|
||||
func setContextSignedUser(ctx context.Context, user *models.SignedInUser) context.Context { |
||||
ctx = context.WithValue(ctx, signedUserContextKey, user) |
||||
return ctx |
||||
} |
||||
|
||||
func getContextSignedUser(ctx context.Context) (*models.SignedInUser, bool) { |
||||
if val := ctx.Value(signedUserContextKey); val != nil { |
||||
user, ok := val.(*models.SignedInUser) |
||||
return user, ok |
||||
} |
||||
return nil, false |
||||
} |
||||
@ -0,0 +1,164 @@ |
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: github.com/grafana/grafana/pkg/services/live/features (interfaces: ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner)
|
||||
|
||||
// Package features is a generated GoMock package.
|
||||
package features |
||||
|
||||
import ( |
||||
context "context" |
||||
reflect "reflect" |
||||
|
||||
gomock "github.com/golang/mock/gomock" |
||||
backend "github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
) |
||||
|
||||
// MockChannelPublisher is a mock of ChannelPublisher interface.
|
||||
type MockChannelPublisher struct { |
||||
ctrl *gomock.Controller |
||||
recorder *MockChannelPublisherMockRecorder |
||||
} |
||||
|
||||
// MockChannelPublisherMockRecorder is the mock recorder for MockChannelPublisher.
|
||||
type MockChannelPublisherMockRecorder struct { |
||||
mock *MockChannelPublisher |
||||
} |
||||
|
||||
// NewMockChannelPublisher creates a new mock instance.
|
||||
func NewMockChannelPublisher(ctrl *gomock.Controller) *MockChannelPublisher { |
||||
mock := &MockChannelPublisher{ctrl: ctrl} |
||||
mock.recorder = &MockChannelPublisherMockRecorder{mock} |
||||
return mock |
||||
} |
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockChannelPublisher) EXPECT() *MockChannelPublisherMockRecorder { |
||||
return m.recorder |
||||
} |
||||
|
||||
// Publish mocks base method.
|
||||
func (m *MockChannelPublisher) Publish(arg0 string, arg1 []byte) error { |
||||
m.ctrl.T.Helper() |
||||
ret := m.ctrl.Call(m, "Publish", arg0, arg1) |
||||
ret0, _ := ret[0].(error) |
||||
return ret0 |
||||
} |
||||
|
||||
// Publish indicates an expected call of Publish.
|
||||
func (mr *MockChannelPublisherMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call { |
||||
mr.mock.ctrl.T.Helper() |
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockChannelPublisher)(nil).Publish), arg0, arg1) |
||||
} |
||||
|
||||
// MockPresenceGetter is a mock of PresenceGetter interface.
|
||||
type MockPresenceGetter struct { |
||||
ctrl *gomock.Controller |
||||
recorder *MockPresenceGetterMockRecorder |
||||
} |
||||
|
||||
// MockPresenceGetterMockRecorder is the mock recorder for MockPresenceGetter.
|
||||
type MockPresenceGetterMockRecorder struct { |
||||
mock *MockPresenceGetter |
||||
} |
||||
|
||||
// NewMockPresenceGetter creates a new mock instance.
|
||||
func NewMockPresenceGetter(ctrl *gomock.Controller) *MockPresenceGetter { |
||||
mock := &MockPresenceGetter{ctrl: ctrl} |
||||
mock.recorder = &MockPresenceGetterMockRecorder{mock} |
||||
return mock |
||||
} |
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockPresenceGetter) EXPECT() *MockPresenceGetterMockRecorder { |
||||
return m.recorder |
||||
} |
||||
|
||||
// GetNumSubscribers mocks base method.
|
||||
func (m *MockPresenceGetter) GetNumSubscribers(arg0 string) (int, error) { |
||||
m.ctrl.T.Helper() |
||||
ret := m.ctrl.Call(m, "GetNumSubscribers", arg0) |
||||
ret0, _ := ret[0].(int) |
||||
ret1, _ := ret[1].(error) |
||||
return ret0, ret1 |
||||
} |
||||
|
||||
// GetNumSubscribers indicates an expected call of GetNumSubscribers.
|
||||
func (mr *MockPresenceGetterMockRecorder) GetNumSubscribers(arg0 interface{}) *gomock.Call { |
||||
mr.mock.ctrl.T.Helper() |
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNumSubscribers", reflect.TypeOf((*MockPresenceGetter)(nil).GetNumSubscribers), arg0) |
||||
} |
||||
|
||||
// MockPluginContextGetter is a mock of PluginContextGetter interface.
|
||||
type MockPluginContextGetter struct { |
||||
ctrl *gomock.Controller |
||||
recorder *MockPluginContextGetterMockRecorder |
||||
} |
||||
|
||||
// MockPluginContextGetterMockRecorder is the mock recorder for MockPluginContextGetter.
|
||||
type MockPluginContextGetterMockRecorder struct { |
||||
mock *MockPluginContextGetter |
||||
} |
||||
|
||||
// NewMockPluginContextGetter creates a new mock instance.
|
||||
func NewMockPluginContextGetter(ctrl *gomock.Controller) *MockPluginContextGetter { |
||||
mock := &MockPluginContextGetter{ctrl: ctrl} |
||||
mock.recorder = &MockPluginContextGetterMockRecorder{mock} |
||||
return mock |
||||
} |
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockPluginContextGetter) EXPECT() *MockPluginContextGetterMockRecorder { |
||||
return m.recorder |
||||
} |
||||
|
||||
// GetPluginContext mocks base method.
|
||||
func (m *MockPluginContextGetter) GetPluginContext(arg0 context.Context, arg1, arg2 string) (backend.PluginContext, bool, error) { |
||||
m.ctrl.T.Helper() |
||||
ret := m.ctrl.Call(m, "GetPluginContext", arg0, arg1, arg2) |
||||
ret0, _ := ret[0].(backend.PluginContext) |
||||
ret1, _ := ret[1].(bool) |
||||
ret2, _ := ret[2].(error) |
||||
return ret0, ret1, ret2 |
||||
} |
||||
|
||||
// GetPluginContext indicates an expected call of GetPluginContext.
|
||||
func (mr *MockPluginContextGetterMockRecorder) GetPluginContext(arg0, arg1, arg2 interface{}) *gomock.Call { |
||||
mr.mock.ctrl.T.Helper() |
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPluginContext", reflect.TypeOf((*MockPluginContextGetter)(nil).GetPluginContext), arg0, arg1, arg2) |
||||
} |
||||
|
||||
// MockStreamRunner is a mock of StreamRunner interface.
|
||||
type MockStreamRunner struct { |
||||
ctrl *gomock.Controller |
||||
recorder *MockStreamRunnerMockRecorder |
||||
} |
||||
|
||||
// MockStreamRunnerMockRecorder is the mock recorder for MockStreamRunner.
|
||||
type MockStreamRunnerMockRecorder struct { |
||||
mock *MockStreamRunner |
||||
} |
||||
|
||||
// NewMockStreamRunner creates a new mock instance.
|
||||
func NewMockStreamRunner(ctrl *gomock.Controller) *MockStreamRunner { |
||||
mock := &MockStreamRunner{ctrl: ctrl} |
||||
mock.recorder = &MockStreamRunnerMockRecorder{mock} |
||||
return mock |
||||
} |
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockStreamRunner) EXPECT() *MockStreamRunnerMockRecorder { |
||||
return m.recorder |
||||
} |
||||
|
||||
// RunStream mocks base method.
|
||||
func (m *MockStreamRunner) RunStream(arg0 context.Context, arg1 *backend.RunStreamRequest, arg2 backend.StreamPacketSender) error { |
||||
m.ctrl.T.Helper() |
||||
ret := m.ctrl.Call(m, "RunStream", arg0, arg1, arg2) |
||||
ret0, _ := ret[0].(error) |
||||
return ret0 |
||||
} |
||||
|
||||
// RunStream indicates an expected call of RunStream.
|
||||
func (mr *MockStreamRunnerMockRecorder) RunStream(arg0, arg1, arg2 interface{}) *gomock.Call { |
||||
mr.mock.ctrl.T.Helper() |
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunStream", reflect.TypeOf((*MockStreamRunner)(nil).RunStream), arg0, arg1, arg2) |
||||
} |
||||
@ -0,0 +1,122 @@ |
||||
package features |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"github.com/centrifugal/centrifuge" |
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
) |
||||
|
||||
//go:generate mockgen -destination=mock.go -package=features github.com/grafana/grafana/pkg/services/live/features ChannelPublisher,PresenceGetter,PluginContextGetter,StreamRunner
|
||||
|
||||
type ChannelPublisher interface { |
||||
Publish(channel string, data []byte) error |
||||
} |
||||
|
||||
type PresenceGetter interface { |
||||
GetNumSubscribers(channel string) (int, error) |
||||
} |
||||
|
||||
type PluginContextGetter interface { |
||||
GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) |
||||
} |
||||
|
||||
type StreamRunner interface { |
||||
RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error |
||||
} |
||||
|
||||
type streamSender struct { |
||||
channel string |
||||
channelPublisher ChannelPublisher |
||||
} |
||||
|
||||
func newStreamSender(channel string, publisher ChannelPublisher) *streamSender { |
||||
return &streamSender{channel: channel, channelPublisher: publisher} |
||||
} |
||||
|
||||
func (p *streamSender) Send(packet *backend.StreamPacket) error { |
||||
return p.channelPublisher.Publish(p.channel, packet.Payload) |
||||
} |
||||
|
||||
// PluginRunner can handle streaming operations for channels belonging to plugins.
|
||||
type PluginRunner struct { |
||||
pluginID string |
||||
datasourceUID string |
||||
pluginContextGetter PluginContextGetter |
||||
handler backend.StreamHandler |
||||
streamManager *StreamManager |
||||
} |
||||
|
||||
// NewPluginRunner creates new PluginRunner.
|
||||
func NewPluginRunner(pluginID string, datasourceUID string, streamManager *StreamManager, pluginContextGetter PluginContextGetter, handler backend.StreamHandler) *PluginRunner { |
||||
return &PluginRunner{ |
||||
pluginID: pluginID, |
||||
datasourceUID: datasourceUID, |
||||
pluginContextGetter: pluginContextGetter, |
||||
handler: handler, |
||||
streamManager: streamManager, |
||||
} |
||||
} |
||||
|
||||
// GetHandlerForPath gets the handler for a path.
|
||||
func (m *PluginRunner) GetHandlerForPath(path string) (models.ChannelHandler, error) { |
||||
return &PluginPathRunner{ |
||||
path: path, |
||||
pluginID: m.pluginID, |
||||
datasourceUID: m.datasourceUID, |
||||
streamManager: m.streamManager, |
||||
handler: m.handler, |
||||
pluginContextGetter: m.pluginContextGetter, |
||||
}, nil |
||||
} |
||||
|
||||
// PluginPathRunner can handle streaming operations for channels belonging to plugin specific path.
|
||||
type PluginPathRunner struct { |
||||
path string |
||||
pluginID string |
||||
datasourceUID string |
||||
streamManager *StreamManager |
||||
handler backend.StreamHandler |
||||
pluginContextGetter PluginContextGetter |
||||
} |
||||
|
||||
// OnSubscribe passes control to a plugin.
|
||||
func (r *PluginPathRunner) OnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { |
||||
pCtx, found, err := r.pluginContextGetter.GetPluginContext(client.Context(), r.pluginID, r.datasourceUID) |
||||
if err != nil { |
||||
logger.Error("Get plugin context error", "error", err, "path", r.path) |
||||
return centrifuge.SubscribeReply{}, err |
||||
} |
||||
if !found { |
||||
logger.Error("Plugin context not found", "path", r.path) |
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal |
||||
} |
||||
resp, err := r.handler.CanSubscribeToStream(client.Context(), &backend.SubscribeToStreamRequest{ |
||||
PluginContext: pCtx, |
||||
Path: r.path, |
||||
}) |
||||
if err != nil { |
||||
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) |
||||
return centrifuge.SubscribeReply{}, err |
||||
} |
||||
if !resp.OK { |
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied |
||||
} |
||||
err = r.streamManager.SubmitStream(e.Channel, r.path, pCtx, r.handler) |
||||
if err != nil { |
||||
logger.Error("Error submitting stream to manager", "error", err, "path", r.path) |
||||
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal |
||||
} |
||||
return centrifuge.SubscribeReply{ |
||||
Options: centrifuge.SubscribeOptions{ |
||||
Presence: true, |
||||
}, |
||||
}, nil |
||||
} |
||||
|
||||
// OnPublish passes control to a plugin.
|
||||
func (r *PluginPathRunner) OnPublish(_ *centrifuge.Client, _ centrifuge.PublishEvent) (centrifuge.PublishReply, error) { |
||||
return centrifuge.PublishReply{}, fmt.Errorf("not implemented yet") |
||||
} |
||||
@ -0,0 +1,173 @@ |
||||
package features |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
) |
||||
|
||||
// StreamManager manages streams from Grafana to plugins.
|
||||
type StreamManager struct { |
||||
mu sync.RWMutex |
||||
streams map[string]struct{} |
||||
presenceGetter PresenceGetter |
||||
channelPublisher ChannelPublisher |
||||
registerCh chan streamRequest |
||||
closedCh chan struct{} |
||||
checkInterval time.Duration |
||||
maxChecks int |
||||
} |
||||
|
||||
// StreamManagerOption modifies StreamManager behavior (used for tests for example).
|
||||
type StreamManagerOption func(*StreamManager) |
||||
|
||||
// WithCheckConfig allows setting custom check rules.
|
||||
func WithCheckConfig(interval time.Duration, maxChecks int) StreamManagerOption { |
||||
return func(sm *StreamManager) { |
||||
sm.checkInterval = interval |
||||
sm.maxChecks = maxChecks |
||||
} |
||||
} |
||||
|
||||
const ( |
||||
defaultCheckInterval = 5 * time.Second |
||||
defaultMaxChecks = 3 |
||||
) |
||||
|
||||
// NewStreamManager creates new StreamManager.
|
||||
func NewStreamManager(chPublisher ChannelPublisher, presenceGetter PresenceGetter, opts ...StreamManagerOption) *StreamManager { |
||||
sm := &StreamManager{ |
||||
streams: make(map[string]struct{}), |
||||
channelPublisher: chPublisher, |
||||
presenceGetter: presenceGetter, |
||||
registerCh: make(chan streamRequest), |
||||
closedCh: make(chan struct{}), |
||||
checkInterval: defaultCheckInterval, |
||||
maxChecks: defaultMaxChecks, |
||||
} |
||||
for _, opt := range opts { |
||||
opt(sm) |
||||
} |
||||
return sm |
||||
} |
||||
|
||||
func (s *StreamManager) stopStream(sr streamRequest, cancelFn func()) { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
delete(s.streams, sr.Channel) |
||||
cancelFn() |
||||
} |
||||
|
||||
func (s *StreamManager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) { |
||||
numNoSubscribersChecks := 0 |
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-time.After(s.checkInterval): |
||||
numSubscribers, err := s.presenceGetter.GetNumSubscribers(sr.Channel) |
||||
if err != nil { |
||||
logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path) |
||||
continue |
||||
} |
||||
if numSubscribers > 0 { |
||||
// reset counter since channel has active subscribers.
|
||||
numNoSubscribersChecks = 0 |
||||
continue |
||||
} |
||||
numNoSubscribersChecks++ |
||||
if numNoSubscribersChecks >= s.maxChecks { |
||||
logger.Info("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path) |
||||
s.stopStream(sr, cancelFn) |
||||
return |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// run stream until context canceled.
|
||||
func (s *StreamManager) runStream(ctx context.Context, sr streamRequest) { |
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
default: |
||||
} |
||||
err := sr.StreamRunner.RunStream( |
||||
ctx, |
||||
&backend.RunStreamRequest{ |
||||
PluginContext: sr.PluginContext, |
||||
Path: sr.Path, |
||||
}, |
||||
newStreamSender(sr.Channel, s.channelPublisher), |
||||
) |
||||
if err != nil { |
||||
if errors.Is(ctx.Err(), context.Canceled) { |
||||
logger.Info("Stream cleanly finished", "path", sr.Path) |
||||
return |
||||
} |
||||
logger.Error("Error running stream, retrying", "path", sr.Path, "error", err) |
||||
continue |
||||
} |
||||
logger.Warn("Stream finished without error?", "path", sr.Path) |
||||
return |
||||
} |
||||
} |
||||
|
||||
func (s *StreamManager) registerStream(ctx context.Context, sr streamRequest) { |
||||
s.mu.Lock() |
||||
if _, ok := s.streams[sr.Channel]; ok { |
||||
logger.Debug("Skip running new stream (already exists)", "path", sr.Path) |
||||
s.mu.Unlock() |
||||
return |
||||
} |
||||
ctx, cancel := context.WithCancel(ctx) |
||||
defer cancel() |
||||
s.streams[sr.Channel] = struct{}{} |
||||
s.mu.Unlock() |
||||
|
||||
go s.watchStream(ctx, cancel, sr) |
||||
s.runStream(ctx, sr) |
||||
} |
||||
|
||||
// Run StreamManager till context canceled.
|
||||
func (s *StreamManager) Run(ctx context.Context) error { |
||||
for { |
||||
select { |
||||
case sr := <-s.registerCh: |
||||
go s.registerStream(ctx, sr) |
||||
case <-ctx.Done(): |
||||
close(s.closedCh) |
||||
return ctx.Err() |
||||
} |
||||
} |
||||
} |
||||
|
||||
type streamRequest struct { |
||||
Channel string |
||||
Path string |
||||
PluginContext backend.PluginContext |
||||
StreamRunner StreamRunner |
||||
} |
||||
|
||||
// SubmitStream submits stream handler in StreamManager to manage.
|
||||
// The stream will be opened and kept till channel has active subscribers.
|
||||
func (s *StreamManager) SubmitStream(channel string, path string, pCtx backend.PluginContext, streamRunner StreamRunner) error { |
||||
select { |
||||
case <-s.closedCh: |
||||
close(s.registerCh) |
||||
return nil |
||||
case s.registerCh <- streamRequest{ |
||||
Channel: channel, |
||||
Path: path, |
||||
PluginContext: pCtx, |
||||
StreamRunner: streamRunner, |
||||
}: |
||||
case <-time.After(time.Second): |
||||
return errors.New("timeout") |
||||
} |
||||
return nil |
||||
} |
||||
@ -0,0 +1,129 @@ |
||||
package features |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
|
||||
"github.com/golang/mock/gomock" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// wait until channel closed with timeout.
|
||||
func waitWithTimeout(tb testing.TB, ch chan struct{}, timeout time.Duration) { |
||||
select { |
||||
case <-ch: |
||||
case <-time.After(timeout): |
||||
tb.Fatal("timeout") |
||||
} |
||||
} |
||||
|
||||
func TestStreamManager_Run(t *testing.T) { |
||||
mockCtrl := gomock.NewController(t) |
||||
defer mockCtrl.Finish() |
||||
|
||||
mockChannelPublisher := NewMockChannelPublisher(mockCtrl) |
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl) |
||||
|
||||
manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
|
||||
go func() { |
||||
cancel() |
||||
}() |
||||
|
||||
err := manager.Run(ctx) |
||||
require.ErrorIs(t, err, context.Canceled) |
||||
} |
||||
|
||||
func TestStreamManager_SubmitStream_Send(t *testing.T) { |
||||
mockCtrl := gomock.NewController(t) |
||||
defer mockCtrl.Finish() |
||||
|
||||
mockChannelPublisher := NewMockChannelPublisher(mockCtrl) |
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl) |
||||
|
||||
manager := NewStreamManager(mockChannelPublisher, mockPresenceGetter) |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
go func() { |
||||
_ = manager.Run(ctx) |
||||
}() |
||||
|
||||
startedCh := make(chan struct{}) |
||||
doneCh := make(chan struct{}) |
||||
|
||||
mockChannelPublisher.EXPECT().Publish("test", []byte("test")).Times(1) |
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl) |
||||
mockStreamRunner.EXPECT().RunStream( |
||||
gomock.Any(), gomock.Any(), gomock.Any(), |
||||
).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { |
||||
require.Equal(t, "test", req.Path) |
||||
close(startedCh) |
||||
err := sender.Send(&backend.StreamPacket{ |
||||
Payload: []byte("test"), |
||||
}) |
||||
require.NoError(t, err) |
||||
<-ctx.Done() |
||||
close(doneCh) |
||||
return ctx.Err() |
||||
}).Times(1) |
||||
|
||||
err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) |
||||
require.NoError(t, err) |
||||
|
||||
// try submit the same.
|
||||
err = manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) |
||||
require.NoError(t, err) |
||||
|
||||
waitWithTimeout(t, startedCh, time.Second) |
||||
require.Len(t, manager.streams, 1) |
||||
cancel() |
||||
waitWithTimeout(t, doneCh, time.Second) |
||||
} |
||||
|
||||
func TestStreamManager_SubmitStream_CloseNoSubscribers(t *testing.T) { |
||||
mockCtrl := gomock.NewController(t) |
||||
defer mockCtrl.Finish() |
||||
|
||||
mockChannelPublisher := NewMockChannelPublisher(mockCtrl) |
||||
mockPresenceGetter := NewMockPresenceGetter(mockCtrl) |
||||
|
||||
manager := NewStreamManager( |
||||
mockChannelPublisher, |
||||
mockPresenceGetter, |
||||
WithCheckConfig(10*time.Millisecond, 3), |
||||
) |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
defer cancel() |
||||
go func() { |
||||
_ = manager.Run(ctx) |
||||
}() |
||||
|
||||
startedCh := make(chan struct{}) |
||||
doneCh := make(chan struct{}) |
||||
|
||||
mockPresenceGetter.EXPECT().GetNumSubscribers("test").Return(0, nil).Times(3) |
||||
|
||||
mockStreamRunner := NewMockStreamRunner(mockCtrl) |
||||
mockStreamRunner.EXPECT().RunStream(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, req *backend.RunStreamRequest, sender backend.StreamPacketSender) error { |
||||
close(startedCh) |
||||
<-ctx.Done() |
||||
close(doneCh) |
||||
return ctx.Err() |
||||
}).Times(1) |
||||
|
||||
err := manager.SubmitStream("test", "test", backend.PluginContext{}, mockStreamRunner) |
||||
require.NoError(t, err) |
||||
|
||||
waitWithTimeout(t, startedCh, time.Second) |
||||
waitWithTimeout(t, doneCh, time.Second) |
||||
require.Len(t, manager.streams, 0) |
||||
} |
||||
@ -1,113 +0,0 @@ |
||||
package features |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"math/rand" |
||||
"time" |
||||
|
||||
"github.com/centrifugal/centrifuge" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
) |
||||
|
||||
// testDataRunner manages all the `grafana/dashboard/*` channels.
|
||||
type testDataRunner struct { |
||||
publisher models.ChannelPublisher |
||||
running bool |
||||
speedMillis int |
||||
dropPercent float64 |
||||
channel string |
||||
name string |
||||
} |
||||
|
||||
// TestDataSupplier manages all the `grafana/testdata/*` channels.
|
||||
type TestDataSupplier struct { |
||||
Publisher models.ChannelPublisher |
||||
} |
||||
|
||||
// GetHandlerForPath gets the channel handler for a path.
|
||||
// Called on init.
|
||||
func (s *TestDataSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) { |
||||
channel := "grafana/testdata/" + path |
||||
|
||||
if path == "random-2s-stream" { |
||||
return &testDataRunner{ |
||||
publisher: s.Publisher, |
||||
running: false, |
||||
speedMillis: 2000, |
||||
dropPercent: 0, |
||||
channel: channel, |
||||
name: path, |
||||
}, nil |
||||
} |
||||
|
||||
if path == "random-flakey-stream" { |
||||
return &testDataRunner{ |
||||
publisher: s.Publisher, |
||||
running: false, |
||||
speedMillis: 400, |
||||
dropPercent: .6, |
||||
channel: channel, |
||||
}, nil |
||||
} |
||||
|
||||
return nil, fmt.Errorf("unknown channel") |
||||
} |
||||
|
||||
// OnSubscribe will let anyone connect to the path
|
||||
func (r *testDataRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { |
||||
if !r.running { |
||||
r.running = true |
||||
|
||||
// Run in the background
|
||||
go r.runRandomCSV() |
||||
} |
||||
|
||||
return centrifuge.SubscribeReply{}, nil |
||||
} |
||||
|
||||
// OnPublish checks if a message from the websocket can be broadcast on this channel
|
||||
func (r *testDataRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { |
||||
return centrifuge.PublishReply{}, fmt.Errorf("can not publish to testdata") |
||||
} |
||||
|
||||
// runRandomCSV is just for an example.
|
||||
func (r *testDataRunner) runRandomCSV() { |
||||
spread := 50.0 |
||||
|
||||
walker := rand.Float64() * 100 |
||||
ticker := time.NewTicker(time.Duration(r.speedMillis) * time.Millisecond) |
||||
|
||||
measurement := models.Measurement{ |
||||
Name: r.name, |
||||
Time: 0, |
||||
Values: make(map[string]interface{}, 5), |
||||
} |
||||
msg := models.MeasurementBatch{ |
||||
Measurements: []models.Measurement{measurement}, // always a single measurement
|
||||
} |
||||
|
||||
for t := range ticker.C { |
||||
if rand.Float64() <= r.dropPercent { |
||||
continue |
||||
} |
||||
delta := rand.Float64() - 0.5 |
||||
walker += delta |
||||
|
||||
measurement.Time = t.UnixNano() / int64(time.Millisecond) |
||||
measurement.Values["value"] = walker |
||||
measurement.Values["min"] = walker - ((rand.Float64() * spread) + 0.01) |
||||
measurement.Values["max"] = walker + ((rand.Float64() * spread) + 0.01) |
||||
|
||||
bytes, err := json.Marshal(&msg) |
||||
if err != nil { |
||||
logger.Warn("unable to marshal line", "error", err) |
||||
continue |
||||
} |
||||
|
||||
err = r.publisher(r.channel, bytes) |
||||
if err != nil { |
||||
logger.Warn("write", "channel", r.channel, "measurement", measurement) |
||||
} |
||||
} |
||||
} |
||||
@ -1,27 +0,0 @@ |
||||
package live |
||||
|
||||
import ( |
||||
"github.com/centrifugal/centrifuge" |
||||
"github.com/grafana/grafana/pkg/models" |
||||
"github.com/grafana/grafana/pkg/plugins" |
||||
) |
||||
|
||||
// PluginHandler manages all the `grafana/dashboard/*` channels
|
||||
type PluginHandler struct { |
||||
Plugin *plugins.PluginBase |
||||
} |
||||
|
||||
// GetHandlerForPath called on init
|
||||
func (h *PluginHandler) GetHandlerForPath(path string) (models.ChannelHandler, error) { |
||||
return h, nil // all dashboards share the same handler
|
||||
} |
||||
|
||||
// OnSubscribe for now allows anyone to subscribe
|
||||
func (h *PluginHandler) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { |
||||
return centrifuge.SubscribeReply{}, nil |
||||
} |
||||
|
||||
// OnPublish checks if a message from the websocket can be broadcast on this channel
|
||||
func (h *PluginHandler) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { |
||||
return centrifuge.PublishReply{}, nil // broadcast any event
|
||||
} |
||||
@ -0,0 +1,57 @@ |
||||
package live |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"github.com/centrifugal/centrifuge" |
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
"github.com/grafana/grafana/pkg/plugins/plugincontext" |
||||
) |
||||
|
||||
type pluginChannelPublisher struct { |
||||
node *centrifuge.Node |
||||
} |
||||
|
||||
func newPluginChannelPublisher(node *centrifuge.Node) *pluginChannelPublisher { |
||||
return &pluginChannelPublisher{node: node} |
||||
} |
||||
|
||||
func (p *pluginChannelPublisher) Publish(channel string, data []byte) error { |
||||
_, err := p.node.Publish(channel, data) |
||||
return err |
||||
} |
||||
|
||||
type pluginPresenceGetter struct { |
||||
node *centrifuge.Node |
||||
} |
||||
|
||||
func newPluginPresenceGetter(node *centrifuge.Node) *pluginPresenceGetter { |
||||
return &pluginPresenceGetter{node: node} |
||||
} |
||||
|
||||
func (p *pluginPresenceGetter) GetNumSubscribers(channel string) (int, error) { |
||||
res, err := p.node.PresenceStats(channel) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return res.NumClients, nil |
||||
} |
||||
|
||||
type pluginContextGetter struct { |
||||
PluginContextProvider *plugincontext.Provider |
||||
} |
||||
|
||||
func newPluginContextGetter(pluginContextProvider *plugincontext.Provider) *pluginContextGetter { |
||||
return &pluginContextGetter{ |
||||
PluginContextProvider: pluginContextProvider, |
||||
} |
||||
} |
||||
|
||||
func (g *pluginContextGetter) GetPluginContext(ctx context.Context, pluginID string, datasourceUID string) (backend.PluginContext, bool, error) { |
||||
user, ok := getContextSignedUser(ctx) |
||||
if !ok { |
||||
return backend.PluginContext{}, false, fmt.Errorf("no signed user found in context") |
||||
} |
||||
return g.PluginContextProvider.Get(pluginID, datasourceUID, user) |
||||
} |
||||
@ -0,0 +1,10 @@ |
||||
package live |
||||
|
||||
const ( |
||||
// ScopeGrafana contains builtin features of Grafana Core.
|
||||
ScopeGrafana = "grafana" |
||||
// ScopePlugin passes control to a plugin.
|
||||
ScopePlugin = "plugin" |
||||
// ScopeDatasource passes control to a datasource plugin.
|
||||
ScopeDatasource = "ds" |
||||
) |
||||
@ -0,0 +1,104 @@ |
||||
package testdatasource |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"math/rand" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend" |
||||
"github.com/grafana/grafana-plugin-sdk-go/data" |
||||
|
||||
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" |
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
) |
||||
|
||||
type testStreamHandler struct { |
||||
logger log.Logger |
||||
} |
||||
|
||||
func newTestStreamHandler(logger log.Logger) *testStreamHandler { |
||||
return &testStreamHandler{ |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
func (p *testStreamHandler) CanSubscribeToStream(_ context.Context, req *backend.SubscribeToStreamRequest) (*backend.SubscribeToStreamResponse, error) { |
||||
p.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User) |
||||
return &backend.SubscribeToStreamResponse{OK: true}, nil |
||||
} |
||||
|
||||
func (p *testStreamHandler) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender backend.StreamPacketSender) error { |
||||
p.logger.Debug("New stream call", "path", request.Path) |
||||
var conf testStreamConfig |
||||
switch request.Path { |
||||
case "random-2s-stream": |
||||
conf = testStreamConfig{ |
||||
Interval: 200 * time.Millisecond, |
||||
} |
||||
case "random-flakey-stream": |
||||
conf = testStreamConfig{ |
||||
Interval: 200 * time.Millisecond, |
||||
Drop: 0.6, |
||||
} |
||||
case "random-20Hz-stream": |
||||
conf = testStreamConfig{ |
||||
Interval: 50 * time.Millisecond, |
||||
} |
||||
default: |
||||
return fmt.Errorf("testdata plugin does not support path: %s", request.Path) |
||||
} |
||||
return p.runTestStream(ctx, request.Path, conf, sender) |
||||
} |
||||
|
||||
type testStreamConfig struct { |
||||
Interval time.Duration |
||||
Drop float64 |
||||
} |
||||
|
||||
func (p *testStreamHandler) runTestStream(ctx context.Context, path string, conf testStreamConfig, sender backend.StreamPacketSender) error { |
||||
spread := 50.0 |
||||
walker := rand.Float64() * 100 |
||||
|
||||
ticker := time.NewTicker(conf.Interval) |
||||
defer ticker.Stop() |
||||
|
||||
frame := data.NewFrame("testdata", |
||||
data.NewField("Time", nil, make([]time.Time, 1)), |
||||
data.NewField("Value", nil, make([]float64, 1)), |
||||
data.NewField("Min", nil, make([]float64, 1)), |
||||
data.NewField("Max", nil, make([]float64, 1)), |
||||
) |
||||
|
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
p.logger.Debug("Stop streaming data for path", "path", path) |
||||
return ctx.Err() |
||||
case t := <-ticker.C: |
||||
if rand.Float64() < conf.Drop { |
||||
continue |
||||
} |
||||
delta := rand.Float64() - 0.5 |
||||
walker += delta |
||||
|
||||
frame.Fields[0].Set(0, t) |
||||
frame.Fields[1].Set(0, walker) // Value
|
||||
frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min
|
||||
frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max
|
||||
|
||||
bytes, err := data.FrameToJSON(frame, true, true) |
||||
if err != nil { |
||||
logger.Warn("unable to marshal line", "error", err) |
||||
continue |
||||
} |
||||
|
||||
packet := &backend.StreamPacket{ |
||||
Payload: bytes, |
||||
} |
||||
if err := sender.Send(packet); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
} |
||||
Loading…
Reference in new issue