Live: pipeline subscriber interface (#39299)

pull/39353/head
Alexander Emelin 4 years ago committed by GitHub
parent 73873f99cd
commit e1f1773036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 69
      pkg/services/live/live.go
  2. 58
      pkg/services/live/pipeline/config.go
  3. 17
      pkg/services/live/pipeline/devdata.go
  4. 20
      pkg/services/live/pipeline/pipeline.go
  5. 38
      pkg/services/live/pipeline/subscribe_builtin.go
  6. 35
      pkg/services/live/pipeline/subscribe_managed_stream.go
  7. 43
      pkg/services/live/pipeline/subscribe_multiple.go
  8. 18
      pkg/services/live/pipeline/subscribe_permission_denied.go
  9. 5
      pkg/tsdb/testdatasource/stream_handler.go

@ -168,9 +168,10 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
var builder pipeline.RuleBuilder var builder pipeline.RuleBuilder
if os.Getenv("GF_LIVE_DEV_BUILDER") != "" { if os.Getenv("GF_LIVE_DEV_BUILDER") != "" {
builder = &pipeline.DevRuleBuilder{ builder = &pipeline.DevRuleBuilder{
Node: node, Node: node,
ManagedStream: g.ManagedStreamRunner, ManagedStream: g.ManagedStreamRunner,
FrameStorage: pipeline.NewFrameStorage(), FrameStorage: pipeline.NewFrameStorage(),
ChannelHandlerGetter: g,
} }
} else { } else {
storage := &pipeline.FileStorage{ storage := &pipeline.FileStorage{
@ -178,10 +179,11 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
} }
g.channelRuleStorage = storage g.channelRuleStorage = storage
builder = &pipeline.StorageRuleBuilder{ builder = &pipeline.StorageRuleBuilder{
Node: node, Node: node,
ManagedStream: g.ManagedStreamRunner, ManagedStream: g.ManagedStreamRunner,
FrameStorage: pipeline.NewFrameStorage(), FrameStorage: pipeline.NewFrameStorage(),
RuleStorage: storage, RuleStorage: storage,
ChannelHandlerGetter: g,
} }
} }
channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)
@ -509,22 +511,47 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.
return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied return centrifuge.SubscribeReply{}, centrifuge.ErrorPermissionDenied
} }
handler, addr, err := g.GetChannelHandler(user, channel) var reply models.SubscribeReply
if err != nil { var status backend.SubscribeStreamStatus
if errors.Is(err, live.ErrInvalidChannelID) {
logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) var subscribeRuleFound bool
return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"} if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
if err != nil {
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if ok && rule.Subscriber != nil {
subscribeRuleFound = true
var err error
reply, status, err = rule.Subscriber.Subscribe(client.Context(), pipeline.Vars{
OrgID: orgID,
Channel: channel,
})
if err != nil {
logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
} }
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
} }
reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{ if !subscribeRuleFound {
Channel: channel, handler, addr, err := g.GetChannelHandler(user, channel)
Path: addr.Path, if err != nil {
}) if errors.Is(err, live.ErrInvalidChannelID) {
if err != nil { logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(http.StatusBadRequest), Message: "invalid channel ID"}
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal }
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
reply, status, err = handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
Channel: channel,
Path: addr.Path,
})
if err != nil {
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
} }
if status != backend.SubscribeStreamStatusOK { if status != backend.SubscribeStreamStatusOK {
// using HTTP error codes for WS errors too. // using HTTP error codes for WS errors too.

@ -56,10 +56,20 @@ type OutputterConfig struct {
ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"` ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"`
} }
type MultipleSubscriberConfig struct {
Subscribers []SubscriberConfig `json:"subscribers"`
}
type SubscriberConfig struct {
Type string `json:"type"`
MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
}
type ChannelRuleSettings struct { type ChannelRuleSettings struct {
Converter *ConverterConfig `json:"converter,omitempty"` Subscriber *SubscriberConfig `json:"subscriber,omitempty"`
Processor *ProcessorConfig `json:"processor,omitempty"` Converter *ConverterConfig `json:"converter,omitempty"`
Outputter *OutputterConfig `json:"output,omitempty"` Processor *ProcessorConfig `json:"processor,omitempty"`
Outputter *OutputterConfig `json:"output,omitempty"`
} }
type ChannelRule struct { type ChannelRule struct {
@ -105,10 +115,40 @@ type RuleStorage interface {
} }
type StorageRuleBuilder struct { type StorageRuleBuilder struct {
Node *centrifuge.Node Node *centrifuge.Node
ManagedStream *managedstream.Runner ManagedStream *managedstream.Runner
FrameStorage *FrameStorage FrameStorage *FrameStorage
RuleStorage RuleStorage RuleStorage RuleStorage
ChannelHandlerGetter ChannelHandlerGetter
}
func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscriber, error) {
if config == nil {
return nil, nil
}
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
switch config.Type {
case "builtin":
return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil
case "managedStream":
return NewManagedStreamSubscriber(f.ManagedStream), nil
case "multiple":
if config.MultipleSubscriberConfig == nil {
return nil, missingConfiguration
}
var subscribers []Subscriber
for _, outConf := range config.MultipleSubscriberConfig.Subscribers {
out := outConf
sub, err := f.extractSubscriber(&out)
if err != nil {
return nil, err
}
subscribers = append(subscribers, sub)
}
return NewMultipleSubscriber(subscribers...), nil
default:
return nil, fmt.Errorf("unknown subscriber type: %s", config.Type)
}
} }
func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converter, error) { func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converter, error) {
@ -302,6 +342,10 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
Pattern: ruleConfig.Pattern, Pattern: ruleConfig.Pattern,
} }
var err error var err error
rule.Subscriber, err = f.extractSubscriber(ruleConfig.Settings.Subscriber)
if err != nil {
return nil, err
}
rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter) rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter)
if err != nil { if err != nil {
return nil, err return nil, err

@ -10,10 +10,10 @@ import (
"os" "os"
"time" "time"
"github.com/centrifugal/centrifuge" "github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/live/managedstream"
) )
type Data struct { type Data struct {
@ -90,15 +90,20 @@ func postTestData() {
} }
type DevRuleBuilder struct { type DevRuleBuilder struct {
Node *centrifuge.Node Node *centrifuge.Node
ManagedStream *managedstream.Runner ManagedStream *managedstream.Runner
FrameStorage *FrameStorage FrameStorage *FrameStorage
ChannelHandlerGetter ChannelHandlerGetter
} }
func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) { func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) {
return []*LiveChannelRule{ return []*LiveChannelRule{
{ {
Pattern: "plugin/testdata/random-20Hz-stream", Pattern: "plugin/testdata/random-20Hz-stream",
Subscriber: NewMultipleSubscriber(
NewBuiltinSubscriber(f.ChannelHandlerGetter),
NewManagedStreamSubscriber(f.ManagedStream),
),
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}), Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
Outputter: NewMultipleOutput( Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream), NewManagedStreamOutput(f.ManagedStream),

@ -5,6 +5,10 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live" "github.com/grafana/grafana-plugin-sdk-go/live"
) )
@ -56,14 +60,20 @@ type Outputter interface {
Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error)
} }
// Subscriber can handle channel subscribe events.
type Subscriber interface {
Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error)
}
// LiveChannelRule is an in-memory representation of each specific rule, with Converter, Processor // LiveChannelRule is an in-memory representation of each specific rule, with Converter, Processor
// and Outputter to be executed by Pipeline. // and Outputter to be executed by Pipeline.
type LiveChannelRule struct { type LiveChannelRule struct {
OrgId int64 OrgId int64
Pattern string Pattern string
Converter Converter Subscriber Subscriber
Processor Processor Converter Converter
Outputter Outputter Processor Processor
Outputter Outputter
} }
// Label ... // Label ...

@ -0,0 +1,38 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/live"
)
type BuiltinSubscriber struct {
channelHandlerGetter ChannelHandlerGetter
}
type ChannelHandlerGetter interface {
GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error)
}
func NewBuiltinSubscriber(channelHandlerGetter ChannelHandlerGetter) *BuiltinSubscriber {
return &BuiltinSubscriber{channelHandlerGetter: channelHandlerGetter}
}
func (m *BuiltinSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
u, ok := livecontext.GetContextSignedUser(ctx)
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
handler, _, err := m.channelHandlerGetter.GetChannelHandler(u, vars.Channel)
if err != nil {
return models.SubscribeReply{}, 0, err
}
return handler.OnSubscribe(ctx, u, models.SubscribeEvent{
Channel: vars.Channel,
Path: vars.Path,
})
}

@ -0,0 +1,35 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type ManagedStreamSubscriber struct {
managedStream *managedstream.Runner
}
func NewManagedStreamSubscriber(managedStream *managedstream.Runner) *ManagedStreamSubscriber {
return &ManagedStreamSubscriber{managedStream: managedStream}
}
func (m *ManagedStreamSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
stream, err := m.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace)
if err != nil {
logger.Error("Error getting managed stream", "error", err)
return models.SubscribeReply{}, 0, err
}
u, ok := livecontext.GetContextSignedUser(ctx)
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
return stream.OnSubscribe(ctx, u, models.SubscribeEvent{
Channel: vars.Channel,
Path: vars.Path,
})
}

@ -0,0 +1,43 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type MultipleSubscriber struct {
Subscribers []Subscriber
}
func NewMultipleSubscriber(subscribers ...Subscriber) *MultipleSubscriber {
return &MultipleSubscriber{Subscribers: subscribers}
}
func (m *MultipleSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
finalReply := models.SubscribeReply{}
for _, s := range m.Subscribers {
reply, status, err := s.Subscribe(ctx, vars)
if err != nil {
return models.SubscribeReply{}, 0, err
}
if status != backend.SubscribeStreamStatusOK {
return models.SubscribeReply{}, status, nil
}
if finalReply.Data == nil {
finalReply.Data = reply.Data
}
if !finalReply.JoinLeave {
finalReply.JoinLeave = reply.JoinLeave
}
if !finalReply.Presence {
finalReply.Presence = reply.Presence
}
if !finalReply.Recover {
finalReply.Recover = reply.Recover
}
}
return finalReply, backend.SubscribeStreamStatusOK, nil
}

@ -0,0 +1,18 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type PermissionDeniedSubscriber struct{}
func NewPermissionDeniedSubscriber() *PermissionDeniedSubscriber {
return &PermissionDeniedSubscriber{}
}
func (m *PermissionDeniedSubscriber) Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}

@ -50,6 +50,11 @@ func (p *testStreamHandler) SubscribeStream(_ context.Context, req *backend.Subs
} }
} }
if p.livePipelineEnabled {
// While developing Live pipeline avoid sending initial data.
initialData = nil
}
return &backend.SubscribeStreamResponse{ return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK, Status: backend.SubscribeStreamStatusOK,
InitialData: initialData, InitialData: initialData,

Loading…
Cancel
Save