From 29f3e175b68c3a9873c551e74cf7ac1f5ec8081e Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Thu, 30 Sep 2021 20:28:06 +0300 Subject: [PATCH] Live: test pipeline convert endpoint (#39480) --- pkg/api/api.go | 1 + pkg/services/live/live.go | 79 ++++++++++++++++++++++++++ pkg/services/live/pipeline/pipeline.go | 9 ++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 26329adf021..ec741a33f32 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -438,6 +438,7 @@ func (hs *HTTPServer) registerRoutes() { // POST Live data to be processed according to channel rules. liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath) liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin) + liveRoute.Post("/pipeline-convert-test", routing.Wrap(hs.Live.HandlePipelineConvertTestHTTP), reqOrgAdmin) liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin) liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin) liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin) diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index db84e87a372..aff5ea83753 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -885,6 +885,85 @@ func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response. }) } +type ConvertDryRunRequest struct { + ChannelRules []pipeline.ChannelRule `json:"channelRules"` + Channel string `json:"channel"` + Data string `json:"data"` +} + +type ConvertDryRunResponse struct { + ChannelFrames []*pipeline.ChannelFrame `json:"channelFrames"` +} + +type DryRunRuleStorage struct { + ChannelRules []pipeline.ChannelRule +} + +func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) { + return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) { + return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ string) error { + return errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) ListRemoteWriteBackends(_ context.Context, _ int64) ([]pipeline.RemoteWriteBackend, error) { + return nil, nil +} + +func (s *DryRunRuleStorage) ListChannelRules(_ context.Context, _ int64) ([]pipeline.ChannelRule, error) { + return s.ChannelRules, nil +} + +// HandlePipelineConvertTestHTTP ... +func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var req ConvertDryRunRequest + err = json.Unmarshal(body, &req) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding request", err) + } + storage := &DryRunRuleStorage{ + ChannelRules: req.ChannelRules, + } + builder := &pipeline.StorageRuleBuilder{ + Node: g.node, + ManagedStream: g.ManagedStreamRunner, + FrameStorage: pipeline.NewFrameStorage(), + RuleStorage: storage, + ChannelHandlerGetter: g, + } + channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) + pipe, err := pipeline.New(channelRuleGetter) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error creating pipeline", err) + } + rule, ok, err := channelRuleGetter.Get(c.OrgId, req.Channel) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error getting channel rule", err) + } + if !ok { + return response.Error(http.StatusNotFound, "No rule found", nil) + } + channelFrames, ok, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data)) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error converting data", err) + } + if !ok { + return response.Error(http.StatusNotFound, "No converter found", nil) + } + return response.JSON(http.StatusOK, ConvertDryRunResponse{ + ChannelFrames: channelFrames, + }) +} + // HandleChannelRulesPostHTTP ... func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.Response { body, err := ioutil.ReadAll(c.Req.Body) diff --git a/pkg/services/live/pipeline/pipeline.go b/pkg/services/live/pipeline/pipeline.go index 28d75ce98f7..ae881e8691a 100644 --- a/pkg/services/live/pipeline/pipeline.go +++ b/pkg/services/live/pipeline/pipeline.go @@ -20,8 +20,8 @@ import ( // then frame processing will be redirected to a corresponding channel rule. // TODO: avoid recursion, increment a counter while frame travels over pipeline steps, make it configurable. type ChannelFrame struct { - Channel string - Frame *data.Frame + Channel string `json:"channel"` + Frame *data.Frame `json:"frame"` } // Vars has some helpful things pipeline entities could use. @@ -111,7 +111,6 @@ type Pipeline struct { // New creates new Pipeline. func New(ruleGetter ChannelRuleGetter) (*Pipeline, error) { - logger.Info("Live pipeline initialization") p := &Pipeline{ ruleGetter: ruleGetter, } @@ -133,7 +132,7 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri if !ok { return false, nil } - channelFrames, ok, err := p.dataToChannelFrames(ctx, *rule, orgID, channelID, body) + channelFrames, ok, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body) if err != nil { return false, err } @@ -147,7 +146,7 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri return true, nil } -func (p *Pipeline) dataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) { +func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) { if rule.Converter == nil { return nil, false, nil }