diff --git a/pkg/api/api.go b/pkg/api/api.go index 64623b7f446..5e332290bb1 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -435,13 +435,16 @@ func (hs *HTTPServer) registerRoutes() { if hs.Cfg.FeatureToggles["live-pipeline"] { // POST Live data to be processed according to channel rules. liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath) - liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin) liveRoute.Post("/pipeline-convert-test", routing.Wrap(hs.Live.HandlePipelineConvertTestHTTP), reqOrgAdmin) + liveRoute.Get("/pipeline-entities", routing.Wrap(hs.Live.HandlePipelineEntitiesListHTTP), reqOrgAdmin) + liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin) liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin) liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin) liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin) - liveRoute.Get("/pipeline-entities", routing.Wrap(hs.Live.HandlePipelineEntitiesListHTTP), reqOrgAdmin) liveRoute.Get("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsListHTTP), reqOrgAdmin) + liveRoute.Post("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsPostHTTP), reqOrgAdmin) + liveRoute.Put("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsPutHTTP), reqOrgAdmin) + liveRoute.Delete("/remote-write-backends", routing.Wrap(hs.Live.HandleRemoteWriteBackendsDeleteHTTP), reqOrgAdmin) } }) diff --git a/pkg/api/dashboard_test.go b/pkg/api/dashboard_test.go index 4b3000eac1e..f87105abf90 100644 --- a/pkg/api/dashboard_test.go +++ b/pkg/api/dashboard_test.go @@ -89,7 +89,7 @@ type testState struct { func newTestLive(t *testing.T) *live.GrafanaLive { cfg := &setting.Cfg{AppURL: "http://localhost:3000/"} - gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), &usagestats.UsageStatsMock{T: t}) + gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), nil, &usagestats.UsageStatsMock{T: t}) require.NoError(t, err) return gLive } diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index d820f41b71d..6fd5bec8c32 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/grafana/grafana/pkg/services/encryption" + "github.com/centrifugal/centrifuge" "github.com/go-redis/redis/v8" "github.com/gobwas/glob" @@ -62,7 +64,7 @@ type CoreGrafanaScope struct { func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister, logsService *cloudwatch.LogsService, pluginStore plugins.Store, cacheService *localcache.CacheService, - dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore, + dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore, encService encryption.Service, usageStatsService usagestats.Service) (*GrafanaLive, error) { g := &GrafanaLive{ Cfg: cfg, @@ -73,6 +75,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r CacheService: cacheService, DataSourceCache: dataSourceCache, SQLStore: sqlStore, + EncryptionService: encService, channels: make(map[string]models.ChannelHandler), GrafanaScope: CoreGrafanaScope{ Features: make(map[string]models.ChannelHandlerFactory), @@ -180,15 +183,17 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r } } else { storage := &pipeline.FileStorage{ - DataPath: cfg.DataPath, + DataPath: cfg.DataPath, + EncryptionService: g.EncryptionService, } - g.channelRuleStorage = storage + g.pipelineStorage = storage builder = &pipeline.StorageRuleBuilder{ Node: node, ManagedStream: g.ManagedStreamRunner, FrameStorage: pipeline.NewFrameStorage(), - RuleStorage: storage, + Storage: storage, ChannelHandlerGetter: g, + EncryptionService: g.EncryptionService, } } channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) @@ -364,6 +369,7 @@ type GrafanaLive struct { CacheService *localcache.CacheService DataSourceCache datasources.CacheService SQLStore *sqlstore.SQLStore + EncryptionService encryption.Service pluginStore plugins.Store node *centrifuge.Node @@ -382,7 +388,7 @@ type GrafanaLive struct { ManagedStreamRunner *managedstream.Runner Pipeline *pipeline.Pipeline - channelRuleStorage pipeline.RuleStorage + pipelineStorage pipeline.Storage contextGetter *liveplugin.ContextGetter runStreamManager *runstream.Manager @@ -430,10 +436,6 @@ func (g *GrafanaLive) Run(ctx context.Context) error { return eGroup.Wait() } -func (g *GrafanaLive) ChannelRuleStorage() pipeline.RuleStorage { - return g.channelRuleStorage -} - func getCheckOriginFunc(appURL *url.URL, originPatterns []string, originGlobs []glob.Glob) func(r *http.Request) bool { return func(r *http.Request) bool { origin := r.Header.Get("Origin") @@ -980,7 +982,7 @@ func (g *GrafanaLive) HandleInfoHTTP(ctx *models.ReqContext) response.Response { // HandleChannelRulesListHTTP ... func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response.Response { - result, err := g.channelRuleStorage.ListChannelRules(c.Req.Context(), c.OrgId) + result, err := g.pipelineStorage.ListChannelRules(c.Req.Context(), c.OrgId) if err != nil { return response.Error(http.StatusInternalServerError, "Failed to get channel rules", err) } @@ -1003,15 +1005,31 @@ type DryRunRuleStorage struct { ChannelRules []pipeline.ChannelRule } -func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) { +func (s *DryRunRuleStorage) GetRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendGetCmd) (pipeline.RemoteWriteBackend, bool, error) { + return pipeline.RemoteWriteBackend{}, false, errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) CreateRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendCreateCmd) (pipeline.RemoteWriteBackend, error) { + return pipeline.RemoteWriteBackend{}, errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) UpdateRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendUpdateCmd) (pipeline.RemoteWriteBackend, error) { + return pipeline.RemoteWriteBackend{}, errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) DeleteRemoteWriteBackend(_ context.Context, _ int64, _ pipeline.RemoteWriteBackendDeleteCmd) error { + return errors.New("not implemented by dry run rule storage") +} + +func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleCreateCmd) (pipeline.ChannelRule, error) { return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage") } -func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) { +func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleUpdateCmd) (pipeline.ChannelRule, error) { return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage") } -func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ string) error { +func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRuleDeleteCmd) error { return errors.New("not implemented by dry run rule storage") } @@ -1041,7 +1059,7 @@ func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) respon Node: g.node, ManagedStream: g.ManagedStreamRunner, FrameStorage: pipeline.NewFrameStorage(), - RuleStorage: storage, + Storage: storage, ChannelHandlerGetter: g, } channelRuleGetter := pipeline.NewCacheSegmentedTree(builder) @@ -1074,17 +1092,17 @@ func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response. if err != nil { return response.Error(http.StatusInternalServerError, "Error reading body", err) } - var rule pipeline.ChannelRule - err = json.Unmarshal(body, &rule) + var cmd pipeline.ChannelRuleCreateCmd + err = json.Unmarshal(body, &cmd) if err != nil { return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) } - result, err := g.channelRuleStorage.CreateChannelRule(c.Req.Context(), c.OrgId, rule) + rule, err := g.pipelineStorage.CreateChannelRule(c.Req.Context(), c.OrgId, cmd) if err != nil { return response.Error(http.StatusInternalServerError, "Failed to create channel rule", err) } return response.JSON(http.StatusOK, util.DynMap{ - "rule": result, + "rule": rule, }) } @@ -1094,15 +1112,15 @@ func (g *GrafanaLive) HandleChannelRulesPutHTTP(c *models.ReqContext) response.R if err != nil { return response.Error(http.StatusInternalServerError, "Error reading body", err) } - var rule pipeline.ChannelRule - err = json.Unmarshal(body, &rule) + var cmd pipeline.ChannelRuleUpdateCmd + err = json.Unmarshal(body, &cmd) if err != nil { return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) } - if rule.Pattern == "" { + if cmd.Pattern == "" { return response.Error(http.StatusBadRequest, "Rule pattern required", nil) } - rule, err = g.channelRuleStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, rule) + rule, err := g.pipelineStorage.UpdateChannelRule(c.Req.Context(), c.OrgId, cmd) if err != nil { return response.Error(http.StatusInternalServerError, "Failed to update channel rule", err) } @@ -1117,15 +1135,15 @@ func (g *GrafanaLive) HandleChannelRulesDeleteHTTP(c *models.ReqContext) respons if err != nil { return response.Error(http.StatusInternalServerError, "Error reading body", err) } - var rule pipeline.ChannelRule - err = json.Unmarshal(body, &rule) + var cmd pipeline.ChannelRuleDeleteCmd + err = json.Unmarshal(body, &cmd) if err != nil { return response.Error(http.StatusBadRequest, "Error decoding channel rule", err) } - if rule.Pattern == "" { + if cmd.Pattern == "" { return response.Error(http.StatusBadRequest, "Rule pattern required", nil) } - err = g.channelRuleStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, rule.Pattern) + err = g.pipelineStorage.DeleteChannelRule(c.Req.Context(), c.OrgId, cmd) if err != nil { return response.Error(http.StatusInternalServerError, "Failed to delete channel rule", err) } @@ -1145,15 +1163,104 @@ func (g *GrafanaLive) HandlePipelineEntitiesListHTTP(_ *models.ReqContext) respo // HandleRemoteWriteBackendsListHTTP ... func (g *GrafanaLive) HandleRemoteWriteBackendsListHTTP(c *models.ReqContext) response.Response { - result, err := g.channelRuleStorage.ListRemoteWriteBackends(c.Req.Context(), c.OrgId) + backends, err := g.pipelineStorage.ListRemoteWriteBackends(c.Req.Context(), c.OrgId) if err != nil { - return response.Error(http.StatusInternalServerError, "Failed to get channel rules", err) + return response.Error(http.StatusInternalServerError, "Failed to get remote write backends", err) + } + result := make([]pipeline.RemoteWriteBackendDto, 0, len(backends)) + for _, b := range backends { + result = append(result, pipeline.RemoteWriteBackendToDto(b)) } return response.JSON(http.StatusOK, util.DynMap{ "remoteWriteBackends": result, }) } +// HandleChannelRulesPostHTTP ... +func (g *GrafanaLive) HandleRemoteWriteBackendsPostHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var cmd pipeline.RemoteWriteBackendCreateCmd + err = json.Unmarshal(body, &cmd) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err) + } + result, err := g.pipelineStorage.CreateRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to create remote write backend", err) + } + return response.JSON(http.StatusOK, util.DynMap{ + "remoteWriteBackend": pipeline.RemoteWriteBackendToDto(result), + }) +} + +// HandleChannelRulesPutHTTP ... +func (g *GrafanaLive) HandleRemoteWriteBackendsPutHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var cmd pipeline.RemoteWriteBackendUpdateCmd + err = json.Unmarshal(body, &cmd) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err) + } + if cmd.UID == "" { + return response.Error(http.StatusBadRequest, "UID required", nil) + } + existingBackend, ok, err := g.pipelineStorage.GetRemoteWriteBackend(c.Req.Context(), c.OrgId, pipeline.RemoteWriteBackendGetCmd{ + UID: cmd.UID, + }) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to get remote write backend", err) + } + if ok { + if cmd.SecureSettings == nil { + cmd.SecureSettings = map[string]string{} + } + secureJSONData, err := g.EncryptionService.DecryptJsonData(c.Req.Context(), existingBackend.SecureSettings, setting.SecretKey) + if err != nil { + logger.Error("Error decrypting secure settings", "error", err) + return response.Error(http.StatusInternalServerError, "Error decrypting secure settings", err) + } + for k, v := range secureJSONData { + if _, ok := cmd.SecureSettings[k]; !ok { + cmd.SecureSettings[k] = v + } + } + } + result, err := g.pipelineStorage.UpdateRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to update remote write backend", err) + } + return response.JSON(http.StatusOK, util.DynMap{ + "remoteWriteBackend": pipeline.RemoteWriteBackendToDto(result), + }) +} + +// HandleChannelRulesDeleteHTTP ... +func (g *GrafanaLive) HandleRemoteWriteBackendsDeleteHTTP(c *models.ReqContext) response.Response { + body, err := ioutil.ReadAll(c.Req.Body) + if err != nil { + return response.Error(http.StatusInternalServerError, "Error reading body", err) + } + var cmd pipeline.RemoteWriteBackendDeleteCmd + err = json.Unmarshal(body, &cmd) + if err != nil { + return response.Error(http.StatusBadRequest, "Error decoding remote write backend", err) + } + if cmd.UID == "" { + return response.Error(http.StatusBadRequest, "UID required", nil) + } + err = g.pipelineStorage.DeleteRemoteWriteBackend(c.Req.Context(), c.OrgId, cmd) + if err != nil { + return response.Error(http.StatusInternalServerError, "Failed to delete remote write backend", err) + } + return response.JSON(http.StatusOK, util.DynMap{}) +} + // Write to the standard log15 logger func handleLog(msg centrifuge.LogEntry) { arr := make([]interface{}, 0) diff --git a/pkg/services/live/pipeline/config.go b/pkg/services/live/pipeline/config.go index 5b4511db31d..d40c08a680a 100644 --- a/pkg/services/live/pipeline/config.go +++ b/pkg/services/live/pipeline/config.go @@ -4,6 +4,10 @@ import ( "context" "fmt" + "github.com/grafana/grafana/pkg/setting" + + "github.com/grafana/grafana/pkg/services/encryption" + "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/services/live/managedstream" "github.com/grafana/grafana/pkg/services/live/pipeline/pattern" @@ -45,7 +49,8 @@ type ConditionalOutputConfig struct { } type RemoteWriteOutputConfig struct { - UID string `json:"uid"` + UID string `json:"uid"` + SampleMilliseconds int64 `json:"sampleMilliseconds"` } type FrameOutputterConfig struct { @@ -144,10 +149,76 @@ func typeRegistered(entityType string, registry []EntityInfo) bool { return false } +func RemoteWriteBackendToDto(b RemoteWriteBackend) RemoteWriteBackendDto { + secureFields := make(map[string]bool, len(b.SecureSettings)) + for k := range b.SecureSettings { + secureFields[k] = true + } + return RemoteWriteBackendDto{ + UID: b.UID, + Settings: b.Settings, + SecureFields: secureFields, + } +} + +type RemoteWriteBackendDto struct { + UID string `json:"uid"` + Settings RemoteWriteSettings `json:"settings"` + SecureFields map[string]bool `json:"secureFields"` +} + +type RemoteWriteBackendGetCmd struct { + UID string `json:"uid"` +} + +type RemoteWriteBackendCreateCmd struct { + UID string `json:"uid"` + Settings RemoteWriteSettings `json:"settings"` + SecureSettings map[string]string `json:"secureSettings"` +} + +// TODO: add version field later. +type RemoteWriteBackendUpdateCmd struct { + UID string `json:"uid"` + Settings RemoteWriteSettings `json:"settings"` + SecureSettings map[string]string `json:"secureSettings"` +} + +type RemoteWriteBackendDeleteCmd struct { + UID string `json:"uid"` +} + type RemoteWriteBackend struct { - OrgId int64 `json:"-"` - UID string `json:"uid"` - Settings *RemoteWriteConfig `json:"settings"` + OrgId int64 `json:"-"` + UID string `json:"uid"` + Settings RemoteWriteSettings `json:"settings"` + SecureSettings map[string][]byte `json:"secureSettings,omitempty"` +} + +func (r RemoteWriteBackend) Valid() (bool, string) { + if r.UID == "" { + return false, "uid required" + } + if r.Settings.Endpoint == "" { + return false, "endpoint required" + } + if r.Settings.User == "" { + return false, "user required" + } + if string(r.SecureSettings["password"]) == "" && r.Settings.Password == "" { + return false, "password required" + } + return true, "" +} + +type RemoteWriteSettings struct { + // Endpoint to send streaming frames to. + Endpoint string `json:"endpoint"` + // User is a user for remote write request. + User string `json:"user"` + // Password is a plain text non-encrypted password. + // TODO: remove after integrating with the database. + Password string `json:"password,omitempty"` } type RemoteWriteBackends struct { @@ -192,20 +263,39 @@ type FrameConditionCheckerConfig struct { NumberCompareConditionConfig *NumberCompareFrameConditionConfig `json:"numberCompare,omitempty"` } -type RuleStorage interface { +type ChannelRuleCreateCmd struct { + Pattern string `json:"pattern"` + Settings ChannelRuleSettings `json:"settings"` +} + +type ChannelRuleUpdateCmd struct { + Pattern string `json:"pattern"` + Settings ChannelRuleSettings `json:"settings"` +} + +type ChannelRuleDeleteCmd struct { + Pattern string `json:"pattern"` +} + +type Storage interface { ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error) + GetRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendGetCmd) (RemoteWriteBackend, bool, error) + CreateRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendCreateCmd) (RemoteWriteBackend, error) + UpdateRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendUpdateCmd) (RemoteWriteBackend, error) + DeleteRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendDeleteCmd) error ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) - CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) - UpdateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) - DeleteChannelRule(_ context.Context, orgID int64, pattern string) error + CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error) + UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error) + DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error } type StorageRuleBuilder struct { Node *centrifuge.Node ManagedStream *managedstream.Runner FrameStorage *FrameStorage - RuleStorage RuleStorage + Storage Storage ChannelHandlerGetter ChannelHandlerGetter + EncryptionService encryption.Service } func (f *StorageRuleBuilder) extractSubscriber(config *SubscriberConfig) (Subscriber, error) { @@ -385,11 +475,30 @@ func (f *StorageRuleBuilder) extractFrameOutputter(config *FrameOutputterConfig, if config.RemoteWriteOutputConfig == nil { return nil, missingConfiguration } - remoteWriteConfig, ok := f.getRemoteWriteConfig(config.RemoteWriteOutputConfig.UID, remoteWriteBackends) + remoteWriteBackend, ok := f.getRemoteWriteBackend(config.RemoteWriteOutputConfig.UID, remoteWriteBackends) if !ok { return nil, fmt.Errorf("unknown remote write backend uid: %s", config.RemoteWriteOutputConfig.UID) } - return NewRemoteWriteFrameOutput(*remoteWriteConfig), nil + + var password string + hasSecurePassword := len(remoteWriteBackend.SecureSettings["password"]) > 0 + if hasSecurePassword { + passwordBytes, err := f.EncryptionService.Decrypt(context.Background(), remoteWriteBackend.SecureSettings["password"], setting.SecretKey) + if err != nil { + return nil, fmt.Errorf("password can't be decrypted: %w", err) + } + password = string(passwordBytes) + } else { + // Use plain text password (should be removed upon database integration). + password = remoteWriteBackend.Settings.Password + } + + return NewRemoteWriteFrameOutput( + remoteWriteBackend.Settings.Endpoint, + remoteWriteBackend.Settings.User, + password, + config.RemoteWriteOutputConfig.SampleMilliseconds, + ), nil case FrameOutputTypeChangeLog: if config.ChangeLogOutputConfig == nil { return nil, missingConfiguration @@ -420,22 +529,22 @@ func (f *StorageRuleBuilder) extractDataOutputter(config *DataOutputterConfig) ( } } -func (f *StorageRuleBuilder) getRemoteWriteConfig(uid string, remoteWriteBackends []RemoteWriteBackend) (*RemoteWriteConfig, bool) { +func (f *StorageRuleBuilder) getRemoteWriteBackend(uid string, remoteWriteBackends []RemoteWriteBackend) (RemoteWriteBackend, bool) { for _, rwb := range remoteWriteBackends { if rwb.UID == uid { - return rwb.Settings, true + return rwb, true } } - return nil, false + return RemoteWriteBackend{}, false } func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error) { - channelRules, err := f.RuleStorage.ListChannelRules(ctx, orgID) + channelRules, err := f.Storage.ListChannelRules(ctx, orgID) if err != nil { return nil, err } - remoteWriteBackends, err := f.RuleStorage.ListRemoteWriteBackends(ctx, orgID) + remoteWriteBackends, err := f.Storage.ListRemoteWriteBackends(ctx, orgID) if err != nil { return nil, err } diff --git a/pkg/services/live/pipeline/devdata.go b/pkg/services/live/pipeline/devdata.go index 12ce499ef8c..086a39fc123 100644 --- a/pkg/services/live/pipeline/devdata.go +++ b/pkg/services/live/pipeline/devdata.go @@ -103,12 +103,12 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR Converter: NewJsonFrameConverter(JsonFrameConverterConfig{}), FrameOutputters: []FrameOutputter{ NewManagedStreamFrameOutput(f.ManagedStream), - NewRemoteWriteFrameOutput(RemoteWriteConfig{ - Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), - User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), - Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), - SampleMilliseconds: 1000, - }), + NewRemoteWriteFrameOutput( + os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), + os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), + os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), + 1000, + ), }, Subscribers: []Subscriber{ NewBuiltinSubscriber(f.ChannelHandlerGetter), @@ -289,11 +289,12 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR }), FrameOutputters: []FrameOutputter{ NewManagedStreamFrameOutput(f.ManagedStream), - NewRemoteWriteFrameOutput(RemoteWriteConfig{ - Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), - User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), - Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), - }), + NewRemoteWriteFrameOutput( + os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), + os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), + os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), + 0, + ), NewChangeLogFrameOutput(f.FrameStorage, ChangeLogOutputConfig{ FieldName: "value3", Channel: "stream/json/exact/value3/changes", @@ -323,11 +324,12 @@ func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelR Pattern: "stream/json/exact/value3/changes", FrameOutputters: []FrameOutputter{ NewManagedStreamFrameOutput(f.ManagedStream), - NewRemoteWriteFrameOutput(RemoteWriteConfig{ - Endpoint: os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), - User: os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), - Password: os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), - }), + NewRemoteWriteFrameOutput( + os.Getenv("GF_LIVE_REMOTE_WRITE_ENDPOINT"), + os.Getenv("GF_LIVE_REMOTE_WRITE_USER"), + os.Getenv("GF_LIVE_REMOTE_WRITE_PASSWORD"), + 0, + ), }, }, { diff --git a/pkg/services/live/pipeline/frame_output_remote_write.go b/pkg/services/live/pipeline/frame_output_remote_write.go index f0c6a5e7e7b..fbd9f26334d 100644 --- a/pkg/services/live/pipeline/frame_output_remote_write.go +++ b/pkg/services/live/pipeline/frame_output_remote_write.go @@ -16,13 +16,15 @@ import ( const flushInterval = 15 * time.Second -type RemoteWriteConfig struct { +type RemoteWriteFrameOutput struct { + mu sync.Mutex + // Endpoint to send streaming frames to. - Endpoint string `json:"endpoint"` + Endpoint string // User is a user for remote write request. - User string `json:"user"` + User string // Password for remote write endpoint. - Password string `json:"password"` + Password string // SampleMilliseconds allow defining an interval to sample points inside a channel // when outputting to remote write endpoint (on __name__ label basis). For example // when having a 20Hz stream and SampleMilliseconds 1000 then only one point in a @@ -30,22 +32,21 @@ type RemoteWriteConfig struct { // If not set - then no down-sampling will be performed. If SampleMilliseconds is // greater than flushInterval then each flush will include a point as we only keeping // track of timestamps in terms of each individual flush at the moment. - SampleMilliseconds int64 `json:"sampleMilliseconds"` -} + SampleMilliseconds int64 -type RemoteWriteFrameOutput struct { - mu sync.Mutex - config RemoteWriteConfig httpClient *http.Client buffer []prompb.TimeSeries } -func NewRemoteWriteFrameOutput(config RemoteWriteConfig) *RemoteWriteFrameOutput { +func NewRemoteWriteFrameOutput(endpoint, user, password string, sampleMilliseconds int64) *RemoteWriteFrameOutput { out := &RemoteWriteFrameOutput{ - config: config, - httpClient: &http.Client{Timeout: 2 * time.Second}, + Endpoint: endpoint, + User: user, + Password: password, + SampleMilliseconds: sampleMilliseconds, + httpClient: &http.Client{Timeout: 2 * time.Second}, } - if config.Endpoint != "" { + if out.Endpoint != "" { go out.flushPeriodically() } return out @@ -104,7 +105,7 @@ func (out *RemoteWriteFrameOutput) sample(timeSeries []prompb.TimeSeries) []prom // In-place filtering, see https://github.com/golang/go/wiki/SliceTricks#filter-in-place. n := 0 for _, s := range ts.Samples { - if lastTimestamp == 0 || s.Timestamp > lastTimestamp+out.config.SampleMilliseconds { + if lastTimestamp == 0 || s.Timestamp > lastTimestamp+out.SampleMilliseconds { ts.Samples[n] = s n++ lastTimestamp = s.Timestamp @@ -132,7 +133,7 @@ func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error { } logger.Debug("Remote write flush", "numTimeSeries", len(timeSeries), "numSamples", numSamples) - if out.config.SampleMilliseconds > 0 { + if out.SampleMilliseconds > 0 { timeSeries = out.sample(timeSeries) numSamples = 0 for _, ts := range timeSeries { @@ -144,15 +145,17 @@ func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error { if err != nil { return fmt.Errorf("error converting time series to bytes: %v", err) } - logger.Debug("Sending to remote write endpoint", "url", out.config.Endpoint, "bodyLength", len(remoteWriteData)) - req, err := http.NewRequest(http.MethodPost, out.config.Endpoint, bytes.NewReader(remoteWriteData)) + logger.Debug("Sending to remote write endpoint", "url", out.Endpoint, "bodyLength", len(remoteWriteData)) + req, err := http.NewRequest(http.MethodPost, out.Endpoint, bytes.NewReader(remoteWriteData)) if err != nil { return fmt.Errorf("error constructing remote write request: %w", err) } req.Header.Set("Content-Type", "application/x-protobuf") req.Header.Set("Content-Encoding", "snappy") req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - req.SetBasicAuth(out.config.User, out.config.Password) + if out.User != "" { + req.SetBasicAuth(out.User, out.Password) + } started := time.Now() resp, err := out.httpClient.Do(req) @@ -164,12 +167,12 @@ func (out *RemoteWriteFrameOutput) flush(timeSeries []prompb.TimeSeries) error { logger.Error("Unexpected response code from remote write endpoint", "code", resp.StatusCode) return errors.New("unexpected response code from remote write endpoint") } - logger.Debug("Successfully sent to remote write endpoint", "url", out.config.Endpoint, "elapsed", time.Since(started)) + logger.Debug("Successfully sent to remote write endpoint", "url", out.Endpoint, "elapsed", time.Since(started)) return nil } func (out *RemoteWriteFrameOutput) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error) { - if out.config.Endpoint == "" { + if out.Endpoint == "" { logger.Debug("Skip sending to remote write: no url") return nil, nil } diff --git a/pkg/services/live/pipeline/frame_output_remote_write_test.go b/pkg/services/live/pipeline/frame_output_remote_write_test.go index 70d94159acc..fa282b62dd9 100644 --- a/pkg/services/live/pipeline/frame_output_remote_write_test.go +++ b/pkg/services/live/pipeline/frame_output_remote_write_test.go @@ -51,9 +51,7 @@ func TestRemoteWriteFrameOutput_sample(t *testing.T) { }, }, } - out := NewRemoteWriteFrameOutput(RemoteWriteConfig{ - SampleMilliseconds: 500, - }) + out := NewRemoteWriteFrameOutput("", "", "", 500) sampledTimeSeries := out.sample(timeSeries) require.Len(t, sampledTimeSeries, 2) @@ -125,9 +123,7 @@ func TestRemoteWriteFrameOutput_sample_merge(t *testing.T) { }, }, } - out := NewRemoteWriteFrameOutput(RemoteWriteConfig{ - SampleMilliseconds: 50, - }) + out := NewRemoteWriteFrameOutput("", "", "", 50) sampledTimeSeries := out.sample(timeSeries) require.Len(t, sampledTimeSeries, 2) diff --git a/pkg/services/live/pipeline/storage_file.go b/pkg/services/live/pipeline/storage_file.go index 042261355eb..f461a086b5f 100644 --- a/pkg/services/live/pipeline/storage_file.go +++ b/pkg/services/live/pipeline/storage_file.go @@ -8,11 +8,16 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/grafana/grafana/pkg/services/encryption" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/util" ) // FileStorage can load channel rules from a file on disk. type FileStorage struct { - DataPath string + DataPath string + EncryptionService encryption.Service } func (f *FileStorage) ListRemoteWriteBackends(_ context.Context, orgID int64) ([]RemoteWriteBackend, error) { @@ -37,6 +42,118 @@ func (f *FileStorage) ListRemoteWriteBackends(_ context.Context, orgID int64) ([ return backends, nil } +func (f *FileStorage) GetRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendGetCmd) (RemoteWriteBackend, bool, error) { + remoteWriteBackends, err := f.readRemoteWriteBackends() + if err != nil { + return RemoteWriteBackend{}, false, fmt.Errorf("can't remote write backends: %w", err) + } + for _, existingBackend := range remoteWriteBackends.Backends { + if uidMatch(orgID, cmd.UID, existingBackend) { + return existingBackend, true, nil + } + } + return RemoteWriteBackend{}, false, nil +} + +func (f *FileStorage) CreateRemoteWriteBackend(ctx context.Context, orgID int64, cmd RemoteWriteBackendCreateCmd) (RemoteWriteBackend, error) { + remoteWriteBackends, err := f.readRemoteWriteBackends() + if err != nil { + return RemoteWriteBackend{}, fmt.Errorf("can't read remote write backends: %w", err) + } + if cmd.UID == "" { + cmd.UID = util.GenerateShortUID() + } + + secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey) + if err != nil { + return RemoteWriteBackend{}, fmt.Errorf("error encrypting data: %w", err) + } + + backend := RemoteWriteBackend{ + OrgId: orgID, + UID: cmd.UID, + Settings: cmd.Settings, + SecureSettings: secureSettings, + } + + ok, reason := backend.Valid() + if !ok { + return RemoteWriteBackend{}, fmt.Errorf("invalid remote write backend: %s", reason) + } + for _, existingBackend := range remoteWriteBackends.Backends { + if uidMatch(orgID, backend.UID, existingBackend) { + return RemoteWriteBackend{}, fmt.Errorf("backend already exists in org: %s", backend.UID) + } + } + remoteWriteBackends.Backends = append(remoteWriteBackends.Backends, backend) + err = f.saveRemoteWriteBackends(orgID, remoteWriteBackends) + return backend, err +} + +func (f *FileStorage) UpdateRemoteWriteBackend(ctx context.Context, orgID int64, cmd RemoteWriteBackendUpdateCmd) (RemoteWriteBackend, error) { + remoteWriteBackends, err := f.readRemoteWriteBackends() + if err != nil { + return RemoteWriteBackend{}, fmt.Errorf("can't read remote write backends: %w", err) + } + + secureSettings, err := f.EncryptionService.EncryptJsonData(ctx, cmd.SecureSettings, setting.SecretKey) + if err != nil { + return RemoteWriteBackend{}, fmt.Errorf("error encrypting data: %w", err) + } + + backend := RemoteWriteBackend{ + OrgId: orgID, + UID: cmd.UID, + Settings: cmd.Settings, + SecureSettings: secureSettings, + } + + ok, reason := backend.Valid() + if !ok { + return RemoteWriteBackend{}, fmt.Errorf("invalid channel rule: %s", reason) + } + + index := -1 + + for i, existingBackend := range remoteWriteBackends.Backends { + if uidMatch(orgID, backend.UID, existingBackend) { + index = i + break + } + } + if index > -1 { + remoteWriteBackends.Backends[index] = backend + } else { + return f.CreateRemoteWriteBackend(ctx, orgID, RemoteWriteBackendCreateCmd(cmd)) + } + + err = f.saveRemoteWriteBackends(orgID, remoteWriteBackends) + return backend, err +} + +func (f *FileStorage) DeleteRemoteWriteBackend(_ context.Context, orgID int64, cmd RemoteWriteBackendDeleteCmd) error { + remoteWriteBackends, err := f.readRemoteWriteBackends() + if err != nil { + return fmt.Errorf("can't read remote write backends: %w", err) + } + + index := -1 + for i, existingBackend := range remoteWriteBackends.Backends { + if uidMatch(orgID, cmd.UID, existingBackend) { + index = i + break + } + } + + if index > -1 { + remoteWriteBackends.Backends = removeRemoteWriteBackendByIndex(remoteWriteBackends.Backends, index) + } else { + return fmt.Errorf("remote write backend not found") + } + + return f.saveRemoteWriteBackends(orgID, remoteWriteBackends) +} + func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error) { channelRules, err := f.readRules() if err != nil { @@ -51,11 +168,18 @@ func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]Channe return rules, nil } -func (f *FileStorage) CreateChannelRule(_ context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) { +func (f *FileStorage) CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error) { channelRules, err := f.readRules() if err != nil { - return rule, fmt.Errorf("can't read channel rules: %w", err) + return ChannelRule{}, fmt.Errorf("can't read channel rules: %w", err) + } + + rule := ChannelRule{ + OrgId: orgID, + Pattern: cmd.Pattern, + Settings: cmd.Settings, } + ok, reason := rule.Valid() if !ok { return rule, fmt.Errorf("invalid channel rule: %s", reason) @@ -74,10 +198,20 @@ func patternMatch(orgID int64, pattern string, existingRule ChannelRule) bool { return pattern == existingRule.Pattern && (existingRule.OrgId == orgID || (existingRule.OrgId == 0 && orgID == 1)) } -func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, rule ChannelRule) (ChannelRule, error) { +func uidMatch(orgID int64, uid string, existingBackend RemoteWriteBackend) bool { + return uid == existingBackend.UID && (existingBackend.OrgId == orgID || (existingBackend.OrgId == 0 && orgID == 1)) +} + +func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error) { channelRules, err := f.readRules() if err != nil { - return rule, fmt.Errorf("can't read channel rules: %w", err) + return ChannelRule{}, fmt.Errorf("can't read channel rules: %w", err) + } + + rule := ChannelRule{ + OrgId: orgID, + Pattern: cmd.Pattern, + Settings: cmd.Settings, } ok, reason := rule.Valid() @@ -96,7 +230,7 @@ func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, rule C if index > -1 { channelRules.Rules[index] = rule } else { - return f.CreateChannelRule(ctx, orgID, rule) + return f.CreateChannelRule(ctx, orgID, ChannelRuleCreateCmd(cmd)) } err = f.saveChannelRules(orgID, channelRules) @@ -149,7 +283,7 @@ func (f *FileStorage) saveChannelRules(orgID int64, rules ChannelRules) error { return nil } -func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, pattern string) error { +func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error { channelRules, err := f.readRules() if err != nil { return fmt.Errorf("can't read channel rules: %w", err) @@ -157,7 +291,7 @@ func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, pattern index := -1 for i, existingRule := range channelRules.Rules { - if patternMatch(orgID, pattern, existingRule) { + if patternMatch(orgID, cmd.Pattern, existingRule) { index = i break } @@ -171,3 +305,45 @@ func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, pattern return f.saveChannelRules(orgID, channelRules) } + +func removeRemoteWriteBackendByIndex(s []RemoteWriteBackend, index int) []RemoteWriteBackend { + return append(s[:index], s[index+1:]...) +} + +func (f *FileStorage) remoteWriteFilePath() string { + return filepath.Join(f.DataPath, "pipeline", "remote-write-backends.json") +} + +func (f *FileStorage) readRemoteWriteBackends() (RemoteWriteBackends, error) { + filePath := f.remoteWriteFilePath() + // Safe to ignore gosec warning G304. + // nolint:gosec + bytes, err := ioutil.ReadFile(filePath) + if err != nil { + return RemoteWriteBackends{}, fmt.Errorf("can't read %s file: %w", filePath, err) + } + var remoteWriteBackends RemoteWriteBackends + err = json.Unmarshal(bytes, &remoteWriteBackends) + if err != nil { + return RemoteWriteBackends{}, fmt.Errorf("can't unmarshal %s data: %w", filePath, err) + } + return remoteWriteBackends, nil +} + +func (f *FileStorage) saveRemoteWriteBackends(_ int64, remoteWriteBackends RemoteWriteBackends) error { + filePath := f.remoteWriteFilePath() + // Safe to ignore gosec warning G304. + // nolint:gosec + file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return fmt.Errorf("can't open channel remote write backends file: %w", err) + } + defer func() { _ = file.Close() }() + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + err = enc.Encode(remoteWriteBackends) + if err != nil { + return fmt.Errorf("can't save remote write backends to file: %w", err) + } + return nil +}