diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 58a0fd0d7a0..e683ded4074 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -168,9 +168,10 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r var builder pipeline.RuleBuilder if os.Getenv("GF_LIVE_DEV_BUILDER") != "" { builder = &pipeline.DevRuleBuilder{ - Node: node, - ManagedStream: g.ManagedStreamRunner, - FrameStorage: pipeline.NewFrameStorage(), + Node: node, + ManagedStream: g.ManagedStreamRunner, + FrameStorage: pipeline.NewFrameStorage(), + ChannelHandlerGetter: g, } } else { storage := &pipeline.FileStorage{ @@ -178,10 +179,11 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r } g.channelRuleStorage = storage builder = &pipeline.StorageRuleBuilder{ - Node: node, - ManagedStream: g.ManagedStreamRunner, - FrameStorage: pipeline.NewFrameStorage(), - RuleStorage: storage, + Node: node, + ManagedStream: g.ManagedStreamRunner, + FrameStorage: pipeline.NewFrameStorage(), + RuleStorage: storage, + ChannelHandlerGetter: g, } } channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) @@ -509,22 +511,47 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied } - handler, addr, err := g.GetChannelHandler(user, channel) - if err != nil { - if errors.Is(err, live.ErrInvalidChannelID) { - logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) - return centrifuge.SubscribeReply{}, ¢rifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"} + var reply models.SubscribeReply + var status backend.SubscribeStreamStatus + + var subscribeRuleFound bool + if g.Pipeline != nil { + rule, ok, err := g.Pipeline.Get(user.OrgId, channel) + if err != nil { + logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + if ok && rule.Subscriber != nil { + subscribeRuleFound = true + var err error + reply, status, err = rule.Subscriber.Subscribe(client.Context(), pipeline.Vars{ + OrgID: orgID, + Channel: channel, + }) + if err != nil { + logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } } - logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) - return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal } - reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{ - Channel: channel, - Path: addr.Path, - }) - if err != nil { - logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) - return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + if !subscribeRuleFound { + handler, addr, err := g.GetChannelHandler(user, channel) + if err != nil { + if errors.Is(err, live.ErrInvalidChannelID) { + logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) + return centrifuge.SubscribeReply{}, ¢rifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"} + } + logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } + reply, status, err = handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{ + Channel: channel, + Path: addr.Path, + }) + if err != nil { + logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) + return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal + } } if status != backend.SubscribeStreamStatusOK { // using HTTP error codes for WS errors too. diff --git a/pkg/services/live/pipeline/config.go b/pkg/services/live/pipeline/config.go index d5efa9142e1..61ec3373f9d 100644 --- a/pkg/services/live/pipeline/config.go +++ b/pkg/services/live/pipeline/config.go @@ -56,10 +56,20 @@ type OutputterConfig struct { ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"` } +type MultipleSubscriberConfig struct { + Subscribers []SubscriberConfig `json:"subscribers"` +} + +type SubscriberConfig struct { + Type string `json:"type"` + MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"` +} + type ChannelRuleSettings struct { - Converter *ConverterConfig `json:"converter,omitempty"` - Processor *ProcessorConfig `json:"processor,omitempty"` - Outputter *OutputterConfig `json:"output,omitempty"` + Subscriber *SubscriberConfig `json:"subscriber,omitempty"` + Converter *ConverterConfig `json:"converter,omitempty"` + Processor *ProcessorConfig `json:"processor,omitempty"` + Outputter *OutputterConfig `json:"output,omitempty"` } type ChannelRule struct { @@ -105,10 +115,40 @@ type RuleStorage interface { } type StorageRuleBuilder struct { - Node *centrifuge.Node - ManagedStream *managedstream.Runner - FrameStorage *FrameStorage - RuleStorage RuleStorage + Node *centrifuge.Node + ManagedStream *managedstream.Runner + FrameStorage *FrameStorage + RuleStorage RuleStorage + ChannelHandlerGetter ChannelHandlerGetter +} + +func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscriber, error) { + if config == nil { + return nil, nil + } + missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type) + switch config.Type { + case "builtin": + return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil + case "managedStream": + return NewManagedStreamSubscriber(f.ManagedStream), nil + case "multiple": + if config.MultipleSubscriberConfig == nil { + return nil, missingConfiguration + } + var subscribers []Subscriber + for _, outConf := range config.MultipleSubscriberConfig.Subscribers { + out := outConf + sub, err := f.extractSubscriber(&out) + if err != nil { + return nil, err + } + subscribers = append(subscribers, sub) + } + return NewMultipleSubscriber(subscribers...), nil + default: + return nil, fmt.Errorf("unknown subscriber type: %s", config.Type) + } } func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converter, error) { @@ -302,6 +342,10 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li Pattern: ruleConfig.Pattern, } var err error + rule.Subscriber, err = f.extractSubscriber(ruleConfig.Settings.Subscriber) + if err != nil { + return nil, err + } rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter) if err != nil { return nil, err diff --git a/pkg/services/live/pipeline/devdata.go b/pkg/services/live/pipeline/devdata.go index d80ae1a1187..59bf7ce70c4 100644 --- a/pkg/services/live/pipeline/devdata.go +++ b/pkg/services/live/pipeline/devdata.go @@ -10,10 +10,10 @@ import ( "os" "time" - "github.com/centrifugal/centrifuge" + "github.com/grafana/grafana/pkg/services/live/managedstream" + "github.com/centrifugal/centrifuge" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/services/live/managedstream" ) type Data struct { @@ -90,15 +90,20 @@ func postTestData() { } type DevRuleBuilder struct { - Node *centrifuge.Node - ManagedStream *managedstream.Runner - FrameStorage *FrameStorage + Node *centrifuge.Node + ManagedStream *managedstream.Runner + FrameStorage *FrameStorage + ChannelHandlerGetter ChannelHandlerGetter } func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) { return []*LiveChannelRule{ { - Pattern: "plugin/testdata/random-20Hz-stream", + Pattern: "plugin/testdata/random-20Hz-stream", + Subscriber: NewMultipleSubscriber( + NewBuiltinSubscriber(f.ChannelHandlerGetter), + NewManagedStreamSubscriber(f.ManagedStream), + ), Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}), Outputter: NewMultipleOutput( NewManagedStreamOutput(f.ManagedStream), diff --git a/pkg/services/live/pipeline/pipeline.go b/pkg/services/live/pipeline/pipeline.go index ad5efca7d0c..5ff17d5b086 100644 --- a/pkg/services/live/pipeline/pipeline.go +++ b/pkg/services/live/pipeline/pipeline.go @@ -5,6 +5,10 @@ import ( "fmt" "os" + "github.com/grafana/grafana-plugin-sdk-go/backend" + + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/live" ) @@ -56,14 +60,20 @@ type Outputter interface { Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) } +// Subscriber can handle channel subscribe events. +type Subscriber interface { + Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) +} + // LiveChannelRule is an in-memory representation of each specific rule, with Converter, Processor // and Outputter to be executed by Pipeline. type LiveChannelRule struct { - OrgId int64 - Pattern string - Converter Converter - Processor Processor - Outputter Outputter + OrgId int64 + Pattern string + Subscriber Subscriber + Converter Converter + Processor Processor + Outputter Outputter } // Label ... diff --git a/pkg/services/live/pipeline/subscribe_builtin.go b/pkg/services/live/pipeline/subscribe_builtin.go new file mode 100644 index 00000000000..7788ab1c28a --- /dev/null +++ b/pkg/services/live/pipeline/subscribe_builtin.go @@ -0,0 +1,38 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/live/livecontext" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/live" +) + +type BuiltinSubscriber struct { + channelHandlerGetter ChannelHandlerGetter +} + +type ChannelHandlerGetter interface { + GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) +} + +func NewBuiltinSubscriber(channelHandlerGetter ChannelHandlerGetter) *BuiltinSubscriber { + return &BuiltinSubscriber{channelHandlerGetter: channelHandlerGetter} +} + +func (m *BuiltinSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + u, ok := livecontext.GetContextSignedUser(ctx) + if !ok { + return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil + } + handler, _, err := m.channelHandlerGetter.GetChannelHandler(u, vars.Channel) + if err != nil { + return models.SubscribeReply{}, 0, err + } + return handler.OnSubscribe(ctx, u, models.SubscribeEvent{ + Channel: vars.Channel, + Path: vars.Path, + }) +} diff --git a/pkg/services/live/pipeline/subscribe_managed_stream.go b/pkg/services/live/pipeline/subscribe_managed_stream.go new file mode 100644 index 00000000000..9ddd21d16ce --- /dev/null +++ b/pkg/services/live/pipeline/subscribe_managed_stream.go @@ -0,0 +1,35 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana/pkg/services/live/livecontext" + "github.com/grafana/grafana/pkg/services/live/managedstream" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" +) + +type ManagedStreamSubscriber struct { + managedStream *managedstream.Runner +} + +func NewManagedStreamSubscriber(managedStream *managedstream.Runner) *ManagedStreamSubscriber { + return &ManagedStreamSubscriber{managedStream: managedStream} +} + +func (m *ManagedStreamSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + stream, err := m.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace) + if err != nil { + logger.Error("Error getting managed stream", "error", err) + return models.SubscribeReply{}, 0, err + } + u, ok := livecontext.GetContextSignedUser(ctx) + if !ok { + return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil + } + return stream.OnSubscribe(ctx, u, models.SubscribeEvent{ + Channel: vars.Channel, + Path: vars.Path, + }) +} diff --git a/pkg/services/live/pipeline/subscribe_multiple.go b/pkg/services/live/pipeline/subscribe_multiple.go new file mode 100644 index 00000000000..b508cddb627 --- /dev/null +++ b/pkg/services/live/pipeline/subscribe_multiple.go @@ -0,0 +1,43 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" +) + +type MultipleSubscriber struct { + Subscribers []Subscriber +} + +func NewMultipleSubscriber(subscribers ...Subscriber) *MultipleSubscriber { + return &MultipleSubscriber{Subscribers: subscribers} +} + +func (m *MultipleSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + finalReply := models.SubscribeReply{} + + for _, s := range m.Subscribers { + reply, status, err := s.Subscribe(ctx, vars) + if err != nil { + return models.SubscribeReply{}, 0, err + } + if status != backend.SubscribeStreamStatusOK { + return models.SubscribeReply{}, status, nil + } + if finalReply.Data == nil { + finalReply.Data = reply.Data + } + if !finalReply.JoinLeave { + finalReply.JoinLeave = reply.JoinLeave + } + if !finalReply.Presence { + finalReply.Presence = reply.Presence + } + if !finalReply.Recover { + finalReply.Recover = reply.Recover + } + } + return finalReply, backend.SubscribeStreamStatusOK, nil +} diff --git a/pkg/services/live/pipeline/subscribe_permission_denied.go b/pkg/services/live/pipeline/subscribe_permission_denied.go new file mode 100644 index 00000000000..bfd3fe8a68d --- /dev/null +++ b/pkg/services/live/pipeline/subscribe_permission_denied.go @@ -0,0 +1,18 @@ +package pipeline + +import ( + "context" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/models" +) + +type PermissionDeniedSubscriber struct{} + +func NewPermissionDeniedSubscriber() *PermissionDeniedSubscriber { + return &PermissionDeniedSubscriber{} +} + +func (m *PermissionDeniedSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) { + return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil +} diff --git a/pkg/tsdb/testdatasource/stream_handler.go b/pkg/tsdb/testdatasource/stream_handler.go index 3d229dc846a..5c2df6a95e7 100644 --- a/pkg/tsdb/testdatasource/stream_handler.go +++ b/pkg/tsdb/testdatasource/stream_handler.go @@ -50,6 +50,11 @@ func (p *testStreamHandler) SubscribeStream(_ context.Context, req *backend.Subs } } + if p.livePipelineEnabled { + // While developing Live pipeline avoid sending initial data. + initialData = nil + } + return &backend.SubscribeStreamResponse{ Status: backend.SubscribeStreamStatusOK, InitialData: initialData,