diff --git a/conf/grafana.ini b/conf/grafana.ini index 4fc77206190..2d643191b4a 100644 --- a/conf/grafana.ini +++ b/conf/grafana.ini @@ -121,4 +121,7 @@ daily_rotate = true ; Expired days of log file(delete after max days), default is 7 max_days = 7 - +[event_publisher] +enabled = false +rabbitmq_url = amqp://localhost/ +exchange = grafana_events diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 7afff22eb4e..2865c740fd1 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -11,13 +11,16 @@ type Msg interface{} type Bus interface { Dispatch(msg Msg) error Publish(msg Msg) error + AddHandler(handler HandlerFunc) AddEventListener(handler HandlerFunc) + AddWildcardListener(handler HandlerFunc) } type InProcBus struct { - handlers map[string]HandlerFunc - listeners map[string][]HandlerFunc + handlers map[string]HandlerFunc + listeners map[string][]HandlerFunc + wildcardListeners []HandlerFunc } // temp stuff, not sure how to handle bus instance, and init yet @@ -27,6 +30,7 @@ func New() Bus { bus := &InProcBus{} bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) + bus.wildcardListeners = make([]HandlerFunc, 0) return bus } @@ -52,16 +56,20 @@ func (b *InProcBus) Dispatch(msg Msg) error { func (b *InProcBus) Publish(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() - var listeners = b.listeners[msgName] - if len(listeners) == 0 { - return nil - } var params = make([]reflect.Value, 1) params[0] = reflect.ValueOf(msg) - for listenerHandler := range listeners { + for _, listenerHandler := range listeners { + ret := reflect.ValueOf(listenerHandler).Call(params) + err := ret[0].Interface() + if err != nil { + return err.(error) + } + } + + for _, listenerHandler := range b.wildcardListeners { ret := reflect.ValueOf(listenerHandler).Call(params) err := ret[0].Interface() if err != nil { @@ -72,6 +80,10 @@ func (b *InProcBus) Publish(msg Msg) error { return nil } +func (b *InProcBus) AddWildcardListener(handler HandlerFunc) { + b.wildcardListeners = append(b.wildcardListeners, handler) +} + func (b *InProcBus) AddHandler(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) queryTypeName := handlerType.In(0).Elem().Name() @@ -81,12 +93,11 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) { func (b *InProcBus) AddEventListener(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) eventName := handlerType.In(0).Elem().Name() - list, exists := b.listeners[eventName] + _, exists := b.listeners[eventName] if !exists { - list = make([]HandlerFunc, 0) - b.listeners[eventName] = list + b.listeners[eventName] = make([]HandlerFunc, 0) } - list = append(list, handler) + b.listeners[eventName] = append(b.listeners[eventName], handler) } // Package level functions @@ -99,6 +110,14 @@ func AddEventListener(handler HandlerFunc) { globalBus.AddEventListener(handler) } +func AddWildcardListener(handler HandlerFunc) { + globalBus.AddWildcardListener(handler) +} + func Dispatch(msg Msg) error { return globalBus.Dispatch(msg) } + +func Publish(msg Msg) error { + return globalBus.Publish(msg) +} diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 45191eeb682..62e72f18308 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -2,6 +2,7 @@ package bus import ( "errors" + "fmt" "testing" ) @@ -62,7 +63,7 @@ func TestEventListeners(t *testing.T) { if err != nil { t.Fatal("Publish event failed " + err.Error()) - } else if count != 0 { - t.Fatal("Publish event failed, listeners called: %v, expected: %v", count, 11) + } else if count != 11 { + t.Fatal(fmt.Sprintf("Publish event failed, listeners called: %v, expected: %v", count, 11)) } } diff --git a/pkg/cmd/web.go b/pkg/cmd/web.go index 73bd64d5f10..a5c760634d0 100644 --- a/pkg/cmd/web.go +++ b/pkg/cmd/web.go @@ -16,6 +16,7 @@ import ( "github.com/torkelo/grafana-pro/pkg/api" "github.com/torkelo/grafana-pro/pkg/log" "github.com/torkelo/grafana-pro/pkg/middleware" + "github.com/torkelo/grafana-pro/pkg/services/eventpublisher" "github.com/torkelo/grafana-pro/pkg/services/sqlstore" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/social" @@ -81,11 +82,12 @@ func runWeb(c *cli.Context) { social.NewOAuthService() sqlstore.NewEngine() sqlstore.EnsureAdminUser() + eventpublisher.Init() + var err error m := newMacaron() api.Register(m) - var err error listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort) log.Info("Listen: %v://%s%s", setting.Protocol, listenAddr, setting.AppSubUrl) switch setting.Protocol { diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 00000000000..9101a9236e3 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,79 @@ +package events + +import ( + "reflect" + "time" +) + +// Events can be passed to external systems via for example AMPQ +// Treat these events as basically DTOs so changes has to be backward compatible + +type Priority string + +const ( + PRIO_DEBUG Priority = "DEBUG" + PRIO_INFO Priority = "INFO" + PRIO_ERROR Priority = "ERROR" +) + +type Event struct { + Timestamp time.Time `json:"timestamp"` +} + +type OnTheWireEvent struct { + EventType string `json:"event_type"` + Priority Priority `json:"priority"` + Timestamp time.Time `json:"timestamp"` + Payload interface{} `json:"payload"` +} + +type EventBase interface { + ToOnWriteEvent() *OnTheWireEvent +} + +func ToOnWriteEvent(event interface{}) (*OnTheWireEvent, error) { + eventType := reflect.TypeOf(event) + + wireEvent := OnTheWireEvent{ + Priority: PRIO_INFO, + EventType: eventType.Name(), + Payload: event, + } + + baseField := reflect.ValueOf(event).FieldByName("Timestamp") + if baseField.IsValid() { + wireEvent.Timestamp = baseField.Interface().(time.Time) + } else { + wireEvent.Timestamp = time.Now() + } + + return &wireEvent, nil +} + +type AccountCreated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` +} + +type AccountUpdated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` +} + +type UserCreated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` + Login string `json:"login"` + Email string `json:"email"` +} + +type UserUpdated struct { + Timestamp time.Time `json:"timestamp"` + Id int64 `json:"id"` + Name string `json:"name"` + Login string `json:"login"` + Email string `json:"email"` +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 00000000000..36659eb9b45 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,30 @@ +package events + +import ( + "encoding/json" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +type TestEvent struct { + Timestamp time.Time +} + +func TestEventCreation(t *testing.T) { + + Convey("Event to wire event", t, func() { + e := TestEvent{ + Timestamp: time.Unix(1231421123, 223), + } + + wire, _ := ToOnWriteEvent(e) + So(e.Timestamp.Unix(), ShouldEqual, wire.Timestamp.Unix()) + So(wire.EventType, ShouldEqual, "TestEvent") + + json, _ := json.Marshal(wire) + So(string(json), ShouldEqual, `{"event_type":"TestEvent","priority":"INFO","timestamp":"2009-01-08T14:25:23.000000223+01:00","payload":{"Timestamp":"2009-01-08T14:25:23.000000223+01:00"}}`) + }) + +} diff --git a/pkg/services/eventpublisher/eventpublisher.go b/pkg/services/eventpublisher/eventpublisher.go new file mode 100644 index 00000000000..33983ea79a5 --- /dev/null +++ b/pkg/services/eventpublisher/eventpublisher.go @@ -0,0 +1,149 @@ +package eventpublisher + +import ( + "encoding/json" + "fmt" + "log" + "time" + + "github.com/streadway/amqp" + "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" + "github.com/torkelo/grafana-pro/pkg/setting" +) + +var ( + url string + exchange string + conn *amqp.Connection + channel *amqp.Channel +) + +func getConnection() (*amqp.Connection, error) { + c, err := amqp.Dial(url) + if err != nil { + return nil, err + } + return c, err +} + +func getChannel() (*amqp.Channel, error) { + ch, err := conn.Channel() + if err != nil { + return nil, err + } + + err = ch.ExchangeDeclare( + exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return nil, err + } + return ch, err +} + +func Init() { + sec := setting.Cfg.Section("event_publisher") + + if !sec.Key("enabled").MustBool(false) { + return + } + + url = sec.Key("rabbitmq_url").String() + exchange = sec.Key("exchange").String() + bus.AddWildcardListener(eventListener) + + if err := Setup(); err != nil { + log.Fatal(4, "Failed to connect to notification queue: %v", err) + return + } +} + +// Every connection should declare the topology they expect +func Setup() error { + c, err := getConnection() + if err != nil { + return err + } + conn = c + ch, err := getChannel() + if err != nil { + return err + } + + channel = ch + + // listen for close events so we can reconnect. + errChan := channel.NotifyClose(make(chan *amqp.Error)) + go func() { + for e := range errChan { + fmt.Println("connection to rabbitmq lost.") + fmt.Println(e) + fmt.Println("attempting to create new rabbitmq channel.") + ch, err := getChannel() + if err == nil { + channel = ch + break + } + + //could not create channel, so lets close the connection + // and re-create. + _ = conn.Close() + + for err != nil { + time.Sleep(2 * time.Second) + fmt.Println("attempting to reconnect to rabbitmq.") + err = Setup() + } + fmt.Println("Connected to rabbitmq again.") + } + }() + + return nil +} + +func publish(routingKey string, msgString []byte) { + err := channel.Publish( + exchange, //exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: msgString, + }, + ) + if err != nil { + // failures are most likely because the connection was lost. + // the connection will be re-established, so just keep + // retrying every 2seconds until we successfully publish. + time.Sleep(2 * time.Second) + fmt.Println("publish failed, retrying.") + publish(routingKey, msgString) + } + return +} + +func eventListener(event interface{}) error { + wireEvent, err := events.ToOnWriteEvent(event) + if err != nil { + return err + } + + msgString, err := json.Marshal(wireEvent) + if err != nil { + return err + } + + routingKey := fmt.Sprintf("%s.%s", wireEvent.Priority, wireEvent.EventType) + // this is run in a greenthread and we expect that publish will keep + // retrying until the message gets sent. + go publish(routingKey, msgString) + return nil +} diff --git a/pkg/services/sqlstore/account.go b/pkg/services/sqlstore/account.go index 89c5b1ac46a..c2154624d62 100644 --- a/pkg/services/sqlstore/account.go +++ b/pkg/services/sqlstore/account.go @@ -3,9 +3,8 @@ package sqlstore import ( "time" - "github.com/go-xorm/xorm" - "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" ) @@ -48,7 +47,7 @@ func GetAccountByName(query *m.GetAccountByNameQuery) error { } func CreateAccount(cmd *m.CreateAccountCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { account := m.Account{ Name: cmd.Name, @@ -60,7 +59,6 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { return err } - // create inital admin account user user := m.AccountUser{ AccountId: account.Id, UserId: cmd.UserId, @@ -72,19 +70,34 @@ func CreateAccount(cmd *m.CreateAccountCommand) error { _, err := sess.Insert(&user) cmd.Result = account + sess.publishAfterCommit(&events.AccountCreated{ + Timestamp: account.Created, + Id: account.Id, + Name: account.Name, + }) + return err }) } func UpdateAccount(cmd *m.UpdateAccountCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { account := m.Account{ Name: cmd.Name, Updated: time.Now(), } - _, err := sess.Id(cmd.AccountId).Update(&account) - return err + if _, err := sess.Id(cmd.AccountId).Update(&account); err != nil { + return err + } + + sess.publishAfterCommit(&events.AccountUpdated{ + Timestamp: account.Updated, + Id: account.Id, + Name: account.Name, + }) + + return nil }) } diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go new file mode 100644 index 00000000000..51300167757 --- /dev/null +++ b/pkg/services/sqlstore/shared.go @@ -0,0 +1,71 @@ +package sqlstore + +import ( + "github.com/go-xorm/xorm" + "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/log" +) + +type dbTransactionFunc func(sess *xorm.Session) error +type dbTransactionFunc2 func(sess *session) error + +type session struct { + *xorm.Session + events []interface{} +} + +func (sess *session) publishAfterCommit(msg interface{}) { + sess.events = append(sess.events, msg) +} + +func inTransaction(callback dbTransactionFunc) error { + var err error + + sess := x.NewSession() + defer sess.Close() + + if err = sess.Begin(); err != nil { + return err + } + + err = callback(sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + return nil +} + +func inTransaction2(callback dbTransactionFunc2) error { + var err error + + sess := session{Session: x.NewSession()} + + defer sess.Close() + if err = sess.Begin(); err != nil { + return err + } + + err = callback(&sess) + + if err != nil { + sess.Rollback() + return err + } else if err = sess.Commit(); err != nil { + return err + } + + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + log.Error(3, "Failed to publish event after commit", err) + } + } + } + + return nil +} diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index f3968b4923d..9200ef6ec2c 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -43,12 +43,13 @@ func EnsureAdminUser() { cmd.IsAdmin = true if err = bus.Dispatch(&cmd); err != nil { - log.Fatal(3, "Failed to create default admin user", err) + log.Error(3, "Failed to create default admin user", err) + return } log.Info("Created default admin user: %v", setting.AdminUser) } else if err != nil { - log.Fatal(3, "Could not determine if admin user exists: %v", err) + log.Error(3, "Could not determine if admin user exists: %v", err) } } @@ -149,27 +150,3 @@ func LoadConfig() { DbCfg.SslMode = sec.Key("ssl_mode").String() DbCfg.Path = sec.Key("path").MustString("data/grafana.db") } - -type dbTransactionFunc func(sess *xorm.Session) error - -func inTransaction(callback dbTransactionFunc) error { - var err error - - sess := x.NewSession() - defer sess.Close() - - if err = sess.Begin(); err != nil { - return err - } - - err = callback(sess) - - if err != nil { - sess.Rollback() - return err - } else if err = sess.Commit(); err != nil { - return err - } - - return nil -} diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index 0d68070e82f..8287f89106f 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -7,6 +7,7 @@ import ( "github.com/go-xorm/xorm" "github.com/torkelo/grafana-pro/pkg/bus" + "github.com/torkelo/grafana-pro/pkg/events" m "github.com/torkelo/grafana-pro/pkg/models" "github.com/torkelo/grafana-pro/pkg/setting" "github.com/torkelo/grafana-pro/pkg/util" @@ -23,7 +24,7 @@ func init() { bus.AddHandler("sql", GetUserAccounts) } -func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error) { +func getAccountIdForNewUser(userEmail string, sess *session) (int64, error) { var account m.Account if setting.SingleAccountMode { @@ -51,7 +52,7 @@ func getAccountIdForNewUser(userEmail string, sess *xorm.Session) (int64, error) } func CreateUser(cmd *m.CreateUserCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { accountId, err := getAccountIdForNewUser(cmd.Email, sess) if err != nil { return err @@ -94,10 +95,20 @@ func CreateUser(cmd *m.CreateUserCommand) error { accountUser.Role = m.RoleType(setting.DefaultAccountRole) } - _, err = sess.Insert(&accountUser) + if _, err = sess.Insert(&accountUser); err != nil { + return err + } + + sess.publishAfterCommit(&events.UserCreated{ + Timestamp: user.Created, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, + }) cmd.Result = user - return err + return nil }) } @@ -127,7 +138,7 @@ func GetUserByLogin(query *m.GetUserByLoginQuery) error { } func UpdateUser(cmd *m.UpdateUserCommand) error { - return inTransaction(func(sess *xorm.Session) error { + return inTransaction2(func(sess *session) error { user := m.User{ Name: cmd.Name, @@ -136,8 +147,19 @@ func UpdateUser(cmd *m.UpdateUserCommand) error { Updated: time.Now(), } - _, err := sess.Id(cmd.UserId).Update(&user) - return err + if _, err := sess.Id(cmd.UserId).Update(&user); err != nil { + return err + } + + sess.publishAfterCommit(&events.UserUpdated{ + Timestamp: user.Created, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, + }) + + return nil }) }