Live: array for Processor, Outputter and Subscriber in channel rule top level (#39677)

pull/39958/head
Alexander Emelin 4 years ago committed by GitHub
parent ed0c43b106
commit 5358c45a3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      go.mod
  2. 9
      go.sum
  3. 146
      pkg/services/live/live.go
  4. 23
      pkg/services/live/pipeline/auth.go
  5. 14
      pkg/services/live/pipeline/compare.go
  6. 13
      pkg/services/live/pipeline/condition_checker.go
  7. 51
      pkg/services/live/pipeline/condition_checker_multiple.go
  8. 10
      pkg/services/live/pipeline/condition_type.go
  9. 259
      pkg/services/live/pipeline/config.go
  10. 2
      pkg/services/live/pipeline/converter_influx_auto.go
  11. 4
      pkg/services/live/pipeline/converter_json_auto.go
  12. 3
      pkg/services/live/pipeline/converter_json_exact_test.go
  13. 47
      pkg/services/live/pipeline/data_output_builtin.go
  14. 38
      pkg/services/live/pipeline/data_output_local_subscribers.go
  15. 37
      pkg/services/live/pipeline/data_output_redirect.go
  16. 134
      pkg/services/live/pipeline/devdata.go
  17. 13
      pkg/services/live/pipeline/frame_condition_checker.go
  18. 43
      pkg/services/live/pipeline/frame_condition_checker_multiple.go
  19. 29
      pkg/services/live/pipeline/frame_condition_number_compare.go
  20. 16
      pkg/services/live/pipeline/frame_output_changelog.go
  21. 8
      pkg/services/live/pipeline/frame_output_changelog_test.go
  22. 33
      pkg/services/live/pipeline/frame_output_conditional.go
  23. 14
      pkg/services/live/pipeline/frame_output_local_subscribers.go
  24. 32
      pkg/services/live/pipeline/frame_output_managed_stream.go
  25. 36
      pkg/services/live/pipeline/frame_output_multiple.go
  26. 16
      pkg/services/live/pipeline/frame_output_redirect.go
  27. 18
      pkg/services/live/pipeline/frame_output_remote_write.go
  28. 8
      pkg/services/live/pipeline/frame_output_threshold.go
  29. 0
      pkg/services/live/pipeline/frame_output_threshold_mock.go
  30. 16
      pkg/services/live/pipeline/frame_output_threshold_test.go
  31. 43
      pkg/services/live/pipeline/frame_processor_drop_field.go
  32. 46
      pkg/services/live/pipeline/frame_processor_keep_field.go
  33. 35
      pkg/services/live/pipeline/frame_processor_multiple.go
  34. 33
      pkg/services/live/pipeline/output_conditional.go
  35. 32
      pkg/services/live/pipeline/output_managed_stream.go
  36. 36
      pkg/services/live/pipeline/output_multiple.go
  37. 375
      pkg/services/live/pipeline/pipeline.go
  38. 32
      pkg/services/live/pipeline/pipeline_test.go
  39. 43
      pkg/services/live/pipeline/processor_drop_field.go
  40. 46
      pkg/services/live/pipeline/processor_keep_field.go
  41. 35
      pkg/services/live/pipeline/processor_multiple.go
  42. 64
      pkg/services/live/pipeline/registry.go
  43. 3
      pkg/services/live/pipeline/rule_cache_segmented.go
  44. 10
      pkg/services/live/pipeline/rule_cache_segmented_test.go
  45. 39
      pkg/services/live/pipeline/subscribe_authorize_role.go
  46. 34
      pkg/services/live/pipeline/subscribe_authorize_role_test.go

@ -102,6 +102,10 @@ require (
github.com/yudai/gojsondiff v1.0.0
go.opentelemetry.io/collector v0.31.0
go.opentelemetry.io/collector/model v0.31.0
go.opentelemetry.io/otel v1.0.0
go.opentelemetry.io/otel/exporters/jaeger v1.0.0
go.opentelemetry.io/otel/sdk v1.0.0
go.opentelemetry.io/otel/trace v1.0.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f

@ -2380,11 +2380,19 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.21.0/go.mod h1:
go.opentelemetry.io/contrib/zpages v0.0.0-20210722161726-7668016acb73/go.mod h1:NAkejuYm41lpyL43Fu1XdnCOYxN5NVV80/MJ03JQ/X8=
go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0=
go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I=
go.opentelemetry.io/otel v1.0.0 h1:qTTn6x71GVBvoafHK/yaRUmFzI4LcONZD0/kXxl5PHI=
go.opentelemetry.io/otel v1.0.0/go.mod h1:AjRVh9A5/5DE7S+mZtTR6t8vpKKryam+0lREnfmS4cg=
go.opentelemetry.io/otel/exporters/jaeger v1.0.0 h1:cLhx8llHw02h5JTqGqaRbYn+QVKHmrzD9vEbKnSPk5U=
go.opentelemetry.io/otel/exporters/jaeger v1.0.0/go.mod h1:q10N1AolE1JjqKrFJK2tYw0iZpmX+HBaXBtuCzRnBGQ=
go.opentelemetry.io/otel/internal/metric v0.21.0/go.mod h1:iOfAaY2YycsXfYD4kaRSbLx2LKmfpKObWBEv9QK5zFo=
go.opentelemetry.io/otel/metric v0.21.0/go.mod h1:JWCt1bjivC4iCrz/aCrM1GSw+ZcvY44KCbaeeRhzHnc=
go.opentelemetry.io/otel/oteltest v1.0.0-RC1/go.mod h1:+eoIG0gdEOaPNftuy1YScLr1Gb4mL/9lpDkZ0JjMRq4=
go.opentelemetry.io/otel/sdk v1.0.0-RC1/go.mod h1:kj6yPn7Pgt5ByRuwesbaWcRLA+V7BSDg3Hf8xRvsvf8=
go.opentelemetry.io/otel/sdk v1.0.0 h1:BNPMYUONPNbLneMttKSjQhOTlFLOD9U22HNG1KrIN2Y=
go.opentelemetry.io/otel/sdk v1.0.0/go.mod h1:PCrDHlSy5x1kjezSdL37PhbFUMjrsLRshJ2zCzeXwbM=
go.opentelemetry.io/otel/trace v1.0.0-RC1/go.mod h1:86UHmyHWFEtWjfWPSbu0+d0Pf9Q6e1U+3ViBOc+NXAg=
go.opentelemetry.io/otel/trace v1.0.0 h1:TSBr8GTEtKevYMG/2d21M989r5WJYVimhTHBKVEZuh4=
go.opentelemetry.io/otel/trace v1.0.0/go.mod h1:PXTWqayeFUlJV1YDNhsJYB184+IvAH814St6o6ajzIs=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.starlark.net v0.0.0-20200901195727-6e684ef5eeee/go.mod h1:f0znQkUKRrkk36XxWbGjMqQM8wGv/xHBVE2qc3B5oFU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -2768,6 +2776,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@ -192,6 +192,22 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
}
}
channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)
// Pre-build/validate channel rules for all organizations on start.
// This can be unreasonable to have in production scenario with many
// organizations.
query := &models.SearchOrgsQuery{}
err := sqlstore.SearchOrgs(query)
if err != nil {
return nil, fmt.Errorf("can't get org list: %w", err)
}
for _, org := range query.Result {
_, _, err := channelRuleGetter.Get(org.Id, "")
if err != nil {
return nil, fmt.Errorf("error building channel rules for org %d: %w", org.Id, err)
}
}
g.Pipeline, err = pipeline.New(channelRuleGetter)
if err != nil {
return nil, err
@ -537,28 +553,47 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.
var reply models.SubscribeReply
var status backend.SubscribeStreamStatus
var ruleFound bool
var subscribeRuleFound bool
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
if err != nil {
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if ok && rule.Subscriber != nil {
subscribeRuleFound = true
var err error
reply, status, err = rule.Subscriber.Subscribe(client.Context(), pipeline.Vars{
OrgID: orgID,
Channel: channel,
})
if err != nil {
logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
ruleFound = ok
if ok {
if rule.SubscribeAuth != nil {
ok, err := rule.SubscribeAuth.CanSubscribe(client.Context(), user)
if err != nil {
logger.Error("Error checking subscribe permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if !ok {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(backend.SubscribeStreamStatusPermissionDenied)
return centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
}
if len(rule.Subscribers) > 0 {
var err error
for _, sub := range rule.Subscribers {
reply, status, err = sub.Subscribe(client.Context(), pipeline.Vars{
OrgID: orgID,
Channel: channel,
})
if err != nil {
logger.Error("Error channel rule subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.SubscribeReply{}, centrifuge.ErrorInternal
}
if status != backend.SubscribeStreamStatusOK {
break
}
}
}
}
}
if !subscribeRuleFound {
if !ruleFound {
handler, addr, err := g.GetChannelHandler(user, channel)
if err != nil {
if errors.Is(err, live.ErrInvalidChannelID) {
@ -615,6 +650,42 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu
return centrifuge.PublishReply{}, centrifuge.ErrorPermissionDenied
}
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
if err != nil {
logger.Error("Error getting channel rule", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if ok {
if rule.PublishAuth != nil {
ok, err := rule.PublishAuth.CanPublish(client.Context(), user)
if err != nil {
logger.Error("Error checking publish permissions", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if !ok {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
} else {
if !user.HasRole(models.ROLE_ADMIN) {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(backend.PublishStreamStatusPermissionDenied)
return centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text}
}
}
_, err := g.Pipeline.ProcessInput(client.Context(), user.OrgId, channel, e.Data)
if err != nil {
logger.Error("Error processing input", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
return centrifuge.PublishReply{
Result: &centrifuge.PublishResult{},
}, nil
}
}
handler, addr, err := g.GetChannelHandler(user, channel)
if err != nil {
if errors.Is(err, live.ErrInvalidChannelID) {
@ -633,6 +704,7 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
return centrifuge.PublishReply{}, centrifuge.ErrorInternal
}
if status != backend.PublishStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(status)
@ -812,6 +884,38 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub
}
logger.Debug("Publish API cmd", "user", ctx.SignedInUser.UserId, "channel", cmd.Channel)
user := ctx.SignedInUser
channel := cmd.Channel
if g.Pipeline != nil {
rule, ok, err := g.Pipeline.Get(user.OrgId, channel)
if err != nil {
logger.Error("Error getting channel rule", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if ok {
if rule.PublishAuth != nil {
ok, err := rule.PublishAuth.CanPublish(ctx.Req.Context(), user)
if err != nil {
logger.Error("Error checking publish permissions", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if !ok {
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
}
} else {
if !user.HasRole(models.ROLE_ADMIN) {
return response.Error(http.StatusForbidden, http.StatusText(http.StatusForbidden), nil)
}
}
_, err := g.Pipeline.ProcessInput(ctx.Req.Context(), user.OrgId, channel, cmd.Data)
if err != nil {
logger.Error("Error processing input", "user", user, "channel", channel, "error", err)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
return response.JSON(http.StatusOK, dtos.LivePublishResponse{})
}
}
channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel)
if err != nil {
@ -824,6 +928,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePub
logger.Error("Error calling OnPublish", "error", err, "channel", cmd.Channel)
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil)
}
if status != backend.PublishStreamStatusOK {
code, text := publishStatusToHTTPError(status)
return response.Error(code, text, nil)
@ -952,13 +1057,13 @@ func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) respon
if !ok {
return response.Error(http.StatusNotFound, "No rule found", nil)
}
channelFrames, ok, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data))
if rule.Converter == nil {
return response.Error(http.StatusNotFound, "No converter found", nil)
}
channelFrames, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data))
if err != nil {
return response.Error(http.StatusInternalServerError, "Error converting data", err)
}
if !ok {
return response.Error(http.StatusNotFound, "No converter found", nil)
}
return response.JSON(http.StatusOK, ConvertDryRunResponse{
ChannelFrames: channelFrames,
})
@ -1031,10 +1136,11 @@ func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) respons
// HandlePipelineEntitiesListHTTP ...
func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) response.Response {
return response.JSON(http.StatusOK, util.DynMap{
"subscribers": pipeline.SubscribersRegistry,
"outputs": pipeline.OutputsRegistry,
"converters": pipeline.ConvertersRegistry,
"processors": pipeline.ProcessorsRegistry,
"subscribers": pipeline.SubscribersRegistry,
"dataOutputs": pipeline.DataOutputsRegistry,
"converters": pipeline.ConvertersRegistry,
"frameProcessors": pipeline.FrameProcessorsRegistry,
"frameOutputs": pipeline.FrameOutputsRegistry,
})
}

@ -0,0 +1,23 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/models"
)
// RoleCheckAuthorizer authorizes channel subscribe and publish operations
// by checking that the user has at least the configured Grafana role.
type RoleCheckAuthorizer struct {
	role models.RoleType // minimum role required for the check to pass
}
// NewRoleCheckAuthorizer creates a RoleCheckAuthorizer that requires the
// given role for both subscribing and publishing.
func NewRoleCheckAuthorizer(role models.RoleType) *RoleCheckAuthorizer {
	authorizer := RoleCheckAuthorizer{
		role: role,
	}
	return &authorizer
}
// CanSubscribe reports whether the user holds the required role and thus
// may subscribe to the channel. The context is currently unused.
func (s *RoleCheckAuthorizer) CanSubscribe(_ context.Context, u *models.SignedInUser) (bool, error) {
	allowed := u.HasRole(s.role)
	return allowed, nil
}
// CanPublish reports whether the user holds the required role and thus
// may publish into the channel. The context is currently unused.
func (s *RoleCheckAuthorizer) CanPublish(_ context.Context, u *models.SignedInUser) (bool, error) {
	allowed := u.HasRole(s.role)
	return allowed, nil
}

@ -0,0 +1,14 @@
package pipeline
// NumberCompareOp is a comparison operator applied to numeric field values.
type NumberCompareOp string

// Known NumberCompareOp types.
const (
	NumberCompareOpLt  NumberCompareOp = "lt"  // less than
	NumberCompareOpGt  NumberCompareOp = "gt"  // greater than
	NumberCompareOpLte NumberCompareOp = "lte" // less than or equal
	NumberCompareOpGte NumberCompareOp = "gte" // greater than or equal
	NumberCompareOpEq  NumberCompareOp = "eq"  // equal
	NumberCompareOpNe  NumberCompareOp = "ne"  // not equal
)

@ -1,13 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// ConditionChecker checks conditions in context of data.Frame being processed.
type ConditionChecker interface {
	// Type returns a unique identifier of the checker implementation.
	Type() string
	// CheckCondition reports whether the condition holds for the given frame.
	CheckCondition(ctx context.Context, frame *data.Frame) (bool, error)
}

@ -1,51 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// ConditionType represents multiple condition operator type.
type ConditionType string

// Known condition types.
const (
	// ConditionAll requires every nested condition to pass.
	ConditionAll ConditionType = "all"
	// ConditionAny requires at least one nested condition to pass.
	ConditionAny ConditionType = "any"
)
// MultipleConditionChecker can check multiple conditions according to ConditionType.
type MultipleConditionChecker struct {
	ConditionType ConditionType      // how individual results combine ("all" or "any")
	Conditions    []ConditionChecker // individual conditions to evaluate in order
}
// ConditionCheckerTypeMultiple is the registered type name of MultipleConditionChecker.
const ConditionCheckerTypeMultiple = "multiple"

// Type returns the identifier of this checker implementation.
func (c *MultipleConditionChecker) Type() string {
	return ConditionCheckerTypeMultiple
}
// CheckCondition evaluates every nested condition against the frame and
// combines the results according to ConditionType: ConditionAny succeeds as
// soon as one condition passes, ConditionAll fails as soon as one condition
// fails. The first error encountered is returned immediately.
func (c *MultipleConditionChecker) CheckCondition(ctx context.Context, frame *data.Frame) (bool, error) {
	isAny := c.ConditionType == ConditionAny
	isAll := c.ConditionType == ConditionAll
	for _, checker := range c.Conditions {
		passed, err := checker.CheckCondition(ctx, frame)
		if err != nil {
			return false, err
		}
		if passed && isAny {
			// Any-mode: a single success decides the outcome.
			return true, nil
		}
		if !passed && isAll {
			// All-mode: a single failure decides the outcome.
			return false, nil
		}
	}
	// Any-mode exhausted without a success -> false; every other mode
	// (including All, and an unrecognized type) ends up true, matching
	// the original fall-through behavior.
	return !isAny, nil
}
// NewMultipleConditionChecker creates new MultipleConditionChecker combining
// the given conditions with the given operator type.
func NewMultipleConditionChecker(conditionType ConditionType, conditions ...ConditionChecker) *MultipleConditionChecker {
	checker := &MultipleConditionChecker{
		ConditionType: conditionType,
		Conditions:    conditions,
	}
	return checker
}

@ -0,0 +1,10 @@
package pipeline
// ConditionType represents multiple condition operator type.
type ConditionType string

// Known condition types.
const (
	// ConditionAll requires every nested condition to pass.
	ConditionAll ConditionType = "all"
	// ConditionAny requires at least one nested condition to pass.
	ConditionAny ConditionType = "any"
)

@ -4,10 +4,10 @@ import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana/pkg/services/live/pipeline/pattern"
"github.com/grafana/grafana/pkg/services/live/pipeline/tree"
"github.com/centrifugal/centrifuge"
)
@ -22,33 +22,33 @@ type ConverterConfig struct {
JsonFrameConverterConfig *JsonFrameConverterConfig `json:"jsonFrame,omitempty"`
}
type ProcessorConfig struct {
Type string `json:"type"`
DropFieldsProcessorConfig *DropFieldsProcessorConfig `json:"dropFields,omitempty"`
KeepFieldsProcessorConfig *KeepFieldsProcessorConfig `json:"keepFields,omitempty"`
MultipleProcessorConfig *MultipleProcessorConfig `json:"multiple,omitempty"`
type FrameProcessorConfig struct {
Type string `json:"type"`
DropFieldsProcessorConfig *DropFieldsFrameProcessorConfig `json:"dropFields,omitempty"`
KeepFieldsProcessorConfig *KeepFieldsFrameProcessorConfig `json:"keepFields,omitempty"`
MultipleProcessorConfig *MultipleFrameProcessorConfig `json:"multiple,omitempty"`
}
type MultipleProcessorConfig struct {
Processors []ProcessorConfig `json:"processors"`
type MultipleFrameProcessorConfig struct {
Processors []FrameProcessorConfig `json:"processors"`
}
type MultipleOutputterConfig struct {
Outputters []OutputterConfig `json:"outputs"`
Outputters []FrameOutputterConfig `json:"outputs"`
}
type ManagedStreamOutputConfig struct{}
type ConditionalOutputConfig struct {
Condition *ConditionCheckerConfig `json:"condition"`
Outputter *OutputterConfig `json:"output"`
Condition *FrameConditionCheckerConfig `json:"condition"`
Outputter *FrameOutputterConfig `json:"output"`
}
type RemoteWriteOutputConfig struct {
UID string `json:"uid"`
}
type OutputterConfig struct {
type FrameOutputterConfig struct {
Type string `json:"type"`
ManagedStreamConfig *ManagedStreamOutputConfig `json:"managedStream,omitempty"`
MultipleOutputterConfig *MultipleOutputterConfig `json:"multiple,omitempty"`
@ -59,21 +59,40 @@ type OutputterConfig struct {
ChangeLogOutputConfig *ChangeLogOutputConfig `json:"changeLog,omitempty"`
}
type DataOutputterConfig struct {
Type string `json:"type"`
RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"`
}
type MultipleSubscriberConfig struct {
Subscribers []SubscriberConfig `json:"subscribers"`
}
type SubscriberConfig struct {
Type string `json:"type"`
MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
AuthorizeRoleSubscriberConfig *AuthorizeRoleSubscriberConfig `json:"authorizeRole,omitempty"`
Type string `json:"type"`
MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
}
// ChannelAuthCheckConfig is used to define auth rules for a channel.
type ChannelAuthCheckConfig struct {
RequireRole models.RoleType `json:"role,omitempty"`
}
type ChannelAuthConfig struct {
// By default anyone can subscribe.
Subscribe *ChannelAuthCheckConfig `json:"subscribe,omitempty"`
// By default HTTP and WS require admin permissions to publish.
Publish *ChannelAuthCheckConfig `json:"publish,omitempty"`
}
type ChannelRuleSettings struct {
Subscriber *SubscriberConfig `json:"subscriber,omitempty"`
Converter *ConverterConfig `json:"converter,omitempty"`
Processor *ProcessorConfig `json:"processor,omitempty"`
Outputter *OutputterConfig `json:"output,omitempty"`
Auth *ChannelAuthConfig `json:"auth,omitempty"`
Subscribers []*SubscriberConfig `json:"subscribers,omitempty"`
DataOutputters []*DataOutputterConfig `json:"dataOutputs,omitempty"`
Converter *ConverterConfig `json:"converter,omitempty"`
FrameProcessors []*FrameProcessorConfig `json:"frameProcessors,omitempty"`
FrameOutputters []*FrameOutputterConfig `json:"frameOutputs,omitempty"`
}
type ChannelRule struct {
@ -92,19 +111,25 @@ func (r ChannelRule) Valid() (bool, string) {
return false, fmt.Sprintf("unknown converter type: %s", r.Settings.Converter.Type)
}
}
if r.Settings.Subscriber != nil {
if !typeRegistered(r.Settings.Subscriber.Type, SubscribersRegistry) {
return false, fmt.Sprintf("unknown subscriber type: %s", r.Settings.Subscriber.Type)
if len(r.Settings.Subscribers) > 0 {
for _, sub := range r.Settings.Subscribers {
if !typeRegistered(sub.Type, SubscribersRegistry) {
return false, fmt.Sprintf("unknown subscriber type: %s", sub.Type)
}
}
}
if r.Settings.Processor != nil {
if !typeRegistered(r.Settings.Processor.Type, ProcessorsRegistry) {
return false, fmt.Sprintf("unknown processor type: %s", r.Settings.Processor.Type)
if len(r.Settings.FrameProcessors) > 0 {
for _, proc := range r.Settings.FrameProcessors {
if !typeRegistered(proc.Type, FrameProcessorsRegistry) {
return false, fmt.Sprintf("unknown processor type: %s", proc.Type)
}
}
}
if r.Settings.Outputter != nil {
if !typeRegistered(r.Settings.Outputter.Type, OutputsRegistry) {
return false, fmt.Sprintf("unknown output type: %s", r.Settings.Outputter.Type)
if len(r.Settings.FrameOutputters) > 0 {
for _, out := range r.Settings.FrameOutputters {
if !typeRegistered(out.Type, FrameOutputsRegistry) {
return false, fmt.Sprintf("unknown output type: %s", out.Type)
}
}
}
return true, ""
@ -150,21 +175,21 @@ func checkRulesValid(orgID int64, rules []ChannelRule) (ok bool, reason string)
return ok, reason
}
type MultipleConditionCheckerConfig struct {
Type ConditionType `json:"type"`
Conditions []ConditionCheckerConfig `json:"conditions"`
type MultipleFrameConditionCheckerConfig struct {
Type ConditionType `json:"type"`
Conditions []FrameConditionCheckerConfig `json:"conditions"`
}
type NumberCompareConditionConfig struct {
type NumberCompareFrameConditionConfig struct {
FieldName string `json:"fieldName"`
Op NumberCompareOp `json:"op"`
Value float64 `json:"value"`
}
type ConditionCheckerConfig struct {
Type string `json:"type"`
MultipleConditionCheckerConfig *MultipleConditionCheckerConfig `json:"multiple,omitempty"`
NumberCompareConditionConfig *NumberCompareConditionConfig `json:"numberCompare,omitempty"`
type FrameConditionCheckerConfig struct {
Type string `json:"type"`
MultipleConditionCheckerConfig *MultipleFrameConditionCheckerConfig `json:"multiple,omitempty"`
NumberCompareConditionConfig *NumberCompareFrameConditionConfig `json:"numberCompare,omitempty"`
}
type RuleStorage interface {
@ -193,11 +218,6 @@ func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscr
return NewBuiltinSubscriber(f.ChannelHandlerGetter), nil
case SubscriberTypeManagedStream:
return NewManagedStreamSubscriber(f.ManagedStream), nil
case SubscriberTypeAuthorizeRole:
if config.AuthorizeRoleSubscriberConfig == nil {
return nil, missingConfiguration
}
return NewAuthorizeRoleSubscriber(*config.AuthorizeRoleSubscriberConfig), nil
case SubscriberTypeMultiple:
if config.MultipleSubscriberConfig == nil {
return nil, missingConfiguration
@ -225,7 +245,7 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte
switch config.Type {
case ConverterTypeJsonAuto:
if config.AutoJsonConverterConfig == nil {
return nil, missingConfiguration
config.AutoJsonConverterConfig = &AutoJsonConverterConfig{}
}
return NewAutoJsonConverter(*config.AutoJsonConverterConfig), nil
case ConverterTypeJsonExact:
@ -235,7 +255,7 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte
return NewExactJsonConverter(*config.ExactJsonConverterConfig), nil
case ConverterTypeJsonFrame:
if config.JsonFrameConverterConfig == nil {
return nil, missingConfiguration
config.JsonFrameConverterConfig = &JsonFrameConverterConfig{}
}
return NewJsonFrameConverter(*config.JsonFrameConverterConfig), nil
case ConverterTypeInfluxAuto:
@ -248,120 +268,120 @@ func (f *StorageRuleBuilder) extractConverter(config *ConverterConfig) (Converte
}
}
func (f *StorageRuleBuilder) extractProcessor(config *ProcessorConfig) (Processor, error) {
func (f *StorageRuleBuilder) extractFrameProcessor(config *FrameProcessorConfig) (FrameProcessor, error) {
if config == nil {
return nil, nil
}
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
switch config.Type {
case ProcessorTypeDropFields:
case FrameProcessorTypeDropFields:
if config.DropFieldsProcessorConfig == nil {
return nil, missingConfiguration
}
return NewDropFieldsProcessor(*config.DropFieldsProcessorConfig), nil
case ProcessorTypeKeepFields:
return NewDropFieldsFrameProcessor(*config.DropFieldsProcessorConfig), nil
case FrameProcessorTypeKeepFields:
if config.KeepFieldsProcessorConfig == nil {
return nil, missingConfiguration
}
return NewKeepFieldsProcessor(*config.KeepFieldsProcessorConfig), nil
case ProcessorTypeMultiple:
return NewKeepFieldsFrameProcessor(*config.KeepFieldsProcessorConfig), nil
case FrameProcessorTypeMultiple:
if config.MultipleProcessorConfig == nil {
return nil, missingConfiguration
}
var processors []Processor
var processors []FrameProcessor
for _, outConf := range config.MultipleProcessorConfig.Processors {
out := outConf
proc, err := f.extractProcessor(&out)
proc, err := f.extractFrameProcessor(&out)
if err != nil {
return nil, err
}
processors = append(processors, proc)
}
return NewMultipleProcessor(processors...), nil
return NewMultipleFrameProcessor(processors...), nil
default:
return nil, fmt.Errorf("unknown processor type: %s", config.Type)
}
}
func (f *StorageRuleBuilder) extractConditionChecker(config *ConditionCheckerConfig) (ConditionChecker, error) {
func (f *StorageRuleBuilder) extractFrameConditionChecker(config *FrameConditionCheckerConfig) (FrameConditionChecker, error) {
if config == nil {
return nil, nil
}
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
switch config.Type {
case ConditionCheckerTypeNumberCompare:
case FrameConditionCheckerTypeNumberCompare:
if config.NumberCompareConditionConfig == nil {
return nil, missingConfiguration
}
c := *config.NumberCompareConditionConfig
return NewNumberCompareCondition(c.FieldName, c.Op, c.Value), nil
case ConditionCheckerTypeMultiple:
var conditions []ConditionChecker
return NewFrameNumberCompareCondition(c.FieldName, c.Op, c.Value), nil
case FrameConditionCheckerTypeMultiple:
var conditions []FrameConditionChecker
if config.MultipleConditionCheckerConfig == nil {
return nil, missingConfiguration
}
for _, outConf := range config.MultipleConditionCheckerConfig.Conditions {
out := outConf
cond, err := f.extractConditionChecker(&out)
cond, err := f.extractFrameConditionChecker(&out)
if err != nil {
return nil, err
}
conditions = append(conditions, cond)
}
return NewMultipleConditionChecker(config.MultipleConditionCheckerConfig.Type, conditions...), nil
return NewMultipleFrameConditionChecker(config.MultipleConditionCheckerConfig.Type, conditions...), nil
default:
return nil, fmt.Errorf("unknown condition type: %s", config.Type)
}
}
func (f *StorageRuleBuilder) extractOutputter(config *OutputterConfig, remoteWriteBackends []RemoteWriteBackend) (Outputter, error) {
func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, remoteWriteBackends []RemoteWriteBackend) (FrameOutputter, error) {
if config == nil {
return nil, nil
}
missingConfiguration := fmt.Errorf("missing configuration for %s", config.Type)
switch config.Type {
case OutputTypeRedirect:
case FrameOutputTypeRedirect:
if config.RedirectOutputConfig == nil {
return nil, missingConfiguration
}
return NewRedirectOutput(*config.RedirectOutputConfig), nil
case OutputTypeMultiple:
return NewRedirectFrameOutput(*config.RedirectOutputConfig), nil
case FrameOutputTypeMultiple:
if config.MultipleOutputterConfig == nil {
return nil, missingConfiguration
}
var outputters []Outputter
var outputters []FrameOutputter
for _, outConf := range config.MultipleOutputterConfig.Outputters {
out := outConf
outputter, err := f.extractOutputter(&out, remoteWriteBackends)
outputter, err := f.extractFrameOutputter(&out, remoteWriteBackends)
if err != nil {
return nil, err
}
outputters = append(outputters, outputter)
}
return NewMultipleOutput(outputters...), nil
case OutputTypeManagedStream:
return NewManagedStreamOutput(f.ManagedStream), nil
case OutputTypeLocalSubscribers:
return NewLocalSubscribersOutput(f.Node), nil
case OutputTypeConditional:
return NewMultipleFrameOutput(outputters...), nil
case FrameOutputTypeManagedStream:
return NewManagedStreamFrameOutput(f.ManagedStream), nil
case FrameOutputTypeLocalSubscribers:
return NewLocalSubscribersFrameOutput(f.Node), nil
case FrameOutputTypeConditional:
if config.ConditionalOutputConfig == nil {
return nil, missingConfiguration
}
condition, err := f.extractConditionChecker(config.ConditionalOutputConfig.Condition)
condition, err := f.extractFrameConditionChecker(config.ConditionalOutputConfig.Condition)
if err != nil {
return nil, err
}
outputter, err := f.extractOutputter(config.ConditionalOutputConfig.Outputter, remoteWriteBackends)
outputter, err := f.extractFrameOutputter(config.ConditionalOutputConfig.Outputter, remoteWriteBackends)
if err != nil {
return nil, err
}
return NewConditionalOutput(condition, outputter), nil
case OutputTypeThreshold:
case FrameOutputTypeThreshold:
if config.ThresholdOutputConfig == nil {
return nil, missingConfiguration
}
return NewThresholdOutput(f.FrameStorage, *config.ThresholdOutputConfig), nil
case OutputTypeRemoteWrite:
case FrameOutputTypeRemoteWrite:
if config.RemoteWriteOutputConfig == nil {
return nil, missingConfiguration
}
@ -369,17 +389,37 @@ func (f *StorageRuleBuilder) extractOutputter(config *OutputterConfig, remoteWri
if !ok {
return nil, fmt.Errorf("unknown remote write backend uid: %s", config.RemoteWriteOutputConfig.UID)
}
return NewRemoteWriteOutput(*remoteWriteConfig), nil
case OutputTypeChangeLog:
return NewRemoteWriteFrameOutput(*remoteWriteConfig), nil
case FrameOutputTypeChangeLog:
if config.ChangeLogOutputConfig == nil {
return nil, missingConfiguration
}
return NewChangeLogOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil
return NewChangeLogFrameOutput(f.FrameStorage, *config.ChangeLogOutputConfig), nil
default:
return nil, fmt.Errorf("unknown output type: %s", config.Type)
}
}
// extractDataOutputter constructs a DataOutputter from its storage config.
// A nil config yields a nil outputter without error; an incomplete or
// unknown config yields a descriptive error.
func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig) (DataOutputter, error) {
	if config == nil {
		return nil, nil
	}
	switch config.Type {
	case DataOutputTypeRedirect:
		redirectConfig := config.RedirectDataOutputConfig
		if redirectConfig == nil {
			return nil, fmt.Errorf("missing configuration for %s", config.Type)
		}
		return NewRedirectDataOutput(*redirectConfig), nil
	case DataOutputTypeBuiltin:
		return NewBuiltinDataOutput(f.ChannelHandlerGetter), nil
	case DataOutputTypeLocalSubscribers:
		return NewLocalSubscribersDataOutput(f.Node), nil
	default:
		return nil, fmt.Errorf("unknown data output type: %s", config.Type)
	}
}
func (f *StorageRuleBuilder) getRemoteWriteConfig(uid string, remoteWriteBackends []RemoteWriteBackend) (*RemoteWriteConfig, bool) {
for _, rwb := range remoteWriteBackends {
if rwb.UID == uid {
@ -407,23 +447,62 @@ func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*Li
OrgId: orgID,
Pattern: ruleConfig.Pattern,
}
var err error
rule.Subscriber, err = f.extractSubscriber(ruleConfig.Settings.Subscriber)
if err != nil {
return nil, err
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Subscribe != nil {
rule.SubscribeAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Subscribe.RequireRole)
}
if ruleConfig.Settings.Auth != nil && ruleConfig.Settings.Auth.Publish != nil {
rule.PublishAuth = NewRoleCheckAuthorizer(ruleConfig.Settings.Auth.Publish.RequireRole)
}
var err error
rule.Converter, err = f.extractConverter(ruleConfig.Settings.Converter)
if err != nil {
return nil, err
return nil, fmt.Errorf("error building converter for %s: %w", rule.Pattern, err)
}
rule.Processor, err = f.extractProcessor(ruleConfig.Settings.Processor)
if err != nil {
return nil, err
var processors []FrameProcessor
for _, procConfig := range ruleConfig.Settings.FrameProcessors {
proc, err := f.extractFrameProcessor(procConfig)
if err != nil {
return nil, fmt.Errorf("error building processor for %s: %w", rule.Pattern, err)
}
processors = append(processors, proc)
}
rule.Outputter, err = f.extractOutputter(ruleConfig.Settings.Outputter, remoteWriteBackends)
if err != nil {
return nil, err
rule.FrameProcessors = processors
var dataOutputters []DataOutputter
for _, outConfig := range ruleConfig.Settings.DataOutputters {
out, err := f.extractDataOutputter(outConfig)
if err != nil {
return nil, fmt.Errorf("error building data outputter for %s: %w", rule.Pattern, err)
}
dataOutputters = append(dataOutputters, out)
}
rule.DataOutputters = dataOutputters
var outputters []FrameOutputter
for _, outConfig := range ruleConfig.Settings.FrameOutputters {
out, err := f.extractFrameOutputter(outConfig, remoteWriteBackends)
if err != nil {
return nil, fmt.Errorf("error building frame outputter for %s: %w", rule.Pattern, err)
}
outputters = append(outputters, out)
}
rule.FrameOutputters = outputters
var subscribers []Subscriber
for _, subConfig := range ruleConfig.Settings.Subscribers {
sub, err := f.extractSubscriber(subConfig)
if err != nil {
return nil, fmt.Errorf("error building subscriber for %s: %w", rule.Pattern, err)
}
subscribers = append(subscribers, sub)
}
rule.Subscribers = subscribers
rules = append(rules, rule)
}

@ -6,6 +6,7 @@ import (
"github.com/grafana/grafana/pkg/services/live/convert"
)
// AutoInfluxConverterConfig ...
type AutoInfluxConverterConfig struct {
FrameFormat string `json:"frameFormat"`
}
@ -18,6 +19,7 @@ type AutoInfluxConverter struct {
converter *convert.Converter
}
// NewAutoInfluxConverter creates new AutoInfluxConverter.
func NewAutoInfluxConverter(config AutoInfluxConverterConfig) *AutoInfluxConverter {
return &AutoInfluxConverter{config: config, converter: convert.NewConverter()}
}

@ -28,8 +28,8 @@ func (c *AutoJsonConverter) Type() string {
// * Time added automatically
// * Nulls dropped
// To preserve nulls we need FieldTips from a user.
// Custom time can be injected on Processor stage theoretically.
// Custom labels can be injected on Processor stage theoretically.
// Custom time can be injected on FrameProcessor stage theoretically.
// Custom labels can be injected on FrameProcessor stage theoretically.
func (c *AutoJsonConverter) Convert(_ context.Context, vars Vars, body []byte) ([]*ChannelFrame, error) {
nowTimeFunc := c.nowTimeFunc
if nowTimeFunc == nil {

@ -6,9 +6,8 @@ import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/stretchr/testify/require"
)

@ -0,0 +1,47 @@
package pipeline
import (
"context"
"errors"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
// BuiltinDataOutput forwards published data to the builtin channel handler
// resolved for the current channel (see OutputData).
type BuiltinDataOutput struct {
	channelHandlerGetter ChannelHandlerGetter
}
// DataOutputTypeBuiltin is the type identifier of BuiltinDataOutput.
const DataOutputTypeBuiltin = "builtin"
// NewBuiltinDataOutput creates a new BuiltinDataOutput.
func NewBuiltinDataOutput(channelHandlerGetter ChannelHandlerGetter) *BuiltinDataOutput {
	return &BuiltinDataOutput{channelHandlerGetter: channelHandlerGetter}
}
// Type returns the type identifier of this data outputter.
func (s *BuiltinDataOutput) Type() string {
	return DataOutputTypeBuiltin
}
// OutputData publishes incoming data through the builtin channel handler of
// the current channel. The signed-in user is taken from the context and must
// be present; the handler's OnPublish hook decides whether the publication
// is allowed. No derived channel data is produced.
func (s *BuiltinDataOutput) OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
	signedUser, found := livecontext.GetContextSignedUser(ctx)
	if !found {
		return nil, errors.New("user not found in context")
	}
	handler, _, err := s.channelHandlerGetter.GetChannelHandler(signedUser, vars.Channel)
	if err != nil {
		return nil, err
	}
	evt := models.PublishEvent{Channel: vars.Channel, Data: data}
	_, status, err := handler.OnPublish(ctx, signedUser, evt)
	if err != nil {
		return nil, err
	}
	if status != backend.PublishStreamStatusOK {
		return nil, errors.New("unauthorized publish")
	}
	return nil, nil
}

@ -0,0 +1,38 @@
package pipeline
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/services/live/orgchannel"
"github.com/centrifugal/centrifuge"
)
// LocalSubscribersDataOutput broadcasts data to channel subscribers connected
// to this Grafana node (see OutputData).
type LocalSubscribersDataOutput struct {
	// TODO: refactor to depend on interface (avoid Centrifuge dependency here).
	node *centrifuge.Node
}
// NewLocalSubscribersDataOutput creates a new LocalSubscribersDataOutput.
func NewLocalSubscribersDataOutput(node *centrifuge.Node) *LocalSubscribersDataOutput {
	return &LocalSubscribersDataOutput{node: node}
}
// DataOutputTypeLocalSubscribers is the type identifier of LocalSubscribersDataOutput.
const DataOutputTypeLocalSubscribers = "localSubscribers"
// Type returns the type identifier of this data outputter.
func (out *LocalSubscribersDataOutput) Type() string {
	return DataOutputTypeLocalSubscribers
}
// OutputData broadcasts raw data to local subscribers of the current channel.
// The channel is prefixed with the org ID before broadcasting. No derived
// channel data is produced.
func (out *LocalSubscribersDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
	orgChannel := orgchannel.PrependOrgID(vars.OrgID, vars.Channel)
	publication := &centrifuge.Publication{Data: data}
	if err := out.node.Hub().BroadcastPublication(orgChannel, publication, centrifuge.StreamPosition{}); err != nil {
		return nil, fmt.Errorf("error publishing %s: %w", string(data), err)
	}
	return nil, nil
}

@ -0,0 +1,37 @@
package pipeline
import (
"context"
"fmt"
)
// RedirectDataOutputConfig configures the target channel of RedirectDataOutput.
type RedirectDataOutputConfig struct {
	Channel string `json:"channel"`
}
// RedirectDataOutput passes processing control to the rule defined
// for a configured channel.
type RedirectDataOutput struct {
	config RedirectDataOutputConfig
}
// NewRedirectDataOutput creates a new RedirectDataOutput.
func NewRedirectDataOutput(config RedirectDataOutputConfig) *RedirectDataOutput {
	return &RedirectDataOutput{config: config}
}
// DataOutputTypeRedirect is the type identifier of RedirectDataOutput.
const DataOutputTypeRedirect = "redirect"
// Type returns the type identifier of this data outputter.
func (out *RedirectDataOutput) Type() string {
	return DataOutputTypeRedirect
}
// OutputData re-emits incoming data on the configured target channel so that
// the rule of that channel continues processing it. Redirecting a channel to
// itself is rejected to avoid an infinite processing loop.
func (out *RedirectDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error) {
	target := out.config.Channel
	if target == vars.Channel {
		return nil, fmt.Errorf("redirect to the same channel: %s", target)
	}
	redirected := &ChannelData{Channel: target, Data: data}
	return []*ChannelData{redirected}, nil
}

@ -99,27 +99,31 @@ type DevRuleBuilder struct {
func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error) {
return []*LiveChannelRule{
{
Pattern: "plugin/testdata/random-20Hz-stream",
Subscriber: NewMultipleSubscriber(
NewBuiltinSubscriber(f.ChannelHandlerGetter),
NewManagedStreamSubscriber(f.ManagedStream),
),
Pattern: "plugin/testdata/random-20Hz-stream",
Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}),
Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream),
NewRemoteWriteOutput(RemoteWriteConfig{
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
NewRemoteWriteFrameOutput(RemoteWriteConfig{
Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
}),
),
},
Subscribers: []Subscriber{
NewBuiltinSubscriber(f.ChannelHandlerGetter),
NewManagedStreamSubscriber(f.ManagedStream),
},
},
{
Pattern: "stream/testdata/random-20Hz-stream",
Processor: NewKeepFieldsProcessor(KeepFieldsProcessorConfig{
FieldNames: []string{"Time", "Min", "Max"},
}),
Outputter: NewManagedStreamOutput(f.ManagedStream),
FrameProcessors: []FrameProcessor{
NewKeepFieldsFrameProcessor(KeepFieldsFrameProcessorConfig{
FieldNames: []string{"Time", "Min", "Max"},
}),
},
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
{
OrgId: 1,
@ -129,9 +133,11 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
}),
},
{
OrgId: 1,
Pattern: "stream/influx/input/:rest",
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/influx/input/:rest",
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
{
OrgId: 1,
@ -140,29 +146,31 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
// since there are cases when labels attached to a field, and cases where labels
// set in a first frame column (in Influx converter). For example, this will allow
// to leave only "total-cpu" data while dropping individual CPUs.
Processor: NewKeepFieldsProcessor(KeepFieldsProcessorConfig{
FieldNames: []string{"labels", "time", "usage_user"},
}),
Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream),
FrameProcessors: []FrameProcessor{
NewKeepFieldsFrameProcessor(KeepFieldsFrameProcessorConfig{
FieldNames: []string{"labels", "time", "usage_user"},
}),
},
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
NewConditionalOutput(
NewNumberCompareCondition("usage_user", "gte", 50),
NewRedirectOutput(RedirectOutputConfig{
NewFrameNumberCompareCondition("usage_user", "gte", 50),
NewRedirectFrameOutput(RedirectOutputConfig{
Channel: "stream/influx/input/cpu/spikes",
}),
),
),
},
},
{
OrgId: 1,
Pattern: "stream/influx/input/cpu/spikes",
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/influx/input/cpu/spikes",
FrameOutputters: []FrameOutputter{NewManagedStreamFrameOutput(f.ManagedStream)},
},
{
OrgId: 1,
Pattern: "stream/json/auto",
Converter: NewAutoJsonConverter(AutoJsonConverterConfig{}),
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/json/auto",
Converter: NewAutoJsonConverter(AutoJsonConverterConfig{}),
FrameOutputters: []FrameOutputter{NewManagedStreamFrameOutput(f.ManagedStream)},
},
{
OrgId: 1,
@ -179,10 +187,14 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
},
},
}),
Processor: NewDropFieldsProcessor(DropFieldsProcessorConfig{
FieldNames: []string{"value2"},
}),
Outputter: NewManagedStreamOutput(f.ManagedStream),
FrameProcessors: []FrameProcessor{
NewDropFieldsFrameProcessor(DropFieldsFrameProcessorConfig{
FieldNames: []string{"value2"},
}),
},
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
{
OrgId: 1,
@ -274,28 +286,28 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
},
},
}),
Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream),
NewRemoteWriteOutput(RemoteWriteConfig{
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
NewRemoteWriteFrameOutput(RemoteWriteConfig{
Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
}),
NewChangeLogOutput(f.FrameStorage, ChangeLogOutputConfig{
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
FieldName: "value3",
Channel: "stream/json/exact/value3/changes",
}),
NewChangeLogOutput(f.FrameStorage, ChangeLogOutputConfig{
NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{
FieldName: "annotation",
Channel: "stream/json/exact/annotation/changes",
}),
NewConditionalOutput(
NewMultipleConditionChecker(
NewMultipleFrameConditionChecker(
ConditionAll,
NewNumberCompareCondition("value1", "gte", 3.0),
NewNumberCompareCondition("value2", "gte", 3.0),
NewFrameNumberCompareCondition("value1", "gte", 3.0),
NewFrameNumberCompareCondition("value2", "gte", 3.0),
),
NewRedirectOutput(RedirectOutputConfig{
NewRedirectFrameOutput(RedirectOutputConfig{
Channel: "stream/json/exact/condition",
}),
),
@ -303,34 +315,40 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR
FieldName: "value4",
Channel: "stream/json/exact/value4/state",
}),
),
},
},
{
OrgId: 1,
Pattern: "stream/json/exact/value3/changes",
Outputter: NewMultipleOutput(
NewManagedStreamOutput(f.ManagedStream),
NewRemoteWriteOutput(RemoteWriteConfig{
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
NewRemoteWriteFrameOutput(RemoteWriteConfig{
Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"),
User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"),
Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"),
}),
),
},
},
{
OrgId: 1,
Pattern: "stream/json/exact/annotation/changes",
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/json/exact/annotation/changes",
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
{
OrgId: 1,
Pattern: "stream/json/exact/condition",
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/json/exact/condition",
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
{
OrgId: 1,
Pattern: "stream/json/exact/value4/state",
Outputter: NewManagedStreamOutput(f.ManagedStream),
OrgId: 1,
Pattern: "stream/json/exact/value4/state",
FrameOutputters: []FrameOutputter{
NewManagedStreamFrameOutput(f.ManagedStream),
},
},
}, nil
}

@ -0,0 +1,13 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// FrameConditionChecker checks conditions in context of data.Frame being processed.
type FrameConditionChecker interface {
	// Type returns the type identifier of the checker.
	Type() string
	// CheckFrameCondition reports whether the condition holds for the frame.
	CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error)
}

@ -0,0 +1,43 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MultipleFrameConditionChecker can check multiple conditions according to ConditionType.
type MultipleFrameConditionChecker struct {
	ConditionType ConditionType
	Conditions    []FrameConditionChecker
}
// FrameConditionCheckerTypeMultiple is the type identifier of MultipleFrameConditionChecker.
const FrameConditionCheckerTypeMultiple = "multiple"
// Type returns the type identifier of this condition checker.
func (c *MultipleFrameConditionChecker) Type() string {
	return FrameConditionCheckerTypeMultiple
}
// CheckFrameCondition evaluates the wrapped conditions in order with
// short-circuiting: for ConditionAny the first passing condition returns
// true, for ConditionAll the first failing condition returns false. If no
// condition short-circuits, ConditionAny yields false and ConditionAll true.
func (c *MultipleFrameConditionChecker) CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error) {
	for _, checker := range c.Conditions {
		passed, err := checker.CheckFrameCondition(ctx, frame)
		if err != nil {
			return false, err
		}
		switch {
		case passed && c.ConditionType == ConditionAny:
			return true, nil
		case !passed && c.ConditionType == ConditionAll:
			return false, nil
		}
	}
	if c.ConditionType == ConditionAny {
		return false, nil
	}
	return true, nil
}
// NewMultipleFrameConditionChecker creates new MultipleFrameConditionChecker.
// conditionType controls whether every condition must pass (ConditionAll) or
// a single passing condition suffices (ConditionAny).
func NewMultipleFrameConditionChecker(conditionType ConditionType, conditions ...FrameConditionChecker) *MultipleFrameConditionChecker {
	return &MultipleFrameConditionChecker{ConditionType: conditionType, Conditions: conditions}
}

@ -7,33 +7,20 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// NumberCompareCondition can compare numbers.
type NumberCompareCondition struct {
// FrameNumberCompareCondition can compare numbers.
type FrameNumberCompareCondition struct {
FieldName string
Op NumberCompareOp
Value float64
}
// NumberCompareOp is an comparison operator.
type NumberCompareOp string
const FrameConditionCheckerTypeNumberCompare = "numberCompare"
// Known NumberCompareOp types.
const (
NumberCompareOpLt NumberCompareOp = "lt"
NumberCompareOpGt NumberCompareOp = "gt"
NumberCompareOpLte NumberCompareOp = "lte"
NumberCompareOpGte NumberCompareOp = "gte"
NumberCompareOpEq NumberCompareOp = "eq"
NumberCompareOpNe NumberCompareOp = "ne"
)
const ConditionCheckerTypeNumberCompare = "numberCompare"
func (c *NumberCompareCondition) Type() string {
return ConditionCheckerTypeNumberCompare
func (c *FrameNumberCompareCondition) Type() string {
return FrameConditionCheckerTypeNumberCompare
}
func (c *NumberCompareCondition) CheckCondition(_ context.Context, frame *data.Frame) (bool, error) {
func (c *FrameNumberCompareCondition) CheckFrameCondition(_ context.Context, frame *data.Frame) (bool, error) {
for _, field := range frame.Fields {
// TODO: support other numeric types.
if field.Name == c.FieldName && (field.Type() == data.FieldTypeNullableFloat64) {
@ -65,6 +52,6 @@ func (c *NumberCompareCondition) CheckCondition(_ context.Context, frame *data.F
return false, nil
}
func NewNumberCompareCondition(fieldName string, op NumberCompareOp, value float64) *NumberCompareCondition {
return &NumberCompareCondition{FieldName: fieldName, Op: op, Value: value}
func NewFrameNumberCompareCondition(fieldName string, op NumberCompareOp, value float64) *FrameNumberCompareCondition {
return &FrameNumberCompareCondition{FieldName: fieldName, Op: op, Value: value}
}

@ -13,24 +13,24 @@ type ChangeLogOutputConfig struct {
Channel string `json:"channel"`
}
// ChangeLogOutput can monitor value changes of the specified field and output
// ChangeLogFrameOutput can monitor value changes of the specified field and output
// special change frame to the configured channel.
type ChangeLogOutput struct {
type ChangeLogFrameOutput struct {
frameStorage FrameGetSetter
config ChangeLogOutputConfig
}
func NewChangeLogOutput(frameStorage FrameGetSetter, config ChangeLogOutputConfig) *ChangeLogOutput {
return &ChangeLogOutput{frameStorage: frameStorage, config: config}
func NewChangeLogFrameOutput(frameStorage FrameGetSetter, config ChangeLogOutputConfig) *ChangeLogFrameOutput {
return &ChangeLogFrameOutput{frameStorage: frameStorage, config: config}
}
const OutputTypeChangeLog = "changeLog"
const FrameOutputTypeChangeLog = "changeLog"
func (out *ChangeLogOutput) Type() string {
return OutputTypeChangeLog
func (out *ChangeLogFrameOutput) Type() string {
return FrameOutputTypeChangeLog
}
func (out *ChangeLogOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (out *ChangeLogFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
previousFrame, previousFrameOK, err := out.frameStorage.Get(vars.OrgID, out.config.Channel)
if err != nil {
return nil, err

@ -22,7 +22,7 @@ func TestChangeLogOutput_NoPreviousFrame_SingleRow(t *testing.T) {
mockStorage.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
outputter := NewChangeLogOutput(mockStorage, ChangeLogOutputConfig{
outputter := NewChangeLogFrameOutput(mockStorage, ChangeLogOutputConfig{
FieldName: "test",
Channel: "stream/test/no_previous_frame",
})
@ -35,7 +35,7 @@ func TestChangeLogOutput_NoPreviousFrame_SingleRow(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 1)
@ -59,7 +59,7 @@ func TestChangeLogOutput_NoPreviousFrame_MultipleRows(t *testing.T) {
mockStorage.EXPECT().Set(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
outputter := NewChangeLogOutput(mockStorage, ChangeLogOutputConfig{
outputter := NewChangeLogFrameOutput(mockStorage, ChangeLogOutputConfig{
FieldName: "test",
Channel: "stream/test/no_previous_frame",
})
@ -74,7 +74,7 @@ func TestChangeLogOutput_NoPreviousFrame_MultipleRows(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 1)
changeFrame := channelFrames[0].Frame

@ -0,0 +1,33 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// ConditionalOutput delegates frame output to the wrapped Outputter only
// when the wrapped Condition holds for the frame.
type ConditionalOutput struct {
	Condition FrameConditionChecker
	Outputter FrameOutputter
}
// NewConditionalOutput creates a new ConditionalOutput.
func NewConditionalOutput(condition FrameConditionChecker, outputter FrameOutputter) *ConditionalOutput {
	return &ConditionalOutput{Condition: condition, Outputter: outputter}
}
// FrameOutputTypeConditional is the type identifier of ConditionalOutput.
const FrameOutputTypeConditional = "conditional"
// Type returns the type identifier of this frame outputter.
func (out *ConditionalOutput) Type() string {
	return FrameOutputTypeConditional
}
// OutputFrame outputs the frame with the wrapped outputter only when the
// condition holds; otherwise the frame is silently dropped (nil, nil).
// Uses a pointer receiver for consistency with Type — mixing value and
// pointer receivers on one type causes surprising method-set differences.
func (out *ConditionalOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
	ok, err := out.Condition.CheckFrameCondition(ctx, frame)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, nil
	}
	return out.Outputter.OutputFrame(ctx, vars, frame)
}

@ -11,22 +11,22 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type LocalSubscribersOutput struct {
type LocalSubscribersFrameOutput struct {
// TODO: refactor to depend on interface (avoid Centrifuge dependency here).
node *centrifuge.Node
}
func NewLocalSubscribersOutput(node *centrifuge.Node) *LocalSubscribersOutput {
return &LocalSubscribersOutput{node: node}
func NewLocalSubscribersFrameOutput(node *centrifuge.Node) *LocalSubscribersFrameOutput {
return &LocalSubscribersFrameOutput{node: node}
}
const OutputTypeLocalSubscribers = "localSubscribers"
const FrameOutputTypeLocalSubscribers = "localSubscribers"
func (out *LocalSubscribersOutput) Type() string {
return OutputTypeLocalSubscribers
func (out *LocalSubscribersFrameOutput) Type() string {
return FrameOutputTypeLocalSubscribers
}
func (out *LocalSubscribersOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (out *LocalSubscribersFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
channelID := vars.Channel
channel := orgchannel.PrependOrgID(vars.OrgID, channelID)
frameJSON, err := json.Marshal(frame)

@ -0,0 +1,32 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// ManagedStreamFrameOutput pushes frames into a managed stream (see OutputFrame).
type ManagedStreamFrameOutput struct {
	managedStream *managedstream.Runner
}
// NewManagedStreamFrameOutput creates a new ManagedStreamFrameOutput.
func NewManagedStreamFrameOutput(managedStream *managedstream.Runner) *ManagedStreamFrameOutput {
	return &ManagedStreamFrameOutput{managedStream: managedStream}
}
// FrameOutputTypeManagedStream is the type identifier of ManagedStreamFrameOutput.
const FrameOutputTypeManagedStream = "managedStream"
// Type returns the type identifier of this frame outputter.
func (out *ManagedStreamFrameOutput) Type() string {
	return FrameOutputTypeManagedStream
}
// OutputFrame pushes the frame into the managed stream addressed by the
// org/scope/namespace of the current channel vars, creating the stream on
// first use. No derived channel frames are produced.
func (out *ManagedStreamFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
	s, err := out.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace)
	if err != nil {
		logger.Error("Error getting stream", "error", err)
		return nil, err
	}
	err = s.Push(vars.Path, frame)
	return nil, err
}

@ -0,0 +1,36 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MultipleFrameOutput can combine several FrameOutputter and
// execute them sequentially.
type MultipleFrameOutput struct {
	Outputters []FrameOutputter
}
// FrameOutputTypeMultiple is the type identifier of MultipleFrameOutput.
const FrameOutputTypeMultiple = "multiple"
// Type returns the type identifier of this frame outputter.
func (out *MultipleFrameOutput) Type() string {
	return FrameOutputTypeMultiple
}
// OutputFrame runs every wrapped outputter sequentially over the same frame,
// collecting all derived channel frames. The first error aborts the chain:
// later outputters are not run and earlier results are dropped.
// Fixes: the loop variable previously shadowed the receiver `out`, and the
// value receiver was inconsistent with Type's pointer receiver.
func (out *MultipleFrameOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
	var frames []*ChannelFrame
	for _, outputter := range out.Outputters {
		f, err := outputter.OutputFrame(ctx, vars, frame)
		if err != nil {
			logger.Error("Error outputting frame", "error", err)
			return nil, err
		}
		frames = append(frames, f...)
	}
	return frames, nil
}
// NewMultipleFrameOutput creates new MultipleFrameOutput combining the given
// outputters; they are executed in the order provided.
func NewMultipleFrameOutput(outputters ...FrameOutputter) *MultipleFrameOutput {
	return &MultipleFrameOutput{Outputters: outputters}
}

@ -12,23 +12,23 @@ type RedirectOutputConfig struct {
Channel string `json:"channel"`
}
// RedirectOutput passes processing control to the rule defined
// RedirectFrameOutput passes processing control to the rule defined
// for a configured channel.
type RedirectOutput struct {
type RedirectFrameOutput struct {
config RedirectOutputConfig
}
func NewRedirectOutput(config RedirectOutputConfig) *RedirectOutput {
return &RedirectOutput{config: config}
func NewRedirectFrameOutput(config RedirectOutputConfig) *RedirectFrameOutput {
return &RedirectFrameOutput{config: config}
}
const OutputTypeRedirect = "redirect"
const FrameOutputTypeRedirect = "redirect"
func (out *RedirectOutput) Type() string {
return OutputTypeRedirect
func (out *RedirectFrameOutput) Type() string {
return FrameOutputTypeRedirect
}
func (out *RedirectOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (out *RedirectFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
if vars.Channel == out.config.Channel {
return nil, fmt.Errorf("redirect to the same channel: %s", out.config.Channel)
}

@ -23,15 +23,15 @@ type RemoteWriteConfig struct {
Password string `json:"password"`
}
type RemoteWriteOutput struct {
type RemoteWriteFrameOutput struct {
mu sync.Mutex
config RemoteWriteConfig
httpClient *http.Client
buffer []prompb.TimeSeries
}
func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput {
out := &RemoteWriteOutput{
func NewRemoteWriteFrameOutput(config RemoteWriteConfig) *RemoteWriteFrameOutput {
out := &RemoteWriteFrameOutput{
config: config,
httpClient: &http.Client{Timeout: 2 * time.Second},
}
@ -41,13 +41,13 @@ func NewRemoteWriteOutput(config RemoteWriteConfig) *RemoteWriteOutput {
return out
}
const OutputTypeRemoteWrite = "remoteWrite"
const FrameOutputTypeRemoteWrite = "remoteWrite"
func (out *RemoteWriteOutput) Type() string {
return OutputTypeRemoteWrite
func (out *RemoteWriteFrameOutput) Type() string {
return FrameOutputTypeRemoteWrite
}
func (out *RemoteWriteOutput) flushPeriodically() {
func (out *RemoteWriteFrameOutput) flushPeriodically() {
for range time.NewTicker(15 * time.Second).C {
out.mu.Lock()
if len(out.buffer) == 0 {
@ -70,7 +70,7 @@ func (out *RemoteWriteOutput) flushPeriodically() {
}
}
func (out *RemoteWriteOutput) flush(timeSeries []prompb.TimeSeries) error {
func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error {
logger.Debug("Remote write flush", "num time series", len(timeSeries))
remoteWriteData, err := remotewrite.TimeSeriesToBytes(timeSeries)
if err != nil {
@ -100,7 +100,7 @@ func (out *RemoteWriteOutput) flush(timeSeries []prompb.TimeSeries) error {
return nil
}
func (out *RemoteWriteOutput) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (out *RemoteWriteFrameOutput) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error) {
if out.config.Endpoint == "" {
logger.Debug("Skip sending to remote write: no url")
return nil, nil

@ -13,7 +13,7 @@ type ThresholdOutputConfig struct {
Channel string `json:"channel"`
}
//go:generate mockgen -destination=output_threshold_mock.go -package=pipeline github.com/grafana/grafana/pkg/services/live/pipeline FrameGetSetter
//go:generate mockgen -destination=frame_output_threshold_mock.go -package=pipeline github.com/grafana/grafana/pkg/services/live/pipeline FrameGetSetter
type FrameGetSetter interface {
Get(orgID int64, channel string) (*data.Frame, bool, error)
@ -31,13 +31,13 @@ func NewThresholdOutput(frameStorage FrameGetSetter, config ThresholdOutputConfi
return &ThresholdOutput{frameStorage: frameStorage, config: config}
}
const OutputTypeThreshold = "threshold"
const FrameOutputTypeThreshold = "threshold"
func (out *ThresholdOutput) Type() string {
return OutputTypeThreshold
return FrameOutputTypeThreshold
}
func (out *ThresholdOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (out *ThresholdOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
if frame == nil {
return nil, nil
}

@ -17,7 +17,7 @@ func TestThresholdOutput_Output(t *testing.T) {
}
type args struct {
in0 context.Context
vars OutputVars
vars Vars
frame *data.Frame
}
tests := []struct {
@ -34,7 +34,7 @@ func TestThresholdOutput_Output(t *testing.T) {
Channel: "test",
},
},
args: args{in0: context.Background(), vars: OutputVars{}, frame: nil},
args: args{in0: context.Background(), vars: Vars{}, frame: nil},
wantErr: false,
},
}
@ -44,8 +44,8 @@ func TestThresholdOutput_Output(t *testing.T) {
frameStorage: tt.fields.frameStorage,
config: tt.fields.config,
}
if _, err := l.Output(tt.args.in0, tt.args.vars, tt.args.frame); (err != nil) != tt.wantErr {
t.Errorf("Output() error = %v, wantErr %v", err, tt.wantErr)
if _, err := l.OutputFrame(tt.args.in0, tt.args.vars, tt.args.frame); (err != nil) != tt.wantErr {
t.Errorf("OutputFrame() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
@ -88,7 +88,7 @@ func TestThresholdOutput_NoPreviousFrame_SingleRow(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 1)
@ -139,7 +139,7 @@ func TestThresholdOutput_NoPreviousFrame_MultipleRows(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 1)
@ -198,7 +198,7 @@ func TestThresholdOutput_WithPreviousFrame_SingleRow(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 0)
}
@ -248,7 +248,7 @@ func TestThresholdOutput_WithPreviousFrame_MultipleRows(t *testing.T) {
frame := data.NewFrame("test", f1, f2)
channelFrames, err := outputter.Output(context.Background(), OutputVars{}, frame)
channelFrames, err := outputter.OutputFrame(context.Background(), Vars{}, frame)
require.NoError(t, err)
require.Len(t, channelFrames, 1)
}

@ -0,0 +1,43 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// DropFieldsFrameProcessorConfig lists the names of fields to drop.
type DropFieldsFrameProcessorConfig struct {
	FieldNames []string `json:"fieldNames"`
}
// DropFieldsFrameProcessor can drop specified fields from a data.Frame.
type DropFieldsFrameProcessor struct {
	config DropFieldsFrameProcessorConfig
}
// removeIndex removes the element at index from s. Note: the backing array
// is shifted in place, so callers must use the returned slice.
func removeIndex(s []*data.Field, index int) []*data.Field {
	return append(s[:index], s[index+1:]...)
}
// NewDropFieldsFrameProcessor creates a new DropFieldsFrameProcessor.
func NewDropFieldsFrameProcessor(config DropFieldsFrameProcessorConfig) *DropFieldsFrameProcessor {
	return &DropFieldsFrameProcessor{config: config}
}
// FrameProcessorTypeDropFields is the type identifier of DropFieldsFrameProcessor.
const FrameProcessorTypeDropFields = "dropFields"
// Type returns the type identifier of this frame processor.
func (p *DropFieldsFrameProcessor) Type() string {
	return FrameProcessorTypeDropFields
}
// ProcessFrame removes every field whose name is listed in the config from
// the frame, preserving the order of the remaining fields. The frame is
// modified and returned.
//
// Fix: the previous implementation called removeIndex while ranging over
// frame.Fields. The range iterated a stale slice header over a backing array
// that removeIndex had shifted in place, so a second matching field (several
// configured names, or duplicate field names) could be skipped or the wrong
// field removed. A single-pass in-place filter avoids the issue.
func (p *DropFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) {
	filtered := frame.Fields[:0] // reuse the backing array
	for _, field := range frame.Fields {
		drop := false
		for _, name := range p.config.FieldNames {
			if name == field.Name {
				drop = true
				break
			}
		}
		if !drop {
			filtered = append(filtered, field)
		}
	}
	frame.Fields = filtered
	return frame, nil
}

@ -0,0 +1,46 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type KeepFieldsFrameProcessorConfig struct {
FieldNames []string `json:"fieldNames"`
}
// KeepFieldsFrameProcessor can keep specified fields in a data.Frame
// dropping all other fields. A new frame is returned; the input frame is
// not modified.
type KeepFieldsFrameProcessor struct {
	config KeepFieldsFrameProcessorConfig
}
// NewKeepFieldsFrameProcessor creates a KeepFieldsFrameProcessor for the
// given config.
func NewKeepFieldsFrameProcessor(config KeepFieldsFrameProcessorConfig) *KeepFieldsFrameProcessor {
	return &KeepFieldsFrameProcessor{config: config}
}
// stringInSlice reports whether str is present in slice.
func stringInSlice(str string, slice []string) bool {
	for i := range slice {
		if slice[i] == str {
			return true
		}
	}
	return false
}
// FrameProcessorTypeKeepFields is the registry type name of
// KeepFieldsFrameProcessor.
const FrameProcessorTypeKeepFields = "keepFields"

// Type implements the FrameProcessor interface.
func (p *KeepFieldsFrameProcessor) Type() string {
	return FrameProcessorTypeKeepFields
}
// ProcessFrame returns a new frame (same name) containing only the fields
// whose names appear in the processor config; all other fields are dropped.
func (p *KeepFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) {
	var selected []*data.Field
	for _, candidate := range frame.Fields {
		if !stringInSlice(candidate.Name, p.config.FieldNames) {
			continue
		}
		selected = append(selected, candidate)
	}
	return data.NewFrame(frame.Name, selected...), nil
}

@ -0,0 +1,35 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MultipleFrameProcessor can combine several FrameProcessor and
// execute them sequentially, feeding each one the previous result.
type MultipleFrameProcessor struct {
	// Processors are applied in slice order.
	Processors []FrameProcessor
}
// FrameProcessorTypeMultiple is the registry type name of
// MultipleFrameProcessor.
const FrameProcessorTypeMultiple = "multiple"

// Type implements the FrameProcessor interface.
func (p *MultipleFrameProcessor) Type() string {
	return FrameProcessorTypeMultiple
}
// ProcessFrame runs the frame through every nested processor in order,
// passing each processor the output of the previous one. The first error
// is logged and aborts the chain.
func (p *MultipleFrameProcessor) ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error) {
	for _, proc := range p.Processors {
		processed, err := proc.ProcessFrame(ctx, vars, frame)
		if err != nil {
			logger.Error("Error processing frame", "error", err)
			return nil, err
		}
		frame = processed
	}
	return frame, nil
}
// NewMultipleFrameProcessor creates a MultipleFrameProcessor that applies
// the given processors in order.
func NewMultipleFrameProcessor(processors ...FrameProcessor) *MultipleFrameProcessor {
	return &MultipleFrameProcessor{Processors: processors}
}

@ -1,33 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type ConditionalOutput struct {
Condition ConditionChecker
Outputter Outputter
}
func NewConditionalOutput(condition ConditionChecker, outputter Outputter) *ConditionalOutput {
return &ConditionalOutput{Condition: condition, Outputter: outputter}
}
const OutputTypeConditional = "conditional"
func (out *ConditionalOutput) Type() string {
return OutputTypeConditional
}
func (out ConditionalOutput) Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
ok, err := out.Condition.CheckCondition(ctx, frame)
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}
return out.Outputter.Output(ctx, vars, frame)
}

@ -1,32 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/services/live/managedstream"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type ManagedStreamOutput struct {
managedStream *managedstream.Runner
}
func NewManagedStreamOutput(managedStream *managedstream.Runner) *ManagedStreamOutput {
return &ManagedStreamOutput{managedStream: managedStream}
}
const OutputTypeManagedStream = "managedStream"
func (out *ManagedStreamOutput) Type() string {
return OutputTypeManagedStream
}
func (out *ManagedStreamOutput) Output(_ context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
stream, err := out.managedStream.GetOrCreateStream(vars.OrgID, vars.Scope, vars.Namespace)
if err != nil {
logger.Error("Error getting stream", "error", err)
return nil, err
}
return nil, stream.Push(vars.Path, frame)
}

@ -1,36 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MultipleOutput can combine several Outputter and
// execute them sequentially.
type MultipleOutput struct {
Outputters []Outputter
}
const OutputTypeMultiple = "multiple"
func (out *MultipleOutput) Type() string {
return OutputTypeMultiple
}
func (out MultipleOutput) Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
var frames []*ChannelFrame
for _, out := range out.Outputters {
f, err := out.Output(ctx, vars, frame)
if err != nil {
logger.Error("Error outputting frame", "error", err)
return nil, err
}
frames = append(frames, f...)
}
return frames, nil
}
func NewMultipleOutput(outputters ...Outputter) *MultipleOutput {
return &MultipleOutput{Outputters: outputters}
}

@ -6,19 +6,63 @@ import (
"fmt"
"os"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)
const (
service = "grafana"
environment = "dev"
id = 1
)
// tracerProvider builds an OpenTelemetry TracerProvider that exports
// spans to a Jaeger collector at the provided url. Spans are exported in
// batches and tagged with a Resource describing this application
// (service name, environment, ID).
func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
	if err != nil {
		return nil, err
	}
	provider := tracesdk.NewTracerProvider(
		// Batch span export; recommended for production use.
		tracesdk.WithBatcher(exporter),
		// Attach application identity to every exported span.
		tracesdk.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(service),
			attribute.String("environment", environment),
			attribute.Int64("ID", id),
		)),
	)
	return provider, nil
}
// ChannelData is a wrapper over raw data with additional channel information.
// Channel is used for rule routing, if the channel is empty then data processing
// stops. If channel is not empty then data processing will be redirected to a
// corresponding channel rule.
type ChannelData struct {
Channel string
Data []byte
}
// ChannelFrame is a wrapper over data.Frame with additional channel information.
// Channel is used for rule routing, if the channel is empty then frame processing
// will try to take current rule Processor and Outputter. If channel is not empty
// will try to take current rule FrameProcessor and FrameOutputter. If channel is not empty
// then frame processing will be redirected to a corresponding channel rule.
// TODO: avoid recursion, increment a counter while frame travels over pipeline steps, make it configurable.
type ChannelFrame struct {
Channel string `json:"channel"`
Frame *data.Frame `json:"frame"`
@ -33,14 +77,10 @@ type Vars struct {
Path string
}
// ProcessorVars has some helpful things Processor entities could use.
type ProcessorVars struct {
Vars
}
// OutputVars has some helpful things Outputter entities could use.
type OutputVars struct {
ProcessorVars
// DataOutputter can output incoming data before conversion to frames.
type DataOutputter interface {
Type() string
OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error)
}
// Converter converts raw bytes to slice of ChannelFrame. Each element
@ -51,17 +91,17 @@ type Converter interface {
Convert(ctx context.Context, vars Vars, body []byte) ([]*ChannelFrame, error)
}
// Processor can modify data.Frame in a custom way before it will be outputted.
type Processor interface {
// FrameProcessor can modify data.Frame in a custom way before it will be outputted.
type FrameProcessor interface {
Type() string
Process(ctx context.Context, vars ProcessorVars, frame *data.Frame) (*data.Frame, error)
ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error)
}
// Outputter outputs data.Frame to a custom destination. Or simply
// FrameOutputter outputs data.Frame to a custom destination. Or simply
// do nothing if some conditions not met.
type Outputter interface {
type FrameOutputter interface {
Type() string
Output(ctx context.Context, vars OutputVars, frame *data.Frame) ([]*ChannelFrame, error)
OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)
}
// Subscriber can handle channel subscribe events.
@ -70,15 +110,28 @@ type Subscriber interface {
Subscribe(ctx context.Context, vars Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error)
}
// LiveChannelRule is an in-memory representation of each specific rule, with Converter, Processor
// and Outputter to be executed by Pipeline.
// PublishAuthChecker checks whether current user can publish to a channel.
type PublishAuthChecker interface {
CanPublish(ctx context.Context, u *models.SignedInUser) (bool, error)
}
// SubscribeAuthChecker checks whether current user can subscribe to a channel.
type SubscribeAuthChecker interface {
CanSubscribe(ctx context.Context, u *models.SignedInUser) (bool, error)
}
// LiveChannelRule is an in-memory representation of each specific rule, with Converter, FrameProcessor
// and FrameOutputter to be executed by Pipeline.
type LiveChannelRule struct {
OrgId int64
Pattern string
Subscriber Subscriber
Converter Converter
Processor Processor
Outputter Outputter
OrgId int64
Pattern string
PublishAuth PublishAuthChecker
SubscribeAuth SubscribeAuthChecker
Subscribers []Subscriber
DataOutputters []DataOutputter
Converter Converter
FrameProcessors []FrameProcessor
FrameOutputters []FrameOutputter
}
// Label ...
@ -107,6 +160,7 @@ type ChannelRuleGetter interface {
// * output resulting frames to various destinations.
type Pipeline struct {
ruleGetter ChannelRuleGetter
tracer trace.Tracer
}
// New creates new Pipeline.
@ -114,9 +168,24 @@ func New(ruleGetter ChannelRuleGetter) (*Pipeline, error) {
p := &Pipeline{
ruleGetter: ruleGetter,
}
if os.Getenv("GF_LIVE_PIPELINE_TRACE") != "" {
// Traces for development only at the moment.
// Start local Jaeger and then run Grafana with GF_LIVE_PIPELINE_TRACE:
// docker run --rm -it --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14268:14268 -p 14250:14250 -p 9411:9411 jaegertracing/all-in-one:1.26
// Then visit http://localhost:16686/ where Jaeger UI is served.
tp, err := tracerProvider("http://localhost:14268/api/traces")
if err != nil {
return nil, err
}
tracer := tp.Tracer("gf.live.pipeline")
p.tracer = tracer
}
if os.Getenv("GF_LIVE_PIPELINE_DEV") != "" {
go postTestData() // TODO: temporary for development, remove before merge.
}
return p, nil
}
@ -125,6 +194,37 @@ func (p *Pipeline) Get(orgID int64, channel string) (*LiveChannelRule, bool, err
}
// ProcessInput is the pipeline entry point for raw incoming data on a
// channel. It delegates to processInput with a fresh visited-channel set
// and, when tracing is enabled, wraps the call in a span, recording an
// error status on failure.
func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID string, body []byte) (bool, error) {
	var span trace.Span
	if p.tracer != nil {
		ctx, span = p.tracer.Start(ctx, "live.pipeline.process_input")
		span.SetAttributes(
			attribute.Int64("orgId", orgID),
			attribute.String("channel", channelID),
			attribute.String("body", string(body)),
		)
		defer span.End()
	}
	ok, err := p.processInput(ctx, orgID, channelID, body, nil)
	// span is non-nil only when tracing is enabled.
	if err != nil && span != nil {
		span.SetStatus(codes.Error, err.Error())
	}
	return ok, err
}
func (p *Pipeline) processInput(ctx context.Context, orgID int64, channelID string, body []byte, visitedChannels map[string]struct{}) (bool, error) {
var span trace.Span
if p.tracer != nil {
ctx, span = p.tracer.Start(ctx, "live.pipeline.process_input_"+channelID)
span.SetAttributes(
attribute.Int64("orgId", orgID),
attribute.String("channel", channelID),
attribute.String("body", string(body)),
)
defer span.End()
}
rule, ok, err := p.ruleGetter.Get(orgID, channelID)
if err != nil {
return false, err
@ -132,13 +232,23 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri
if !ok {
return false, nil
}
channelFrames, ok, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body)
if err != nil {
return false, err
if visitedChannels == nil {
visitedChannels = map[string]struct{}{}
}
if !ok {
if len(rule.DataOutputters) > 0 {
channelDataList := []*ChannelData{{Channel: channelID, Data: body}}
err = p.processChannelDataList(ctx, orgID, channelID, channelDataList, visitedChannels)
if err != nil {
return false, err
}
}
if rule.Converter == nil {
return false, nil
}
channelFrames, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body)
if err != nil {
return false, err
}
err = p.processChannelFrames(ctx, orgID, channelID, channelFrames, nil)
if err != nil {
return false, fmt.Errorf("error processing frame: %w", err)
@ -146,15 +256,21 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri
return true, nil
}
func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) {
if rule.Converter == nil {
return nil, false, nil
func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, error) {
var span trace.Span
if p.tracer != nil {
ctx, span = p.tracer.Start(ctx, "live.pipeline.convert_"+rule.Converter.Type())
span.SetAttributes(
attribute.Int64("orgId", orgID),
attribute.String("channel", channelID),
)
defer span.End()
}
channel, err := live.ParseChannel(channelID)
if err != nil {
logger.Error("Error parsing channel", "error", err, "channel", channelID)
return nil, false, err
return nil, err
}
vars := Vars{
@ -168,14 +284,40 @@ func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule
frames, err := rule.Converter.Convert(ctx, vars, body)
if err != nil {
logger.Error("Error converting data", "error", err)
return nil, false, err
return nil, err
}
return frames, true, nil
return frames, nil
}
var errChannelRecursion = errors.New("channel recursion")
// processChannelDataList routes a list of ChannelData items produced by
// data outputters. An item with a non-empty Channel is processed under
// that channel's rule; an empty Channel keeps the current channelID.
// Every channel touched is recorded in visitedChannels so a redirect
// cycle fails fast with errChannelRecursion instead of looping forever.
func (p *Pipeline) processChannelDataList(ctx context.Context, orgID int64, channelID string, channelDataList []*ChannelData, visitedChannels map[string]struct{}) error {
	for _, channelData := range channelDataList {
		var nextChannel = channelID
		if channelData.Channel != "" {
			nextChannel = channelData.Channel
		}
		// Recursion guard: each channel may only be visited once per input.
		if _, ok := visitedChannels[nextChannel]; ok {
			return fmt.Errorf("%w: %s", errChannelRecursion, nextChannel)
		}
		visitedChannels[nextChannel] = struct{}{}
		newChannelDataList, err := p.processData(ctx, orgID, nextChannel, channelData.Data)
		if err != nil {
			return err
		}
		// Outputters may emit further ChannelData; feed each item back
		// through the full input pipeline for its target channel.
		if len(newChannelDataList) > 0 {
			for _, cd := range newChannelDataList {
				_, err := p.processInput(ctx, orgID, cd.Channel, cd.Data, visitedChannels)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
func (p *Pipeline) processChannelFrames(ctx context.Context, orgID int64, channelID string, channelFrames []*ChannelFrame, visitedChannels map[string]struct{}) error {
if visitedChannels == nil {
visitedChannels = map[string]struct{}{}
@ -204,6 +346,20 @@ func (p *Pipeline) processChannelFrames(ctx context.Context, orgID int64, channe
}
func (p *Pipeline) processFrame(ctx context.Context, orgID int64, channelID string, frame *data.Frame) ([]*ChannelFrame, error) {
var span trace.Span
if p.tracer != nil {
table, err := frame.StringTable(32, 32)
if err != nil {
return nil, err
}
ctx, span = p.tracer.Start(ctx, "live.pipeline.process_frame_"+channelID)
span.SetAttributes(
attribute.Int64("orgId", orgID),
attribute.String("channel", channelID),
attribute.String("frame", table),
)
defer span.End()
}
rule, ruleOk, err := p.ruleGetter.Get(orgID, channelID)
if err != nil {
logger.Error("Error getting rule", "error", err)
@ -220,39 +376,144 @@ func (p *Pipeline) processFrame(ctx context.Context, orgID int64, channelID stri
return nil, err
}
vars := ProcessorVars{
Vars: Vars{
OrgID: orgID,
Channel: channelID,
Scope: ch.Scope,
Namespace: ch.Namespace,
Path: ch.Path,
},
vars := Vars{
OrgID: orgID,
Channel: channelID,
Scope: ch.Scope,
Namespace: ch.Namespace,
Path: ch.Path,
}
if rule.Processor != nil {
frame, err = rule.Processor.Process(ctx, vars, frame)
if err != nil {
logger.Error("Error processing frame", "error", err)
return nil, err
if len(rule.FrameProcessors) > 0 {
for _, proc := range rule.FrameProcessors {
frame, err = p.execProcessor(ctx, proc, vars, frame)
if err != nil {
logger.Error("Error processing frame", "error", err)
return nil, err
}
if frame == nil {
return nil, nil
}
}
if frame == nil {
return nil, nil
}
if len(rule.FrameOutputters) > 0 {
var resultingFrames []*ChannelFrame
for _, out := range rule.FrameOutputters {
frames, err := p.processFrameOutput(ctx, out, vars, frame)
if err != nil {
logger.Error("Error outputting frame", "error", err)
return nil, err
}
resultingFrames = append(resultingFrames, frames...)
}
return resultingFrames, nil
}
outputVars := OutputVars{
ProcessorVars: vars,
return nil, nil
}
// execProcessor applies a single FrameProcessor to a frame, wrapping the
// call in a tracing span (with org, channel, rendered frame and processor
// type attributes) when tracing is enabled.
func (p *Pipeline) execProcessor(ctx context.Context, proc FrameProcessor, vars Vars, frame *data.Frame) (*data.Frame, error) {
	var span trace.Span
	if p.tracer != nil {
		// Render the frame before starting the span: previously the span was
		// started first and End() only deferred after this call, so a
		// StringTable error leaked a span that was never ended.
		table, err := frame.StringTable(32, 32)
		if err != nil {
			return nil, err
		}
		ctx, span = p.tracer.Start(ctx, "live.pipeline.apply_processor_"+proc.Type())
		defer span.End()
		span.SetAttributes(
			attribute.Int64("orgId", vars.OrgID),
			attribute.String("channel", vars.Channel),
			attribute.String("frame", table),
			attribute.String("processor", proc.Type()),
		)
		// Note: we can also visualize resulting frame here.
	}
	return proc.ProcessFrame(ctx, vars, frame)
}
if rule.Outputter != nil {
frames, err := rule.Outputter.Output(ctx, outputVars, frame)
// processFrameOutput sends a frame to a single FrameOutputter, wrapping
// the call in a tracing span when tracing is enabled. Returned
// ChannelFrames are fed back into the pipeline for further routing.
func (p *Pipeline) processFrameOutput(ctx context.Context, out FrameOutputter, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
	var span trace.Span
	if p.tracer != nil {
		// Render the frame before starting the span: previously the span was
		// started first and End() only deferred after this call, so a
		// StringTable error leaked a span that was never ended.
		table, err := frame.StringTable(32, 32)
		if err != nil {
			return nil, err
		}
		ctx, span = p.tracer.Start(ctx, "live.pipeline.frame_output_"+out.Type())
		defer span.End()
		span.SetAttributes(
			attribute.Int64("orgId", vars.OrgID),
			attribute.String("channel", vars.Channel),
			attribute.String("frame", table),
			attribute.String("output", out.Type()),
		)
	}
	return out.OutputFrame(ctx, vars, frame)
}
// processData runs raw data through the DataOutputters of the channel
// rule matching channelID. It returns the ChannelData items produced by
// the outputters (possibly targeting other channels), or nil when no
// matching rule or no data outputters are configured.
func (p *Pipeline) processData(ctx context.Context, orgID int64, channelID string, data []byte) ([]*ChannelData, error) {
	var span trace.Span
	if p.tracer != nil {
		ctx, span = p.tracer.Start(ctx, "live.pipeline.process_data_"+channelID)
		span.SetAttributes(
			attribute.Int64("orgId", orgID),
			attribute.String("channel", channelID),
			attribute.String("data", string(data)),
		)
		defer span.End()
	}
	rule, ruleOk, err := p.ruleGetter.Get(orgID, channelID)
	if err != nil {
		logger.Error("Error getting rule", "error", err)
		return nil, err
	}
	if !ruleOk {
		logger.Debug("Rule not found", "channel", channelID)
		// Previously returned the stale `err` variable (always nil here);
		// return an explicit nil so intent is clear and refactors are safe.
		return nil, nil
	}
	ch, err := live.ParseChannel(channelID)
	if err != nil {
		logger.Error("Error parsing channel", "error", err, "channel", channelID)
		return nil, err
	}
	vars := Vars{
		OrgID:     orgID,
		Channel:   channelID,
		Scope:     ch.Scope,
		Namespace: ch.Namespace,
		Path:      ch.Path,
	}
	if len(rule.DataOutputters) == 0 {
		return nil, nil
	}
	var resultingChannelDataList []*ChannelData
	for _, out := range rule.DataOutputters {
		channelDataList, err := p.processDataOutput(ctx, out, vars, data)
		if err != nil {
			// Log message previously said "frame" — this is the data path.
			logger.Error("Error outputting data", "error", err)
			return nil, err
		}
		resultingChannelDataList = append(resultingChannelDataList, channelDataList...)
	}
	return resultingChannelDataList, nil
}
// processDataOutput invokes a single DataOutputter on raw data, wrapping
// the call in a tracing span (with org, channel, payload and output type
// attributes) when tracing is enabled.
func (p *Pipeline) processDataOutput(ctx context.Context, out DataOutputter, vars Vars, data []byte) ([]*ChannelData, error) {
	var span trace.Span
	if p.tracer != nil {
		ctx, span = p.tracer.Start(ctx, "live.pipeline.data_output_"+out.Type())
		span.SetAttributes(
			attribute.Int64("orgId", vars.OrgID),
			attribute.String("channel", vars.Channel),
			attribute.String("data", string(data)),
			attribute.String("output", out.Type()),
		)
		defer span.End()
	}
	return out.OutputData(ctx, vars, data)
}

@ -62,7 +62,7 @@ func (t *testProcessor) Type() string {
return "test"
}
func (t *testProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) {
func (t *testProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error) {
return frame, nil
}
@ -75,7 +75,7 @@ func (t *testOutputter) Type() string {
return "test"
}
func (t *testOutputter) Output(_ context.Context, _ OutputVars, frame *data.Frame) ([]*ChannelFrame, error) {
func (t *testOutputter) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error) {
if t.err != nil {
return nil, t.err
}
@ -88,9 +88,9 @@ func TestPipeline(t *testing.T) {
p, err := New(&testRuleGetter{
rules: map[string]*LiveChannelRule{
"stream/test/xxx": {
Converter: &testConverter{"", data.NewFrame("test")},
Processor: &testProcessor{},
Outputter: outputter,
Converter: &testConverter{"", data.NewFrame("test")},
FrameProcessors: []FrameProcessor{&testProcessor{}},
FrameOutputters: []FrameOutputter{outputter},
},
},
})
@ -107,9 +107,9 @@ func TestPipeline_OutputError(t *testing.T) {
p, err := New(&testRuleGetter{
rules: map[string]*LiveChannelRule{
"stream/test/xxx": {
Converter: &testConverter{"", data.NewFrame("test")},
Processor: &testProcessor{},
Outputter: outputter,
Converter: &testConverter{"", data.NewFrame("test")},
FrameProcessors: []FrameProcessor{&testProcessor{}},
FrameOutputters: []FrameOutputter{outputter},
},
},
})
@ -123,15 +123,19 @@ func TestPipeline_Recursion(t *testing.T) {
rules: map[string]*LiveChannelRule{
"stream/test/xxx": {
Converter: &testConverter{"", data.NewFrame("test")},
Outputter: NewRedirectOutput(RedirectOutputConfig{
Channel: "stream/test/yyy",
}),
FrameOutputters: []FrameOutputter{
NewRedirectFrameOutput(RedirectOutputConfig{
Channel: "stream/test/yyy",
}),
},
},
"stream/test/yyy": {
Converter: &testConverter{"", data.NewFrame("test")},
Outputter: NewRedirectOutput(RedirectOutputConfig{
Channel: "stream/test/xxx",
}),
FrameOutputters: []FrameOutputter{
NewRedirectFrameOutput(RedirectOutputConfig{
Channel: "stream/test/xxx",
}),
},
},
},
})

@ -1,43 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type DropFieldsProcessorConfig struct {
FieldNames []string `json:"fieldNames"`
}
// DropFieldsProcessor can drop specified fields from a data.Frame.
type DropFieldsProcessor struct {
config DropFieldsProcessorConfig
}
func removeIndex(s []*data.Field, index int) []*data.Field {
return append(s[:index], s[index+1:]...)
}
func NewDropFieldsProcessor(config DropFieldsProcessorConfig) *DropFieldsProcessor {
return &DropFieldsProcessor{config: config}
}
const ProcessorTypeDropFields = "dropFields"
func (p *DropFieldsProcessor) Type() string {
return ProcessorTypeDropFields
}
func (p *DropFieldsProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) {
for _, f := range p.config.FieldNames {
inner:
for i, field := range frame.Fields {
if f == field.Name {
frame.Fields = removeIndex(frame.Fields, i)
continue inner
}
}
}
return frame, nil
}

@ -1,46 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
type KeepFieldsProcessorConfig struct {
FieldNames []string `json:"fieldNames"`
}
// KeepFieldsProcessor can keep specified fields in a data.Frame dropping all other fields.
type KeepFieldsProcessor struct {
config KeepFieldsProcessorConfig
}
func NewKeepFieldsProcessor(config KeepFieldsProcessorConfig) *KeepFieldsProcessor {
return &KeepFieldsProcessor{config: config}
}
func stringInSlice(str string, slice []string) bool {
for _, s := range slice {
if s == str {
return true
}
}
return false
}
const ProcessorTypeKeepFields = "keepFields"
func (p *KeepFieldsProcessor) Type() string {
return ProcessorTypeKeepFields
}
func (p *KeepFieldsProcessor) Process(_ context.Context, _ ProcessorVars, frame *data.Frame) (*data.Frame, error) {
var fieldsToKeep []*data.Field
for _, field := range frame.Fields {
if stringInSlice(field.Name, p.config.FieldNames) {
fieldsToKeep = append(fieldsToKeep, field)
}
}
f := data.NewFrame(frame.Name, fieldsToKeep...)
return f, nil
}

@ -1,35 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// MultipleProcessor can combine several Processor and
// execute them sequentially.
type MultipleProcessor struct {
Processors []Processor
}
const ProcessorTypeMultiple = "multiple"
func (p *MultipleProcessor) Type() string {
return ProcessorTypeMultiple
}
func (p *MultipleProcessor) Process(ctx context.Context, vars ProcessorVars, frame *data.Frame) (*data.Frame, error) {
for _, p := range p.Processors {
var err error
frame, err = p.Process(ctx, vars, frame)
if err != nil {
logger.Error("Error processing frame", "error", err)
return nil, err
}
}
return frame, nil
}
func NewMultipleProcessor(processors ...Processor) *MultipleProcessor {
return &MultipleProcessor{Processors: processors}
}

@ -15,52 +15,45 @@ var SubscribersRegistry = []EntityInfo{
Type: SubscriberTypeManagedStream,
Description: "apply managed stream subscribe logic",
},
{
Type: SubscriberTypeMultiple,
Description: "apply multiple subscribers",
},
{
Type: SubscriberTypeAuthorizeRole,
Description: "authorize user role",
},
}
var OutputsRegistry = []EntityInfo{
var FrameOutputsRegistry = []EntityInfo{
{
Type: OutputTypeManagedStream,
Description: "Only send schema when structure changes. Note this also requires a matching subscriber",
Type: FrameOutputTypeManagedStream,
Description: "only send schema when structure changes (note this also requires a matching subscriber)",
Example: ManagedStreamOutputConfig{},
},
{
Type: OutputTypeMultiple,
Description: "Send the output to multiple destinations",
Example: MultipleOutputterConfig{},
},
{
Type: OutputTypeConditional,
Type: FrameOutputTypeConditional,
Description: "send to an output depending on frame values",
Example: ConditionalOutputConfig{},
},
{
Type: OutputTypeRedirect,
Type: FrameOutputTypeRedirect,
Description: "redirect for processing by another channel rule",
},
{
Type: OutputTypeThreshold,
Type: FrameOutputTypeThreshold,
Description: "output field threshold boundaries cross into new channel",
},
{
Type: OutputTypeChangeLog,
Type: FrameOutputTypeChangeLog,
Description: "output field changes into new channel",
},
{
Type: OutputTypeRemoteWrite,
Type: FrameOutputTypeRemoteWrite,
Description: "output to remote write endpoint",
},
}
var ConvertersRegistry = []EntityInfo{
{
Type: ConverterTypeJsonAuto,
Type: ConverterTypeJsonAuto,
Description: "automatic recursive JSON to Frame conversion",
},
{
Type: ConverterTypeJsonExact,
Type: ConverterTypeJsonExact,
Description: "JSON to Frame conversion according to exact list of fields",
},
{
Type: ConverterTypeInfluxAuto,
@ -68,24 +61,31 @@ var ConvertersRegistry = []EntityInfo{
Example: AutoInfluxConverterConfig{},
},
{
Type: ConverterTypeJsonFrame,
Type: ConverterTypeJsonFrame,
Description: "JSON-encoded Grafana data frame",
},
}
var ProcessorsRegistry = []EntityInfo{
var FrameProcessorsRegistry = []EntityInfo{
{
Type: ProcessorTypeKeepFields,
Type: FrameProcessorTypeKeepFields,
Description: "list the fields that should stay",
Example: KeepFieldsProcessorConfig{},
Example: KeepFieldsFrameProcessorConfig{},
},
{
Type: ProcessorTypeDropFields,
Type: FrameProcessorTypeDropFields,
Description: "list the fields that should be removed",
Example: DropFieldsProcessorConfig{},
Example: DropFieldsFrameProcessorConfig{},
},
}
var DataOutputsRegistry = []EntityInfo{
{
Type: DataOutputTypeBuiltin,
Description: "use builtin publish handler",
},
{
Type: ProcessorTypeMultiple,
Description: "apply multiple processors",
Example: MultipleProcessorConfig{},
Type: DataOutputTypeRedirect,
Description: "redirect data processing to another channel rule",
},
}

@ -2,6 +2,7 @@ package pipeline
import (
"context"
"fmt"
"sync"
"time"
@ -65,7 +66,7 @@ func (s *CacheSegmentedTree) Get(orgID int64, channel string) (*LiveChannelRule,
if !ok {
err := s.fillOrg(orgID)
if err != nil {
return nil, false, err
return nil, false, fmt.Errorf("error filling org: %w", err)
}
}
s.radixMu.RLock()

@ -21,9 +21,11 @@ func (t *testBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule
Pattern: "stream/telegraf/:metric",
},
{
OrgId: 1,
Pattern: "stream/telegraf/:metric/:extra",
Outputter: NewRedirectOutput(RedirectOutputConfig{}),
OrgId: 1,
Pattern: "stream/telegraf/:metric/:extra",
FrameOutputters: []FrameOutputter{
NewRedirectFrameOutput(RedirectOutputConfig{}),
},
},
{
OrgId: 1,
@ -48,7 +50,7 @@ func TestStorage_Get(t *testing.T) {
rule, ok, err = s.Get(1, "stream/telegraf/mem/rss")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, OutputTypeRedirect, rule.Outputter.Type())
require.Equal(t, FrameOutputTypeRedirect, rule.FrameOutputters[0].Type())
rule, ok, err = s.Get(1, "stream/booms")
require.NoError(t, err)

@ -1,39 +0,0 @@
package pipeline
import (
"context"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
)
type AuthorizeRoleSubscriberConfig struct {
Role models.RoleType `json:"role,omitempty"`
}
type AuthorizeRoleSubscriber struct {
config AuthorizeRoleSubscriberConfig
}
func NewAuthorizeRoleSubscriber(config AuthorizeRoleSubscriberConfig) *AuthorizeRoleSubscriber {
return &AuthorizeRoleSubscriber{config: config}
}
const SubscriberTypeAuthorizeRole = "authorizeRole"
func (s *AuthorizeRoleSubscriber) Type() string {
return SubscriberTypeAuthorizeRole
}
func (s *AuthorizeRoleSubscriber) Subscribe(ctx context.Context, _ Vars) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
u, ok := livecontext.GetContextSignedUser(ctx)
if !ok {
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}
if u.HasRole(s.config.Role) {
return models.SubscribeReply{}, backend.SubscribeStreamStatusOK, nil
}
return models.SubscribeReply{}, backend.SubscribeStreamStatusPermissionDenied, nil
}

@ -1,34 +0,0 @@
package pipeline
import (
"context"
"testing"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/live/livecontext"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
func TestAuthorizeRoleSubscriber_Subscribe_PermissionDenied(t *testing.T) {
ctx := context.Background()
ctx = livecontext.SetContextSignedUser(ctx, &models.SignedInUser{OrgRole: models.ROLE_EDITOR})
s := NewAuthorizeRoleSubscriber(AuthorizeRoleSubscriberConfig{
Role: models.ROLE_ADMIN,
})
_, status, err := s.Subscribe(ctx, Vars{})
require.NoError(t, err)
require.Equal(t, backend.SubscribeStreamStatusPermissionDenied, status)
}
func TestAuthorizeRoleSubscriber_Subscribe_OK(t *testing.T) {
ctx := context.Background()
ctx = livecontext.SetContextSignedUser(ctx, &models.SignedInUser{OrgRole: models.ROLE_ADMIN})
s := NewAuthorizeRoleSubscriber(AuthorizeRoleSubscriberConfig{
Role: models.ROLE_ADMIN,
})
_, status, err := s.Subscribe(ctx, Vars{})
require.NoError(t, err)
require.Equal(t, backend.SubscribeStreamStatusOK, status)
}
Loading…
Cancel
Save