From a712f1a2315967d976b1104b1900c38244142d25 Mon Sep 17 00:00:00 2001 From: woodsaj Date: Tue, 3 Feb 2015 23:57:42 +0800 Subject: [PATCH 1/5] Add inital implementation of Notification events. If notifications are enabled in the config, Adds a eventHandler accepting Notification{} payloads to the internal Bus. The eventHandler then marshals the payload into json and sends it to a rabbitmq topic exchange using the Notification.Priority+Noticiation.EventType as the routing key. eg. INFO.account.created Currently, notifications are only being emitted for INFO.account.created INFO.account.updated INFO.user.created INFO.user.updated --- conf/grafana.ini | 5 +- grafana | 2 +- pkg/bus/bus.go | 14 ++- pkg/cmd/web.go | 9 +- pkg/models/notification.go | 21 ++++ pkg/services/notification/notification.go | 130 ++++++++++++++++++++++ pkg/services/sqlstore/account.go | 18 +++ pkg/services/sqlstore/user.go | 16 +++ pkg/setting/setting.go | 17 +++ 9 files changed, 223 insertions(+), 9 deletions(-) create mode 100644 pkg/models/notification.go create mode 100644 pkg/services/notification/notification.go diff --git a/conf/grafana.ini b/conf/grafana.ini index 4fc77206190..5f70bf3f9da 100644 --- a/conf/grafana.ini +++ b/conf/grafana.ini @@ -121,4 +121,7 @@ daily_rotate = true ; Expired days of log file(delete after max days), default is 7 max_days = 7 - +[notifications] +enabled = false +rabbitmq_url = amqp://localhost/ +notifications_exchange = notifications diff --git a/grafana b/grafana index 7fef460fa2b..0fe83d51981 160000 --- a/grafana +++ b/grafana @@ -1 +1 @@ -Subproject commit 7fef460fa2b6034429b205dc63d9b9f298f43fb4 +Subproject commit 0fe83d51981333600f1e3801044fc1cfd5acf1ae diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 7afff22eb4e..f6c313b91a2 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -52,7 +52,6 @@ func (b *InProcBus) Dispatch(msg Msg) error { func (b *InProcBus) Publish(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() - var listeners = b.listeners[msgName] if len(listeners) == 0 { return nil @@ -61,7 +60,7 @@ func (b *InProcBus) Publish(msg Msg) error { var params = make([]reflect.Value, 1) params[0] = reflect.ValueOf(msg) - for listenerHandler := range listeners { + for _, listenerHandler := range listeners { ret := reflect.ValueOf(listenerHandler).Call(params) err := ret[0].Interface() if err != nil { @@ -81,12 +80,11 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) { func (b *InProcBus) AddEventListener(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) eventName := handlerType.In(0).Elem().Name() - list, exists := b.listeners[eventName] + _, exists := b.listeners[eventName] if !exists { - list = make([]HandlerFunc, 0) - b.listeners[eventName] = list + b.listeners[eventName] = make([]HandlerFunc, 0) } - list = append(list, handler) + b.listeners[eventName] = append(b.listeners[eventName], handler) } // Package level functions @@ -102,3 +100,7 @@ func AddEventListener(handler HandlerFunc) { func Dispatch(msg Msg) error { return globalBus.Dispatch(msg) } + +func Publish(msg Msg) error { + return globalBus.Publish(msg) +} \ No newline at end of file diff --git a/pkg/cmd/web.go b/pkg/cmd/web.go index 73bd64d5f10..caef2e9f64d 100644 --- a/pkg/cmd/web.go +++ b/pkg/cmd/web.go @@ -19,6 +19,7 @@ import ( "github.com/torkelo/grafana-pro/pkg/services/sqlstore" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/social" + "github.com/torkelo/grafana-pro/pkg/services/notification" ) var CmdWeb = cli.Command{ @@ -81,11 +82,17 @@ func runWeb(c *cli.Context) { social.NewOAuthService() sqlstore.NewEngine() sqlstore.EnsureAdminUser() + var err error + if setting.NotificationsEnabled { + err = notification.Init(setting.RabbitmqUrl, setting.NotificationsExchange) + if err != nil { + log.Fatal(4, "Failed to connect to notification queue: %v", err) + } + } m := newMacaron() api.Register(m) - var err error listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort) log.Info("Listen: %v://%s%s", setting.Protocol, listenAddr, setting.AppSubUrl) switch setting.Protocol { diff --git a/pkg/models/notification.go b/pkg/models/notification.go new file mode 100644 index 00000000000..f849c4f6274 --- /dev/null +++ b/pkg/models/notification.go @@ -0,0 +1,21 @@ +package models + +import ( + "time" +) + +type EventPriority string + +const ( + PRIO_DEBUG EventPriority = "DEBUG" + PRIO_INFO EventPriority = "INFO" + PRIO_ERROR EventPriority = "ERROR" +) + +type Notification struct { + EventType string `json:"event_type"` + Timestamp time.Time `json:"timestamp"` + Priority EventPriority `json:"priority"` + Payload interface{} `json:"payload"` +} + diff --git a/pkg/services/notification/notification.go b/pkg/services/notification/notification.go new file mode 100644 index 00000000000..833fd29bf43 --- /dev/null +++ b/pkg/services/notification/notification.go @@ -0,0 +1,130 @@ +package notification + +import ( + "fmt" + "time" + "encoding/json" + "github.com/streadway/amqp" + "github.com/torkelo/grafana-pro/pkg/bus" + m "github.com/torkelo/grafana-pro/pkg/models" +) + +var ( + url string + exchange string + conn *amqp.Connection + channel *amqp.Channel +) + +func getConnection() (*amqp.Connection, error) { + c, err := amqp.Dial(url) + if err != nil { + return nil, err + } + return c, err +} + +func getChannel() (*amqp.Channel, error) { + ch, err := conn.Channel() + if err != nil { + return nil, err + } + + err = ch.ExchangeDeclare( + exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if (err != nil) { + return nil, err + } + return ch, err +} + +func Init(rabbitUrl string, exchangeName string) error { + url = rabbitUrl + exchange = exchangeName + bus.AddEventListener(NotificationHandler) + return Setup() +} + +// Every connection should declare the topology they expect +func Setup() error { + c, err := getConnection() + if err != nil { + return err + } + conn = c + ch, err := getChannel() + if err != nil { + return err + } + + channel = ch + + // listen for close events so we can reconnect. + errChan := channel.NotifyClose(make(chan *amqp.Error)) + go func() { + for e := range errChan { + fmt.Println("connection to rabbitmq lost.") + fmt.Println(e) + fmt.Println("attempting to create new rabbitmq channel.") + ch, err := getChannel() + if err == nil { + channel = ch + break + } + + //could not create channel, so lets close the connection + // and re-create. + _ = conn.Close() + + for err != nil { + time.Sleep(2 * time.Second) + fmt.Println("attempting to reconnect to rabbitmq.") + err = Setup() + } + fmt.Println("Connected to rabbitmq again.") + } + }() + + return nil +} + +func Publish(routingKey string, msgString []byte) { + err := channel.Publish( + exchange, //exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: msgString, + }, + ) + if err != nil { + // failures are most likely because the connection was lost. + // the connection will be re-established, so just keep + // retrying every 2seconds until we successfully publish. + time.Sleep(2 * time.Second) + fmt.Println("publish failed, retrying."); + Publish(routingKey, msgString) + } + return +} + +func NotificationHandler(event *m.Notification) error { + msgString, err := json.Marshal(event) + if err != nil { + return err + } + routingKey := fmt.Sprintf("%s.%s", event.Priority, event.EventType) + // this is run in a greenthread and we expect that publish will keep + // retrying until the message gets sent. + go Publish(routingKey, msgString) + return nil +} \ No newline at end of file diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 89c5b1ac46a..11cde674268 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -72,6 +72,14 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { _, err := sess.Insert(&user) cmd.Result = account + // silently ignore failures to publish events. + _ = bus.Publish(&m.Notification{ + EventType: "account.create", + Timestamp: account.Created, + Priority: m.PRIO_INFO, + Payload: account, + }) + return err }) } @@ -85,6 +93,16 @@ func UpdateAccount(cmd *m.UpdateAccountCommand) error { } _, err := sess.Id(cmd.AccountId).Update(&account) + if err == nil { + // silently ignore failures to publish events. + account.Id = cmd.AccountId + _ = bus.Publish(&m.Notification{ + EventType: "account.update", + Timestamp: account.Updated, + Priority: m.PRIO_INFO, + Payload: account, + }) + } return err }) } diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index 67a78aa3ec6..c03057705f5 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -95,6 +95,12 @@ func CreateUser(cmd *m.CreateUserCommand) error { _, err = sess.Insert(&accountUser) cmd.Result = user + _ = bus.Publish(&m.Notification{ + EventType: "user.create", + Timestamp: user.Created, + Priority: m.PRIO_INFO, + Payload: user, + }) return err }) } @@ -135,6 +141,16 @@ func UpdateUser(cmd *m.UpdateUserCommand) error { } _, err := sess.Id(cmd.UserId).Update(&user) + if err == nil { + // silently ignore failures to publish events. + user.Id = cmd.UserId + _ = bus.Publish(&m.Notification{ + EventType: "user.update", + Timestamp: user.Updated, + Priority: m.PRIO_INFO, + Payload: user, + }) + } return err }) } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 3b65790295e..64df98603b0 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -93,6 +93,12 @@ var ( // PhantomJs Rendering ImagesDir string PhantomDir string + + //Notifications + NotificationsEnabled bool + RabbitmqUrl string + NotificationsExchange string + ) func init() { @@ -219,6 +225,17 @@ func NewConfigContext() { LogRootPath = Cfg.Section("log").Key("root_path").MustString(path.Join(WorkDir, "/data/log")) + // Notifications + NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false) + RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/") + + // validate rabbitmqUrl. + _, err = url.Parse(RabbitmqUrl) + if err != nil { + log.Fatal(4, "Invalid rabbitmq_url(%s): %s", RabbitmqUrl, err) + } + NotificationsExchange = Cfg.Section("notifications").Key("notifications_exchange").MustString("notifications") + readSessionConfig() } From 37523791061ef13c34066fc39732e2a519f2310b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 4 Feb 2015 15:37:26 +0100 Subject: [PATCH 2/5] Worked on event system, needs a little more work --- conf/grafana.ini | 4 +- grafana | 2 +- pkg/bus/bus.go | 29 +++++-- pkg/bus/bus_test.go | 5 +- pkg/cmd/web.go | 10 +-- pkg/events/events.go | 8 ++ pkg/events/events_test.go | 18 ++++ .../eventpublisher.go} | 84 +++++++++++-------- pkg/services/sqlstore/account.go | 8 +- pkg/services/sqlstore/shared.go | 71 ++++++++++++++++ pkg/services/sqlstore/sqlstore.go | 24 ------ pkg/setting/setting.go | 8 +- 12 files changed, 185 insertions(+), 86 deletions(-) create mode 100644 pkg/events/events.go create mode 100644 pkg/events/events_test.go rename pkg/services/{notification/notification.go => eventpublisher/eventpublisher.go} (60%) create mode 100644 pkg/services/sqlstore/shared.go diff --git a/conf/grafana.ini b/conf/grafana.ini index 5f70bf3f9da..2d643191b4a 100644 --- a/conf/grafana.ini +++ b/conf/grafana.ini @@ -121,7 +121,7 @@ daily_rotate = true ; Expired days of log file(delete after max days), default is 7 max_days = 7 -[notifications] +[event_publisher] enabled = false rabbitmq_url = amqp://localhost/ -notifications_exchange = notifications +exchange = grafana_events diff --git a/grafana b/grafana index 0fe83d51981..07ec00641fb 160000 --- a/grafana +++ b/grafana @@ -1 +1 @@ -Subproject commit 0fe83d51981333600f1e3801044fc1cfd5acf1ae +Subproject commit 07ec00641fb6d633dc2914ff433e61db1ef8a313 diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index f6c313b91a2..2865c740fd1 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -11,13 +11,16 @@ type Msg interface{} type Bus interface { Dispatch(msg Msg) error Publish(msg Msg) error + AddHandler(handler HandlerFunc) AddEventListener(handler HandlerFunc) + AddWildcardListener(handler HandlerFunc) } type InProcBus struct { - handlers map[string]HandlerFunc - listeners map[string][]HandlerFunc + handlers map[string]HandlerFunc + listeners map[string][]HandlerFunc + wildcardListeners []HandlerFunc } // temp stuff, not sure how to handle bus instance, and init yet @@ -27,6 +30,7 @@ func New() Bus { bus := &InProcBus{} bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) + bus.wildcardListeners = make([]HandlerFunc, 0) return bus } @@ -53,9 +57,6 @@ func (b *InProcBus) Dispatch(msg Msg) error { func (b *InProcBus) Publish(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() var listeners = b.listeners[msgName] - if len(listeners) == 0 { - return nil - } var params = make([]reflect.Value, 1) params[0] = reflect.ValueOf(msg) @@ -68,9 +69,21 @@ func (b *InProcBus) Publish(msg Msg) error { } } + for _, listenerHandler := range b.wildcardListeners { + ret := reflect.ValueOf(listenerHandler).Call(params) + err := ret[0].Interface() + if err != nil { + return err.(error) + } + } + return nil } +func (b *InProcBus) AddWildcardListener(handler HandlerFunc) { + b.wildcardListeners = append(b.wildcardListeners, handler) +} + func (b *InProcBus) AddHandler(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) queryTypeName := handlerType.In(0).Elem().Name() @@ -97,10 +110,14 @@ func AddEventListener(handler HandlerFunc) { globalBus.AddEventListener(handler) } +func AddWildcardListener(handler HandlerFunc) { + globalBus.AddWildcardListener(handler) +} + func Dispatch(msg Msg) error { return globalBus.Dispatch(msg) } func Publish(msg Msg) error { return globalBus.Publish(msg) -} \ No newline at end of file +} diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 45191eeb682..62e72f18308 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -2,6 +2,7 @@ package bus import ( "errors" + "fmt" "testing" ) @@ -62,7 +63,7 @@ func TestEventListeners(t *testing.T) { if err != nil { t.Fatal("Publish event failed " + err.Error()) - } else if count != 0 { - t.Fatal("Publish event failed, listeners called: %v, expected: %v", count, 11) + } else if count != 11 { + t.Fatal(fmt.Sprintf("Publish event failed, listeners called: %v, expected: %v", count, 11)) } } diff --git a/pkg/cmd/web.go b/pkg/cmd/web.go index caef2e9f64d..dfc5ff41bb3 100644 --- a/pkg/cmd/web.go +++ b/pkg/cmd/web.go @@ -16,10 +16,10 @@ import ( "github.com/torkelo/grafana-pro/pkg/api" "github.com/torkelo/grafana-pro/pkg/log" "github.com/torkelo/grafana-pro/pkg/middleware" + "github.com/torkelo/grafana-pro/pkg/services/eventpublisher" "github.com/torkelo/grafana-pro/pkg/services/sqlstore" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/social" - "github.com/torkelo/grafana-pro/pkg/services/notification" ) var CmdWeb = cli.Command{ @@ -82,13 +82,9 @@ func runWeb(c *cli.Context) { social.NewOAuthService() sqlstore.NewEngine() sqlstore.EnsureAdminUser() + eventpublisher.Init() + var err error - if setting.NotificationsEnabled { - err = notification.Init(setting.RabbitmqUrl, setting.NotificationsExchange) - if err != nil { - log.Fatal(4, "Failed to connect to notification queue: %v", err) - } - } m := newMacaron() api.Register(m) diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 00000000000..a86af406230 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,8 @@ +package events + +// Events can be passed to external systems via for example AMPQ +// Treat these events as basically DTOs so changes has to be backward compatible + +type AccountCreated struct { + Name string `json:"name"` +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 00000000000..f527569d0cf --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,18 @@ +package events + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func TestEventCreation(t *testing.T) { + + Convey("When generating slug", t, func() { + dashboard := NewDashboard("Grafana Play Home") + dashboard.UpdateSlug() + + So(dashboard.Slug, ShouldEqual, "grafana-play-home") + }) + +} diff --git a/pkg/services/notification/notification.go b/pkg/services/eventpublisher/eventpublisher.go similarity index 60% rename from pkg/services/notification/notification.go rename to pkg/services/eventpublisher/eventpublisher.go index 833fd29bf43..59c03f3c09a 100644 --- a/pkg/services/notification/notification.go +++ b/pkg/services/eventpublisher/eventpublisher.go @@ -1,25 +1,28 @@ -package notification +package eventpublisher import ( + "encoding/json" "fmt" + "log" + "reflect" "time" - "encoding/json" + "github.com/streadway/amqp" "github.com/torkelo/grafana-pro/pkg/bus" - m "github.com/torkelo/grafana-pro/pkg/models" + "github.com/torkelo/grafana-pro/pkg/setting" ) var ( - url string + url string exchange string - conn *amqp.Connection - channel *amqp.Channel + conn *amqp.Connection + channel *amqp.Channel ) func getConnection() (*amqp.Connection, error) { c, err := amqp.Dial(url) if err != nil { - return nil, err + return nil, err } return c, err } @@ -31,25 +34,35 @@ func getChannel() (*amqp.Channel, error) { } err = ch.ExchangeDeclare( - exchange, // name - "topic", // type - true, // durable - false, // auto-deleted - false, // internal - false, // no-wait - nil, // arguments + exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments ) - if (err != nil) { + if err != nil { return nil, err } return ch, err } -func Init(rabbitUrl string, exchangeName string) error { - url = rabbitUrl - exchange = exchangeName - bus.AddEventListener(NotificationHandler) - return Setup() +func Init() { + sec := setting.Cfg.Section("event_publisher") + + if !sec.Key("enabled").MustBool(false) { + return + } + + url = sec.Key("rabbitmq_url").String() + exchange = sec.Key("exchange").String() + bus.AddWildcardListener(eventListener) + + if err := Setup(); err != nil { + log.Fatal(4, "Failed to connect to notification queue: %v", err) + return + } } // Every connection should declare the topology they expect @@ -82,7 +95,7 @@ func Setup() error { //could not create channel, so lets close the connection // and re-create. _ = conn.Close() - + for err != nil { time.Sleep(2 * time.Second) fmt.Println("attempting to reconnect to rabbitmq.") @@ -92,39 +105,42 @@ func Setup() error { } }() - return nil + return nil } -func Publish(routingKey string, msgString []byte) { +func publish(routingKey string, msgString []byte) { err := channel.Publish( - exchange, //exchange - routingKey, // routing key - false, // mandatory + exchange, //exchange + routingKey, // routing key + false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", - Body: msgString, + Body: msgString, }, ) if err != nil { // failures are most likely because the connection was lost. - // the connection will be re-established, so just keep + // the connection will be re-established, so just keep // retrying every 2seconds until we successfully publish. time.Sleep(2 * time.Second) - fmt.Println("publish failed, retrying."); - Publish(routingKey, msgString) + fmt.Println("publish failed, retrying.") + publish(routingKey, msgString) } return } -func NotificationHandler(event *m.Notification) error { +func eventListener(event interface{}) error { msgString, err := json.Marshal(event) if err != nil { return err } - routingKey := fmt.Sprintf("%s.%s", event.Priority, event.EventType) + + eventType := reflect.TypeOf(event) + + routingKey := fmt.Sprintf("%s.%s", "INFO", eventType.Name()) // this is run in a greenthread and we expect that publish will keep // retrying until the message gets sent. - go Publish(routingKey, msgString) + go publish(routingKey, msgString) return nil -} \ No newline at end of file +} diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 11cde674268..4fddee071c0 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -6,6 +6,7 @@ import ( "github.com/go-xorm/xorm" "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" ) @@ -48,7 +49,7 @@ func GetAccountByName(query *m.GetAccountByNameQuery) error { } func CreateAccount(cmd *m.CreateAccountCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { account := m.Account{ Name: cmd.Name, @@ -60,7 +61,6 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { return err } - // create inital admin account user user := m.AccountUser{ AccountId: account.Id, UserId: cmd.UserId, @@ -72,6 +72,8 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { _, err := sess.Insert(&user) cmd.Result = account + sess.publishAfterCommit(&events.AccountCreated{}) + // silently ignore failures to publish events. _ = bus.Publish(&m.Notification{ EventType: "account.create", @@ -79,7 +81,7 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { Priority: m.PRIO_INFO, Payload: account, }) - + return err }) } diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go new file mode 100644 index 00000000000..51300167757 --- /dev/null +++ b/pkg/services/sqlstore/shared.go @@ -0,0 +1,71 @@ +package sqlstore + +import ( + "github.com/go-xorm/xorm" + "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/log" +) + +type dbTransactionFunc func(sess *xorm.Session) error +type dbTransactionFunc2 func(sess *session) error + +type session struct { + *xorm.Session + events []interface{} +} + +func (sess *session) publishAfterCommit(msg interface{}) { + sess.events = append(sess.events, msg) +} + +func inTransaction(callback dbTransactionFunc) error { + var err error + + sess := x.NewSession() + defer sess.Close() + + if err = sess.Begin(); err != nil { + return err + } + + err = callback(sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + return nil +} + +func inTransaction2(callback dbTransactionFunc2) error { + var err error + + sess := session{Session: x.NewSession()} + + defer sess.Close() + if err = sess.Begin(); err != nil { + return err + } + + err = callback(&sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + log.Error(3, "Failed to publish event after commit", err) + } + } + } + + return nil +} diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index f3968b4923d..9d1bef8106f 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -149,27 +149,3 @@ func LoadConfig() { DbCfg.SslMode = sec.Key("ssl_mode").String() DbCfg.Path = sec.Key("path").MustString("data/grafana.db") } - -type dbTransactionFunc func(sess *xorm.Session) error - -func inTransaction(callback dbTransactionFunc) error { - var err error - - sess := x.NewSession() - defer sess.Close() - - if err = sess.Begin(); err != nil { - return err - } - - err = callback(sess) - - if err != nil { - sess.Rollback() - return err - } else if err = sess.Commit(); err != nil { - return err - } - - return nil -} diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 64df98603b0..50a78c8a793 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -93,12 +93,6 @@ var ( // PhantomJs Rendering ImagesDir string PhantomDir string - - //Notifications - NotificationsEnabled bool - RabbitmqUrl string - NotificationsExchange string - ) func init() { @@ -228,7 +222,7 @@ func NewConfigContext() { // Notifications NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false) RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/") - + // validate rabbitmqUrl. _, err = url.Parse(RabbitmqUrl) if err != nil { From dace35d31daaf2ad258c47a5bb05393c2d196931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 4 Feb 2015 15:41:40 +0100 Subject: [PATCH 3/5] Missed setting account name --- pkg/services/sqlstore/account.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 4fddee071c0..30495199d6a 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -72,7 +72,9 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { _, err := sess.Insert(&user) cmd.Result = account - sess.publishAfterCommit(&events.AccountCreated{}) + sess.publishAfterCommit(&events.AccountCreated{ + Name: account.Name, + }) // silently ignore failures to publish events. _ = bus.Publish(&m.Notification{ From 525179eb85262cdcb71ed29515d9e7b1b965494c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 4 Feb 2015 16:57:20 +0100 Subject: [PATCH 4/5] Added on wire event format --- pkg/events/events.go | 57 ++++++++++++++++++- pkg/events/events_test.go | 20 +++++-- pkg/services/eventpublisher/eventpublisher.go | 11 ++-- pkg/services/sqlstore/account.go | 35 +++++------- pkg/setting/setting.go | 11 ---- 5 files changed, 92 insertions(+), 42 deletions(-) diff --git a/pkg/events/events.go b/pkg/events/events.go index a86af406230..cba16a9cc46 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -1,8 +1,63 @@ package events +import ( + "reflect" + "time" +) + // Events can be passed to external systems via for example AMPQ // Treat these events as basically DTOs so changes has to be backward compatible +type Priority string + +const ( + PRIO_DEBUG Priority = "DEBUG" + PRIO_INFO Priority = "INFO" + PRIO_ERROR Priority = "ERROR" +) + +type Event struct { + Timestamp time.Time `json:"timestamp"` +} + +type OnTheWireEvent struct { + EventType string `json:"event_type"` + Priority Priority `json:"priority"` + Timestamp time.Time `json:"timestamp"` + Payload interface{} `json:"payload"` +} + +type EventBase interface { + ToOnWriteEvent() *OnTheWireEvent +} + +func ToOnWriteEvent(event interface{}) (*OnTheWireEvent, error) { + eventType := reflect.TypeOf(event) + + wireEvent := OnTheWireEvent{ + Priority: PRIO_INFO, + EventType: eventType.Name(), + Payload: event, + } + + baseField := reflect.ValueOf(event).FieldByName("Timestamp") + if baseField.IsValid() { + wireEvent.Timestamp = baseField.Interface().(time.Time) + } else { + wireEvent.Timestamp = time.Now() + } + + return &wireEvent, nil +} + type AccountCreated struct { - Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` +} + +type AccountUpdated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` } diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go index f527569d0cf..36659eb9b45 100644 --- a/pkg/events/events_test.go +++ b/pkg/events/events_test.go @@ -1,18 +1,30 @@ package events import ( + "encoding/json" "testing" + "time" . "github.com/smartystreets/goconvey/convey" ) +type TestEvent struct { + Timestamp time.Time +} + func TestEventCreation(t *testing.T) { - Convey("When generating slug", t, func() { - dashboard := NewDashboard("Grafana Play Home") - dashboard.UpdateSlug() + Convey("Event to wire event", t, func() { + e := TestEvent{ + Timestamp: time.Unix(1231421123, 223), + } + + wire, _ := ToOnWriteEvent(e) + So(e.Timestamp.Unix(), ShouldEqual, wire.Timestamp.Unix()) + So(wire.EventType, ShouldEqual, "TestEvent") - So(dashboard.Slug, ShouldEqual, "grafana-play-home") + json, _ := json.Marshal(wire) + So(string(json), ShouldEqual, `{"event_type":"TestEvent","priority":"INFO","timestamp":"2009-01-08T14:25:23.000000223+01:00","payload":{"Timestamp":"2009-01-08T14:25:23.000000223+01:00"}}`) }) } diff --git a/pkg/services/eventpublisher/eventpublisher.go b/pkg/services/eventpublisher/eventpublisher.go index 59c03f3c09a..33983ea79a5 100644 --- a/pkg/services/eventpublisher/eventpublisher.go +++ b/pkg/services/eventpublisher/eventpublisher.go @@ -4,11 +4,11 @@ import ( "encoding/json" "fmt" "log" - "reflect" "time" "github.com/streadway/amqp" "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" "github.com/torkelo/grafana-pro/pkg/setting" ) @@ -131,14 +131,17 @@ func publish(routingKey string, msgString []byte) { } func eventListener(event interface{}) error { - msgString, err := json.Marshal(event) + wireEvent, err := events.ToOnWriteEvent(event) if err != nil { return err } - eventType := reflect.TypeOf(event) + msgString, err := json.Marshal(wireEvent) + if err != nil { + return err + } - routingKey := fmt.Sprintf("%s.%s", "INFO", eventType.Name()) + routingKey := fmt.Sprintf("%s.%s", wireEvent.Priority, wireEvent.EventType) // this is run in a greenthread and we expect that publish will keep // retrying until the message gets sent. go publish(routingKey, msgString) diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 30495199d6a..960c2b5b3a5 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -3,8 +3,6 @@ package sqlstore import ( "time" - "github.com/go-xorm/xorm" - "github.com/torkelo/grafana-pro/pkg/bus" "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" @@ -73,15 +71,9 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { cmd.Result = account sess.publishAfterCommit(&events.AccountCreated{ - Name: account.Name, - }) - - // silently ignore failures to publish events. - _ = bus.Publish(&m.Notification{ - EventType: "account.create", Timestamp: account.Created, - Priority: m.PRIO_INFO, - Payload: account, + Id: account.Id, + Name: account.Name, }) return err @@ -89,24 +81,23 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { } func UpdateAccount(cmd *m.UpdateAccountCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { account := m.Account{ Name: cmd.Name, Updated: time.Now(), } - _, err := sess.Id(cmd.AccountId).Update(&account) - if err == nil { - // silently ignore failures to publish events. - account.Id = cmd.AccountId - _ = bus.Publish(&m.Notification{ - EventType: "account.update", - Timestamp: account.Updated, - Priority: m.PRIO_INFO, - Payload: account, - }) + if _, err := sess.Id(cmd.AccountId).Update(&account); err != nil { + return err } - return err + + sess.publishAfterCommit(events.AccountUpdated{ + Timestamp: account.Updated, + Id: account.Id, + Name: account.Name, + }) + + return nil }) } diff --git a/pkg/setting/setting.go b/pkg/setting/setting.go index 50a78c8a793..3b65790295e 100644 --- a/pkg/setting/setting.go +++ b/pkg/setting/setting.go @@ -219,17 +219,6 @@ func NewConfigContext() { LogRootPath = Cfg.Section("log").Key("root_path").MustString(path.Join(WorkDir, "/data/log")) - // Notifications - NotificationsEnabled = Cfg.Section("notifications").Key("enabled").MustBool(false) - RabbitmqUrl = Cfg.Section("notifications").Key("rabbitmq_url").MustString("amqp://localhost/") - - // validate rabbitmqUrl. - _, err = url.Parse(RabbitmqUrl) - if err != nil { - log.Fatal(4, "Invalid rabbitmq_url(%s): %s", RabbitmqUrl, err) - } - NotificationsExchange = Cfg.Section("notifications").Key("notifications_exchange").MustString("notifications") - readSessionConfig() } From d8db5189c1e9eed55c8e8e68712ecec8de516ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 4 Feb 2015 17:15:05 +0100 Subject: [PATCH 5/5] More work on events, still have to convert pascal case event type name to rabbitmq dot notation, but after that should be done --- pkg/cmd/web.go | 1 - pkg/events/events.go | 16 +++++++++++ pkg/models/notification.go | 21 -------------- pkg/services/sqlstore/account.go | 2 +- pkg/services/sqlstore/sqlstore.go | 5 ++-- pkg/services/sqlstore/user.go | 48 +++++++++++++++++-------------- 6 files changed, 47 insertions(+), 46 deletions(-) delete mode 100644 pkg/models/notification.go diff --git a/pkg/cmd/web.go b/pkg/cmd/web.go index dfc5ff41bb3..a5c760634d0 100644 --- a/pkg/cmd/web.go +++ b/pkg/cmd/web.go @@ -85,7 +85,6 @@ func runWeb(c *cli.Context) { eventpublisher.Init() var err error - m := newMacaron() api.Register(m) diff --git a/pkg/events/events.go b/pkg/events/events.go index cba16a9cc46..9101a9236e3 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -61,3 +61,19 @@ type AccountUpdated struct { Id int64 `json:"id"` Name string `json:"name"` } + +type UserCreated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` + Login string `json:"login"` + Email string `json:"email"` +} + +type UserUpdated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` + Login string `json:"login"` + Email string `json:"email"` +} diff --git a/pkg/models/notification.go b/pkg/models/notification.go deleted file mode 100644 index f849c4f6274..00000000000 --- a/pkg/models/notification.go +++ /dev/null @@ -1,21 +0,0 @@ -package models - -import ( - "time" -) - -type EventPriority string - -const ( - PRIO_DEBUG EventPriority = "DEBUG" - PRIO_INFO EventPriority = "INFO" - PRIO_ERROR EventPriority = "ERROR" -) - -type Notification struct { - EventType string `json:"event_type"` - Timestamp time.Time `json:"timestamp"` - Priority EventPriority `json:"priority"` - Payload interface{} `json:"payload"` -} - diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 960c2b5b3a5..c2154624d62 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -92,7 +92,7 @@ func UpdateAccount(cmd *m.UpdateAccountCommand) error { return err } - sess.publishAfterCommit(events.AccountUpdated{ + sess.publishAfterCommit(&events.AccountUpdated{ Timestamp: account.Updated, Id: account.Id, Name: account.Name, diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index 9d1bef8106f..9200ef6ec2c 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -43,12 +43,13 @@ func EnsureAdminUser() { cmd.IsAdmin = true if err = bus.Dispatch(&cmd); err != nil { - log.Fatal(3, "Failed to create default admin user", err) + log.Error(3, "Failed to create default admin user", err) + return } log.Info("Created default admin user: %v", setting.AdminUser) } else if err != nil { - log.Fatal(3, "Could not determine if admin user exists: %v", err) + log.Error(3, "Could not determine if admin user exists: %v", err) } } diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index c03057705f5..ffb73d9cf0b 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -7,6 +7,7 @@ import ( "github.com/go-xorm/xorm" "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/util" @@ -23,7 +24,7 @@ func init() { bus.AddHandler("sql", GetUserAccounts) } -func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error) { +func getAccountIdForNewUser(userEmail string, sess *session) (int64, error) { var account m.Account if setting.SingleAccountMode { @@ -51,7 +52,7 @@ func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error) } func CreateUser(cmd *m.CreateUserCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { accountId, err := getAccountIdForNewUser(cmd.Email, sess) if err != nil { return err @@ -92,16 +93,20 @@ func CreateUser(cmd *m.CreateUserCommand) error { accountUser.Role = m.RoleType(setting.DefaultAccountRole) } - _, err = sess.Insert(&accountUser) + if _, err = sess.Insert(&accountUser); err != nil { + return err + } - cmd.Result = user - _ = bus.Publish(&m.Notification{ - EventType: "user.create", + sess.publishAfterCommit(&events.UserCreated{ Timestamp: user.Created, - Priority: m.PRIO_INFO, - Payload: user, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, }) - return err + + cmd.Result = user + return nil }) } @@ -131,7 +136,7 @@ func GetUserByLogin(query *m.GetUserByLoginQuery) error { } func UpdateUser(cmd *m.UpdateUserCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { user := m.User{ Name: cmd.Name, @@ -140,18 +145,19 @@ func UpdateUser(cmd *m.UpdateUserCommand) error { Updated: time.Now(), } - _, err := sess.Id(cmd.UserId).Update(&user) - if err == nil { - // silently ignore failures to publish events. - user.Id = cmd.UserId - _ = bus.Publish(&m.Notification{ - EventType: "user.update", - Timestamp: user.Updated, - Priority: m.PRIO_INFO, - Payload: user, - }) + if _, err := sess.Id(cmd.UserId).Update(&user); err != nil { + return err } - return err + + sess.publishAfterCommit(&events.UserUpdated{ + Timestamp: user.Created, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, + }) + + return nil }) }