From 8143610024ef01729a850659072bab83fe5694c1 Mon Sep 17 00:00:00 2001 From: bergquist Date: Tue, 5 Jun 2018 21:13:53 +0200 Subject: [PATCH 01/16] bus: support multiple dispatch in one transaction this makes it possible to run multiple DispatchCtx in one transaction. The TransactionManager will start/end the transaction and pass the dbsession in the context.Context variable --- pkg/bus/bus.go | 50 +++++++++++++++++++++++++ pkg/services/alerting/notifiers/base.go | 1 + pkg/services/sqlstore/shared.go | 28 +++++++++++++- pkg/services/sqlstore/sqlstore.go | 49 +++++++++++++++++++++++- pkg/services/sqlstore/stats.go | 18 +++++++++ 5 files changed, 144 insertions(+), 2 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 32a591b6672..98279678777 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -12,21 +12,51 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") +type TransactionManager interface { + Begin(ctx context.Context) (context.Context, error) + End(ctx context.Context, err error) error +} + type Bus interface { Dispatch(msg Msg) error DispatchCtx(ctx context.Context, msg Msg) error Publish(msg Msg) error + // InTransaction starts a transaction and store it in the context. + // The caller can then pass a function with multiple DispatchCtx calls that + // all will be executed in the same transaction. InTransaction will rollback if the + // callback returns an error.s + InTransaction(ctx context.Context, fn func(ctx context.Context) error) error + AddHandler(handler HandlerFunc) AddCtxHandler(handler HandlerFunc) AddEventListener(handler HandlerFunc) AddWildcardListener(handler HandlerFunc) + + // SetTransactionManager allows the user to replace the internal + // noop TransactionManager that is responsible for manageing + // transactions in `InTransaction` + SetTransactionManager(tm TransactionManager) +} + +func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { + ctxWithTran, err := b.transactionManager.Begin(ctx) + if err != nil { + return err + } + + err = fn(ctxWithTran) + b.transactionManager.End(ctxWithTran, err) + + return err } type InProcBus struct { handlers map[string]HandlerFunc listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc + + transactionManager TransactionManager } // temp stuff, not sure how to handle bus instance, and init yet @@ -37,6 +67,9 @@ func New() Bus { bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) + + bus.transactionManager = &NoopTransactionManager{} + return bus } @@ -45,6 +78,14 @@ func GetBus() Bus { return globalBus } +func SetTransactionManager(tm TransactionManager) { + globalBus.SetTransactionManager(tm) +} + +func (b *InProcBus) SetTransactionManager(tm TransactionManager) { + b.transactionManager = tm +} + func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() @@ -167,6 +208,15 @@ func Publish(msg Msg) error { return globalBus.Publish(msg) } +func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { + return globalBus.InTransaction(ctx, fn) +} + func ClearBusHandlers() { globalBus = New() } + +type NoopTransactionManager struct{} + +func (*NoopTransactionManager) Begin(ctx context.Context) (context.Context, error) { return ctx, nil } +func (*NoopTransactionManager) End(ctx context.Context, err error) error { return err } diff --git a/pkg/services/alerting/notifiers/base.go b/pkg/services/alerting/notifiers/base.go index 51676efdfd5..868db3aec79 100644 --- a/pkg/services/alerting/notifiers/base.go +++ b/pkg/services/alerting/notifiers/base.go @@ -3,6 +3,7 @@ package notifiers import ( "github.com/grafana/grafana/pkg/components/simplejson" m "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/services/alerting" ) diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go index 9a24a513aad..3ccb92f010f 100644 --- a/pkg/services/sqlstore/shared.go +++ b/pkg/services/sqlstore/shared.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "reflect" "time" @@ -29,10 +30,35 @@ func inTransaction(callback dbTransactionFunc) error { return inTransactionWithRetry(callback, 0) } +func startSession(ctx context.Context) *DBSession { + value := ctx.Value(ContextSessionName) + var sess *xorm.Session + sess, ok := value.(*xorm.Session) + + if !ok { + return newSession() + } + + old := newSession() + old.Session = sess + + return old +} + +func withDbSession(ctx context.Context, callback dbTransactionFunc) error { + sess := startSession(ctx) + + return callback(sess) +} + func inTransactionWithRetry(callback dbTransactionFunc, retry int) error { + return inTransactionWithRetryCtx(context.Background(), callback, retry) +} + +func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error { var err error - sess := newSession() + sess := startSession(ctx) defer sess.Close() if err = sess.Begin(); err != nil { diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index ed82829665f..bfe462f4d91 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -1,6 +1,8 @@ package sqlstore import ( + "context" + "errors" "fmt" "net/url" "os" @@ -35,6 +37,8 @@ var ( sqlog log.Logger = log.New("sqlstore") ) +const ContextSessionName = "db-session" + func init() { registry.Register(®istry.Descriptor{ Name: "SqlStore", @@ -45,6 +49,7 @@ func init() { type SqlStore struct { Cfg *setting.Cfg `inject:""` + Bus bus.Bus `inject:""` dbCfg DatabaseConfig engine *xorm.Engine @@ -77,6 +82,10 @@ func (ss *SqlStore) Init() error { // Init repo instances annotations.SetRepository(&SqlAnnotationRepo{}) + ss.Bus.SetTransactionManager(&SQLTransactionManager{ + engine: ss.engine, + }) + // ensure admin user if ss.skipEnsureAdmin { return nil @@ -85,10 +94,47 @@ func (ss *SqlStore) Init() error { return ss.ensureAdminUser() } +type SQLTransactionManager struct { + engine *xorm.Engine +} + +func (stm *SQLTransactionManager) Begin(ctx context.Context) (context.Context, error) { + sess := stm.engine.NewSession() + err := sess.Begin() + if err != nil { + return ctx, err + } + + withValue := context.WithValue(ctx, ContextSessionName, sess) + + return withValue, nil +} + +func (stm *SQLTransactionManager) End(ctx context.Context, err error) error { + value := ctx.Value(ContextSessionName) + sess, ok := value.(*xorm.Session) + if !ok { + return errors.New("context is missing transaction") + } + + if err != nil { + sess.Rollback() + return err + } + + defer sess.Close() + + return sess.Commit() +} + func (ss *SqlStore) ensureAdminUser() error { systemUserCountQuery := m.GetSystemUserCountStatsQuery{} - if err := bus.Dispatch(&systemUserCountQuery); err != nil { + err := bus.InTransaction(context.Background(), func(ctx context.Context) error { + return bus.DispatchCtx(ctx, &systemUserCountQuery) + }) + + if err != nil { return fmt.Errorf("Could not determine if admin user exists: %v", err) } @@ -240,6 +286,7 @@ func (ss *SqlStore) readConfig() { func InitTestDB(t *testing.T) *SqlStore { sqlstore := &SqlStore{} sqlstore.skipEnsureAdmin = true + sqlstore.Bus = bus.New() dbType := migrator.SQLITE diff --git a/pkg/services/sqlstore/stats.go b/pkg/services/sqlstore/stats.go index 3e3e83c4014..af4482d9e25 100644 --- a/pkg/services/sqlstore/stats.go +++ b/pkg/services/sqlstore/stats.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "time" "github.com/grafana/grafana/pkg/bus" @@ -13,6 +14,7 @@ func init() { bus.AddHandler("sql", GetDataSourceAccessStats) bus.AddHandler("sql", GetAdminStats) bus.AddHandler("sql", GetSystemUserCountStats) + bus.AddCtxHandler("sql", GetSystemUserCountStatsCtx) } var activeUserTimeLimit = time.Hour * 24 * 30 @@ -133,6 +135,22 @@ func GetAdminStats(query *m.GetAdminStatsQuery) error { return err } +func GetSystemUserCountStatsCtx(ctx context.Context, query *m.GetSystemUserCountStatsQuery) error { + return withDbSession(ctx, func(sess *DBSession) error { + + var rawSql = `SELECT COUNT(id) AS Count FROM ` + dialect.Quote("user") + var stats m.SystemUserCountStats + _, err := sess.SQL(rawSql).Get(&stats) + if err != nil { + return err + } + + query.Result = &stats + + return err + }) +} + func GetSystemUserCountStats(query *m.GetSystemUserCountStatsQuery) error { var rawSql = `SELECT COUNT(id) AS Count FROM ` + dialect.Quote("user") var stats m.SystemUserCountStats From 263572813a493ac34618075bedee5c0bac92df19 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 7 Jun 2018 18:02:28 +0200 Subject: [PATCH 02/16] replace begin/end with wrapper function --- pkg/bus/bus.go | 40 +++++++++++++--------------- pkg/services/sqlstore/shared.go | 13 +++++---- pkg/services/sqlstore/sqlstore.go | 44 ++++++++++++++++++++----------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 98279678777..0f10dfd9b17 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -12,9 +12,8 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") -type TransactionManager interface { - Begin(ctx context.Context) (context.Context, error) - End(ctx context.Context, err error) error +type TransactionWrapper interface { + Wrapp(ctx context.Context, fn func(ctx context.Context) error) error } type Bus interface { @@ -25,7 +24,7 @@ type Bus interface { // InTransaction starts a transaction and store it in the context. // The caller can then pass a function with multiple DispatchCtx calls that // all will be executed in the same transaction. InTransaction will rollback if the - // callback returns an error.s + // callback returns an error. InTransaction(ctx context.Context, fn func(ctx context.Context) error) error AddHandler(handler HandlerFunc) @@ -36,19 +35,11 @@ type Bus interface { // SetTransactionManager allows the user to replace the internal // noop TransactionManager that is responsible for manageing // transactions in `InTransaction` - SetTransactionManager(tm TransactionManager) + SetTransactionManager(tm TransactionWrapper) } func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - ctxWithTran, err := b.transactionManager.Begin(ctx) - if err != nil { - return err - } - - err = fn(ctxWithTran) - b.transactionManager.End(ctxWithTran, err) - - return err + return b.transactionWrapper.Wrapp(ctx, fn) } type InProcBus struct { @@ -56,7 +47,7 @@ type InProcBus struct { listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc - transactionManager TransactionManager + transactionWrapper TransactionWrapper } // temp stuff, not sure how to handle bus instance, and init yet @@ -68,7 +59,7 @@ func New() Bus { bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) - bus.transactionManager = &NoopTransactionManager{} + bus.transactionWrapper = &noopTransactionManager{} return bus } @@ -78,12 +69,12 @@ func GetBus() Bus { return globalBus } -func SetTransactionManager(tm TransactionManager) { +func SetTransactionManager(tm TransactionWrapper) { globalBus.SetTransactionManager(tm) } -func (b *InProcBus) SetTransactionManager(tm TransactionManager) { - b.transactionManager = tm +func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) { + b.transactionWrapper = tm } func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { @@ -208,6 +199,10 @@ func Publish(msg Msg) error { return globalBus.Publish(msg) } +// InTransaction starts a transaction and store it in the context. +// The caller can then pass a function with multiple DispatchCtx calls that +// all will be executed in the same transaction. InTransaction will rollback if the +// callback returns an error. func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { return globalBus.InTransaction(ctx, fn) } @@ -216,7 +211,8 @@ func ClearBusHandlers() { globalBus = New() } -type NoopTransactionManager struct{} +type noopTransactionManager struct{} -func (*NoopTransactionManager) Begin(ctx context.Context) (context.Context, error) { return ctx, nil } -func (*NoopTransactionManager) End(ctx context.Context, err error) error { return err } +func (*noopTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { + return nil +} diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go index 3ccb92f010f..7928f22b9f3 100644 --- a/pkg/services/sqlstore/shared.go +++ b/pkg/services/sqlstore/shared.go @@ -32,17 +32,16 @@ func inTransaction(callback dbTransactionFunc) error { func startSession(ctx context.Context) *DBSession { value := ctx.Value(ContextSessionName) - var sess *xorm.Session - sess, ok := value.(*xorm.Session) + var sess *DBSession + sess, ok := value.(*DBSession) if !ok { - return newSession() + newSess := newSession() + newSess.Begin() + return newSess } - old := newSession() - old.Session = sess - - return old + return sess } func withDbSession(ctx context.Context, callback dbTransactionFunc) error { diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index bfe462f4d91..d6268f3a2aa 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -2,7 +2,6 @@ package sqlstore import ( "context" - "errors" "fmt" "net/url" "os" @@ -26,6 +25,7 @@ import ( "github.com/go-xorm/xorm" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" + sqlite3 "github.com/mattn/go-sqlite3" _ "github.com/grafana/grafana/pkg/tsdb/mssql" ) @@ -94,37 +94,49 @@ func (ss *SqlStore) Init() error { return ss.ensureAdminUser() } +// SQLTransactionManager begin/end transaction type SQLTransactionManager struct { engine *xorm.Engine } -func (stm *SQLTransactionManager) Begin(ctx context.Context) (context.Context, error) { - sess := stm.engine.NewSession() - err := sess.Begin() - if err != nil { - return ctx, err - } +func (stm *SQLTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { + return stm.wrappInternal(ctx, fn, 0) +} + +func (stm *SQLTransactionManager) wrappInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { + sess := startSession(ctx) + defer sess.Close() withValue := context.WithValue(ctx, ContextSessionName, sess) - return withValue, nil -} + err := fn(withValue) -func (stm *SQLTransactionManager) End(ctx context.Context, err error) error { - value := ctx.Value(ContextSessionName) - sess, ok := value.(*xorm.Session) - if !ok { - return errors.New("context is missing transaction") + // special handling of database locked errors for sqlite, then we can retry 3 times + if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { + if sqlError.Code == sqlite3.ErrLocked { + sess.Rollback() + time.Sleep(time.Millisecond * time.Duration(10)) + sqlog.Info("Database table locked, sleeping then retrying", "retry", retry) + return stm.wrappInternal(ctx, fn, retry+1) + } } if err != nil { sess.Rollback() return err + } else if err = sess.Commit(); err != nil { + return err } - defer sess.Close() + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + log.Error(3, "Failed to publish event after commit", err) + } + } + } - return sess.Commit() + return nil } func (ss *SqlStore) ensureAdminUser() error { From 1bd31aa313df95b37781f859fe7c0926ab70a9e4 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 7 Jun 2018 18:17:01 +0200 Subject: [PATCH 03/16] check if admin exists or create one in one transaction --- pkg/services/sqlstore/sqlstore.go | 38 ++++----- pkg/services/sqlstore/user.go | 129 ++++++++++++++++-------------- 2 files changed, 91 insertions(+), 76 deletions(-) diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index d6268f3a2aa..a401e8c8c83 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -143,30 +143,32 @@ func (ss *SqlStore) ensureAdminUser() error { systemUserCountQuery := m.GetSystemUserCountStatsQuery{} err := bus.InTransaction(context.Background(), func(ctx context.Context) error { - return bus.DispatchCtx(ctx, &systemUserCountQuery) - }) - if err != nil { - return fmt.Errorf("Could not determine if admin user exists: %v", err) - } + err := bus.DispatchCtx(ctx, &systemUserCountQuery) + if err != nil { + return fmt.Errorf("Could not determine if admin user exists: %v", err) + } - if systemUserCountQuery.Result.Count > 0 { - return nil - } + if systemUserCountQuery.Result.Count > 0 { + return nil + } - cmd := m.CreateUserCommand{} - cmd.Login = setting.AdminUser - cmd.Email = setting.AdminUser + "@localhost" - cmd.Password = setting.AdminPassword - cmd.IsAdmin = true + cmd := m.CreateUserCommand{} + cmd.Login = setting.AdminUser + cmd.Email = setting.AdminUser + "@localhost" + cmd.Password = setting.AdminPassword + cmd.IsAdmin = true - if err := bus.Dispatch(&cmd); err != nil { - return fmt.Errorf("Failed to create admin user: %v", err) - } + if err := bus.DispatchCtx(ctx, &cmd); err != nil { + return fmt.Errorf("Failed to create admin user: %v", err) + } - ss.log.Info("Created default admin user: %v", setting.AdminUser) + ss.log.Info("Created default admin", "user", setting.AdminUser) - return nil + return nil + }) + + return err } func (ss *SqlStore) buildConnectionString() (string, error) { diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index e7aa8da837a..befc3a08401 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "strconv" "strings" "time" @@ -30,6 +31,8 @@ func init() { bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", SetUserHelpFlag) + + bus.AddCtxHandler("sql", CreateUserCtx) } func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error) { @@ -79,77 +82,87 @@ func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error return org.Id, nil } -func CreateUser(cmd *m.CreateUserCommand) error { - return inTransaction(func(sess *DBSession) error { - orgId, err := getOrgIdForNewUser(cmd, sess) - if err != nil { - return err - } +func internalCreateUser(sess *DBSession, cmd *m.CreateUserCommand) error { + orgId, err := getOrgIdForNewUser(cmd, sess) + if err != nil { + return err + } - if cmd.Email == "" { - cmd.Email = cmd.Login - } + if cmd.Email == "" { + cmd.Email = cmd.Login + } - // create user - user := m.User{ - Email: cmd.Email, - Name: cmd.Name, - Login: cmd.Login, - Company: cmd.Company, - IsAdmin: cmd.IsAdmin, - OrgId: orgId, - EmailVerified: cmd.EmailVerified, - Created: time.Now(), - Updated: time.Now(), - LastSeenAt: time.Now().AddDate(-10, 0, 0), - } + // create user + user := m.User{ + Email: cmd.Email, + Name: cmd.Name, + Login: cmd.Login, + Company: cmd.Company, + IsAdmin: cmd.IsAdmin, + OrgId: orgId, + EmailVerified: cmd.EmailVerified, + Created: time.Now(), + Updated: time.Now(), + LastSeenAt: time.Now().AddDate(-10, 0, 0), + } - if len(cmd.Password) > 0 { - user.Salt = util.GetRandomString(10) - user.Rands = util.GetRandomString(10) - user.Password = util.EncodePassword(cmd.Password, user.Salt) - } + if len(cmd.Password) > 0 { + user.Salt = util.GetRandomString(10) + user.Rands = util.GetRandomString(10) + user.Password = util.EncodePassword(cmd.Password, user.Salt) + } - sess.UseBool("is_admin") + sess.UseBool("is_admin") - if _, err := sess.Insert(&user); err != nil { - return err - } + if _, err := sess.Insert(&user); err != nil { + return err + } - sess.publishAfterCommit(&events.UserCreated{ - Timestamp: user.Created, - Id: user.Id, - Name: user.Name, - Login: user.Login, - Email: user.Email, - }) + sess.publishAfterCommit(&events.UserCreated{ + Timestamp: user.Created, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, + }) - cmd.Result = user + cmd.Result = user - // create org user link - if !cmd.SkipOrgSetup { - orgUser := m.OrgUser{ - OrgId: orgId, - UserId: user.Id, - Role: m.ROLE_ADMIN, - Created: time.Now(), - Updated: time.Now(), - } + // create org user link + if !cmd.SkipOrgSetup { + orgUser := m.OrgUser{ + OrgId: orgId, + UserId: user.Id, + Role: m.ROLE_ADMIN, + Created: time.Now(), + Updated: time.Now(), + } - if setting.AutoAssignOrg && !user.IsAdmin { - if len(cmd.DefaultOrgRole) > 0 { - orgUser.Role = m.RoleType(cmd.DefaultOrgRole) - } else { - orgUser.Role = m.RoleType(setting.AutoAssignOrgRole) - } + if setting.AutoAssignOrg && !user.IsAdmin { + if len(cmd.DefaultOrgRole) > 0 { + orgUser.Role = m.RoleType(cmd.DefaultOrgRole) + } else { + orgUser.Role = m.RoleType(setting.AutoAssignOrgRole) } + } - if _, err = sess.Insert(&orgUser); err != nil { - return err - } + if _, err = sess.Insert(&orgUser); err != nil { + return err } + } - return nil + return nil +} + +func CreateUserCtx(ctx context.Context, cmd *m.CreateUserCommand) error { + return inTransactionWithRetryCtx(ctx, func(sess *DBSession) error { + return internalCreateUser(sess, cmd) + }, 0) +} + +func CreateUser(cmd *m.CreateUserCommand) error { + return inTransaction(func(sess *DBSession) error { + return internalCreateUser(sess, cmd) }) } From 6775a82c82be88394949cc3f1a417cad4cfd6c10 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 7 Jun 2018 18:22:06 +0200 Subject: [PATCH 04/16] fixes typo in code --- pkg/bus/bus.go | 6 +++--- pkg/services/sqlstore/sqlstore.go | 12 +++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 0f10dfd9b17..2972c9c7614 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -13,7 +13,7 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") type TransactionWrapper interface { - Wrapp(ctx context.Context, fn func(ctx context.Context) error) error + Wrap(ctx context.Context, fn func(ctx context.Context) error) error } type Bus interface { @@ -39,7 +39,7 @@ type Bus interface { } func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - return b.transactionWrapper.Wrapp(ctx, fn) + return b.transactionWrapper.Wrap(ctx, fn) } type InProcBus struct { @@ -213,6 +213,6 @@ func ClearBusHandlers() { type noopTransactionManager struct{} -func (*noopTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { +func (*noopTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { return nil } diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index a401e8c8c83..a36e6cb15d1 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -99,11 +99,11 @@ type SQLTransactionManager struct { engine *xorm.Engine } -func (stm *SQLTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { - return stm.wrappInternal(ctx, fn, 0) +func (stm *SQLTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { + return stm.wrapInternal(ctx, fn, 0) } -func (stm *SQLTransactionManager) wrappInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { +func (stm *SQLTransactionManager) wrapInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { sess := startSession(ctx) defer sess.Close() @@ -117,14 +117,16 @@ func (stm *SQLTransactionManager) wrappInternal(ctx context.Context, fn func(ctx sess.Rollback() time.Sleep(time.Millisecond * time.Duration(10)) sqlog.Info("Database table locked, sleeping then retrying", "retry", retry) - return stm.wrappInternal(ctx, fn, retry+1) + return stm.wrapInternal(ctx, fn, retry+1) } } if err != nil { sess.Rollback() return err - } else if err = sess.Commit(); err != nil { + } + + if err = sess.Commit(); err != nil { return err } From 442e0e437b0384c5a25595ef6dfc8cb136b07bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Thu, 7 Jun 2018 12:54:36 -0700 Subject: [PATCH 05/16] refactoring: transaction manager PR #12203 --- pkg/bus/bus.go | 24 +++--- pkg/models/transaction.go | 7 ++ pkg/services/sqlstore/session.go | 63 +++++++++++++++ pkg/services/sqlstore/sqlstore.go | 58 +------------- .../sqlstore/{shared.go => transactions.go} | 79 ++++++++----------- pkg/services/sqlstore/user.go | 1 - 6 files changed, 115 insertions(+), 117 deletions(-) create mode 100644 pkg/models/transaction.go create mode 100644 pkg/services/sqlstore/session.go rename pkg/services/sqlstore/{shared.go => transactions.go} (56%) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 2972c9c7614..18248d6667e 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -12,8 +12,8 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") -type TransactionWrapper interface { - Wrap(ctx context.Context, fn func(ctx context.Context) error) error +type TransactionManager interface { + InTransaction(ctx context.Context, fn func(ctx context.Context) error) error } type Bus interface { @@ -35,19 +35,18 @@ type Bus interface { // SetTransactionManager allows the user to replace the internal // noop TransactionManager that is responsible for manageing // transactions in `InTransaction` - SetTransactionManager(tm TransactionWrapper) + SetTransactionManager(tm TransactionManager) } func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - return b.transactionWrapper.Wrap(ctx, fn) + return b.txMng.InTransaction(ctx, fn) } type InProcBus struct { handlers map[string]HandlerFunc listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc - - transactionWrapper TransactionWrapper + txMng TransactionManager } // temp stuff, not sure how to handle bus instance, and init yet @@ -58,8 +57,7 @@ func New() Bus { bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) - - bus.transactionWrapper = &noopTransactionManager{} + bus.txMng = &noopTransactionManager{} return bus } @@ -69,12 +67,8 @@ func GetBus() Bus { return globalBus } -func SetTransactionManager(tm TransactionWrapper) { - globalBus.SetTransactionManager(tm) -} - -func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) { - b.transactionWrapper = tm +func (b *InProcBus) SetTransactionManager(tm TransactionManager) { + b.txMng = tm } func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { @@ -213,6 +207,6 @@ func ClearBusHandlers() { type noopTransactionManager struct{} -func (*noopTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { +func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { return nil } diff --git a/pkg/models/transaction.go b/pkg/models/transaction.go new file mode 100644 index 00000000000..e07b2a9e397 --- /dev/null +++ b/pkg/models/transaction.go @@ -0,0 +1,7 @@ +package models + +import "context" + +type TransactionManager interface { + InTransaction(ctx context.Context, fn func(ctx context.Context) error) error +} diff --git a/pkg/services/sqlstore/session.go b/pkg/services/sqlstore/session.go new file mode 100644 index 00000000000..307d3ee1eeb --- /dev/null +++ b/pkg/services/sqlstore/session.go @@ -0,0 +1,63 @@ +package sqlstore + +import ( + "context" + "reflect" + + "github.com/go-xorm/xorm" +) + +type DBSession struct { + *xorm.Session + events []interface{} +} + +type dbTransactionFunc func(sess *DBSession) error + +func (sess *DBSession) publishAfterCommit(msg interface{}) { + sess.events = append(sess.events, msg) +} + +func newSession() *DBSession { + return &DBSession{Session: x.NewSession()} +} + +func startSession(ctx context.Context) *DBSession { + value := ctx.Value(ContextSessionName) + var sess *DBSession + sess, ok := value.(*DBSession) + + if !ok { + newSess := newSession() + newSess.Begin() + return newSess + } + + return sess +} + +func withDbSession(ctx context.Context, callback dbTransactionFunc) error { + sess := startSession(ctx) + + return callback(sess) +} + +func (sess *DBSession) InsertId(bean interface{}) (int64, error) { + table := sess.DB().Mapper.Obj2Table(getTypeName(bean)) + + dialect.PreInsertId(table, sess.Session) + + id, err := sess.Session.InsertOne(bean) + + dialect.PostInsertId(table, sess.Session) + + return id, err +} + +func getTypeName(bean interface{}) (res string) { + t := reflect.TypeOf(bean) + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Name() +} diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index a36e6cb15d1..f97134fd0d5 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -23,11 +23,10 @@ import ( "github.com/go-sql-driver/mysql" "github.com/go-xorm/xorm" - _ "github.com/lib/pq" - _ "github.com/mattn/go-sqlite3" - sqlite3 "github.com/mattn/go-sqlite3" _ "github.com/grafana/grafana/pkg/tsdb/mssql" + _ "github.com/lib/pq" + _ "github.com/mattn/go-sqlite3" ) var ( @@ -82,9 +81,7 @@ func (ss *SqlStore) Init() error { // Init repo instances annotations.SetRepository(&SqlAnnotationRepo{}) - ss.Bus.SetTransactionManager(&SQLTransactionManager{ - engine: ss.engine, - }) + ss.Bus.SetTransactionManager(ss) // ensure admin user if ss.skipEnsureAdmin { @@ -94,57 +91,10 @@ func (ss *SqlStore) Init() error { return ss.ensureAdminUser() } -// SQLTransactionManager begin/end transaction -type SQLTransactionManager struct { - engine *xorm.Engine -} - -func (stm *SQLTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { - return stm.wrapInternal(ctx, fn, 0) -} - -func (stm *SQLTransactionManager) wrapInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { - sess := startSession(ctx) - defer sess.Close() - - withValue := context.WithValue(ctx, ContextSessionName, sess) - - err := fn(withValue) - - // special handling of database locked errors for sqlite, then we can retry 3 times - if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { - if sqlError.Code == sqlite3.ErrLocked { - sess.Rollback() - time.Sleep(time.Millisecond * time.Duration(10)) - sqlog.Info("Database table locked, sleeping then retrying", "retry", retry) - return stm.wrapInternal(ctx, fn, retry+1) - } - } - - if err != nil { - sess.Rollback() - return err - } - - if err = sess.Commit(); err != nil { - return err - } - - if len(sess.events) > 0 { - for _, e := range sess.events { - if err = bus.Publish(e); err != nil { - log.Error(3, "Failed to publish event after commit", err) - } - } - } - - return nil -} - func (ss *SqlStore) ensureAdminUser() error { systemUserCountQuery := m.GetSystemUserCountStatsQuery{} - err := bus.InTransaction(context.Background(), func(ctx context.Context) error { + err := ss.InTransaction(context.Background(), func(ctx context.Context) error { err := bus.DispatchCtx(ctx, &systemUserCountQuery) if err != nil { diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/transactions.go similarity index 56% rename from pkg/services/sqlstore/shared.go rename to pkg/services/sqlstore/transactions.go index 7928f22b9f3..959d21c0bf1 100644 --- a/pkg/services/sqlstore/shared.go +++ b/pkg/services/sqlstore/transactions.go @@ -2,52 +2,53 @@ package sqlstore import ( "context" - "reflect" "time" - "github.com/go-xorm/xorm" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/log" sqlite3 "github.com/mattn/go-sqlite3" ) -type DBSession struct { - *xorm.Session - events []interface{} +func (ss *SqlStore) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { + return ss.inTransactionWithRetry(ctx, fn, 0) } -type dbTransactionFunc func(sess *DBSession) error - -func (sess *DBSession) publishAfterCommit(msg interface{}) { - sess.events = append(sess.events, msg) -} +func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error { + sess := startSession(ctx) + defer sess.Close() -func newSession() *DBSession { - return &DBSession{Session: x.NewSession()} -} + withValue := context.WithValue(ctx, ContextSessionName, sess) -func inTransaction(callback dbTransactionFunc) error { - return inTransactionWithRetry(callback, 0) -} + err := fn(withValue) -func startSession(ctx context.Context) *DBSession { - value := ctx.Value(ContextSessionName) - var sess *DBSession - sess, ok := value.(*DBSession) + // special handling of database locked errors for sqlite, then we can retry 3 times + if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { + if sqlError.Code == sqlite3.ErrLocked { + sess.Rollback() + time.Sleep(time.Millisecond * time.Duration(10)) + ss.log.Info("Database table locked, sleeping then retrying", "retry", retry) + return ss.inTransactionWithRetry(ctx, fn, retry+1) + } + } - if !ok { - newSess := newSession() - newSess.Begin() - return newSess + if err != nil { + sess.Rollback() + return err } - return sess -} + if err = sess.Commit(); err != nil { + return err + } -func withDbSession(ctx context.Context, callback dbTransactionFunc) error { - sess := startSession(ctx) + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + ss.log.Error("Failed to publish event after commit", err) + } + } + } - return callback(sess) + return nil } func inTransactionWithRetry(callback dbTransactionFunc, retry int) error { @@ -94,22 +95,6 @@ func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, return nil } -func (sess *DBSession) InsertId(bean interface{}) (int64, error) { - table := sess.DB().Mapper.Obj2Table(getTypeName(bean)) - - dialect.PreInsertId(table, sess.Session) - - id, err := sess.Session.InsertOne(bean) - - dialect.PostInsertId(table, sess.Session) - - return id, err -} - -func getTypeName(bean interface{}) (res string) { - t := reflect.TypeOf(bean) - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.Name() +func inTransaction(callback dbTransactionFunc) error { + return inTransactionWithRetry(callback, 0) } diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index befc3a08401..f01cb84ad4f 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -31,7 +31,6 @@ func init() { bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", SetUserHelpFlag) - bus.AddCtxHandler("sql", CreateUserCtx) } From 5af0b924ff47cdd5b5e1ef414d9dbad28c638913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Thu, 7 Jun 2018 13:03:27 -0700 Subject: [PATCH 06/16] refactoring: renamed AddCtxHandler to AddHandlerCtx PR #12203 --- pkg/bus/bus.go | 8 ++++---- pkg/services/notifications/notifications.go | 4 ++-- pkg/services/sqlstore/stats.go | 2 +- pkg/services/sqlstore/user.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 18248d6667e..a7d580ef2b1 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -28,7 +28,7 @@ type Bus interface { InTransaction(ctx context.Context, fn func(ctx context.Context) error) error AddHandler(handler HandlerFunc) - AddCtxHandler(handler HandlerFunc) + AddHandlerCtx(handler HandlerFunc) AddEventListener(handler HandlerFunc) AddWildcardListener(handler HandlerFunc) @@ -146,7 +146,7 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) { b.handlers[queryTypeName] = handler } -func (b *InProcBus) AddCtxHandler(handler HandlerFunc) { +func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) queryTypeName := handlerType.In(1).Elem().Name() b.handlers[queryTypeName] = handler @@ -168,8 +168,8 @@ func AddHandler(implName string, handler HandlerFunc) { } // Package level functions -func AddCtxHandler(implName string, handler HandlerFunc) { - globalBus.AddCtxHandler(handler) +func AddHandlerCtx(implName string, handler HandlerFunc) { + globalBus.AddHandlerCtx(handler) } // Package level functions diff --git a/pkg/services/notifications/notifications.go b/pkg/services/notifications/notifications.go index 14d362c5e1e..fcefa91243d 100644 --- a/pkg/services/notifications/notifications.go +++ b/pkg/services/notifications/notifications.go @@ -45,8 +45,8 @@ func (ns *NotificationService) Init() error { ns.Bus.AddHandler(ns.validateResetPasswordCode) ns.Bus.AddHandler(ns.sendEmailCommandHandler) - ns.Bus.AddCtxHandler(ns.sendEmailCommandHandlerSync) - ns.Bus.AddCtxHandler(ns.SendWebhookSync) + ns.Bus.AddHandlerCtx(ns.sendEmailCommandHandlerSync) + ns.Bus.AddHandlerCtx(ns.SendWebhookSync) ns.Bus.AddEventListener(ns.signUpStartedHandler) ns.Bus.AddEventListener(ns.signUpCompletedHandler) diff --git a/pkg/services/sqlstore/stats.go b/pkg/services/sqlstore/stats.go index af4482d9e25..ef0f148bb2a 100644 --- a/pkg/services/sqlstore/stats.go +++ b/pkg/services/sqlstore/stats.go @@ -14,7 +14,7 @@ func init() { bus.AddHandler("sql", GetDataSourceAccessStats) bus.AddHandler("sql", GetAdminStats) bus.AddHandler("sql", GetSystemUserCountStats) - bus.AddCtxHandler("sql", GetSystemUserCountStatsCtx) + bus.AddHandlerCtx("sql", GetSystemUserCountStatsCtx) } var activeUserTimeLimit = time.Hour * 24 * 30 diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index f01cb84ad4f..252499d5fdc 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -31,7 +31,7 @@ func init() { bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", SetUserHelpFlag) - bus.AddCtxHandler("sql", CreateUserCtx) + bus.AddHandlerCtx("sql", CreateUserCtx) } func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error) { From e2275701d8a508454395dade814381b2e4f659ba Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 8 Jun 2018 10:51:27 +0200 Subject: [PATCH 07/16] bus: DispatchCtx can now invoke any handler --- pkg/api/dashboard.go | 2 +- pkg/bus/bus.go | 24 +++++++++++++++---- pkg/bus/bus_test.go | 57 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 69 insertions(+), 14 deletions(-) diff --git a/pkg/api/dashboard.go b/pkg/api/dashboard.go index c2ab6dd9a1a..40d855773d0 100644 --- a/pkg/api/dashboard.go +++ b/pkg/api/dashboard.go @@ -103,7 +103,7 @@ func GetDashboard(c *m.ReqContext) Response { } isDashboardProvisioned := &m.IsDashboardProvisionedQuery{DashboardId: dash.Id} - err = bus.Dispatch(isDashboardProvisioned) + err = bus.DispatchCtx(c.Req.Context(), isDashboardProvisioned) if err != nil { return Error(500, "Error while checking if dashboard is provisioned", err) } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index a7d580ef2b1..69f0b95e195 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -44,6 +44,7 @@ func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Conte type InProcBus struct { handlers map[string]HandlerFunc + handlersWithCtx map[string]HandlerFunc listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc txMng TransactionManager @@ -55,6 +56,7 @@ var globalBus = New() func New() Bus { bus := &InProcBus{} bus.handlers = make(map[string]HandlerFunc) + bus.handlersWithCtx = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) bus.txMng = &noopTransactionManager{} @@ -74,14 +76,26 @@ func (b *InProcBus) SetTransactionManager(tm TransactionManager) { func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() - var handler = b.handlers[msgName] + // we prefer to use the handler that support context.Context + var handler = b.handlersWithCtx[msgName] + var withCtx = true + + // fallback to use classic handlers + if handler == nil { + withCtx = false + handler = b.handlers[msgName] + } + if handler == nil { return ErrHandlerNotFound } - var params = make([]reflect.Value, 2) - params[0] = reflect.ValueOf(ctx) - params[1] = reflect.ValueOf(msg) + var params = []reflect.Value{} + if withCtx { + params = append(params, reflect.ValueOf(ctx)) + } + + params = append(params, reflect.ValueOf(msg)) ret := reflect.ValueOf(handler).Call(params) err := ret[0].Interface() @@ -149,7 +163,7 @@ func (b *InProcBus) AddHandler(handler HandlerFunc) { func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) { handlerType := reflect.TypeOf(handler) queryTypeName := handlerType.In(1).Elem().Name() - b.handlers[queryTypeName] = handler + b.handlersWithCtx[queryTypeName] = handler } func (b *InProcBus) AddEventListener(handler HandlerFunc) { diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 62e72f18308..4c061900718 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -1,24 +1,65 @@ package bus import ( + "context" "errors" "fmt" "testing" ) -type TestQuery struct { +type testQuery struct { Id int64 Resp string } +func TestDispatchCtxCanUseNormalHandlers(t *testing.T) { + bus := New() + + handlerWithCtxCallCount := 0 + handlerCallCount := 0 + + handlerWithCtx := func(ctx context.Context, query *testQuery) error { + handlerWithCtxCallCount++ + return nil + } + + handler := func(query *testQuery) error { + handlerCallCount++ + return nil + } + + err := bus.DispatchCtx(context.Background(), &testQuery{}) + if err != ErrHandlerNotFound { + t.Errorf("expected bus to return HandlerNotFound is no handler is registered") + } + + t.Run("when a normal handler is registered", func(t *testing.T) { + bus.AddHandler(handler) + bus.DispatchCtx(context.Background(), &testQuery{}) + + if handlerCallCount != 1 { + t.Errorf("Expected normal handler to be called once") + } + + t.Run("when a ctx handler is registered", func(t *testing.T) { + bus.AddHandlerCtx(handlerWithCtx) + bus.DispatchCtx(context.Background(), &testQuery{}) + + if handlerWithCtxCallCount != 1 { + t.Errorf("Expected ctx handler to be called once") + } + }) + }) +} + func TestQueryHandlerReturnsError(t *testing.T) { bus := New() - bus.AddHandler(func(query *TestQuery) error { + bus.AddHandler(func(query *testQuery) error { return errors.New("handler error") }) - err := bus.Dispatch(&TestQuery{}) + err := bus.Dispatch(&testQuery{}) if err == nil { t.Fatal("Send query failed " + err.Error()) @@ -30,12 +71,12 @@ func TestQueryHandlerReturnsError(t *testing.T) { func TestQueryHandlerReturn(t *testing.T) { bus := New() - bus.AddHandler(func(q *TestQuery) error { + bus.AddHandler(func(q *testQuery) error { q.Resp = "hello from handler" return nil }) - query := &TestQuery{} + query := &testQuery{} err := bus.Dispatch(query) if err != nil { @@ -49,17 +90,17 @@ func TestEventListeners(t *testing.T) { bus := New() count := 0 - bus.AddEventListener(func(query *TestQuery) error { + bus.AddEventListener(func(query *testQuery) error { count += 1 return nil }) - bus.AddEventListener(func(query *TestQuery) error { + bus.AddEventListener(func(query *testQuery) error { count += 10 return nil }) - err := bus.Publish(&TestQuery{}) + err := bus.Publish(&testQuery{}) if err != nil { t.Fatal("Publish event failed " + err.Error()) From 629eab0b1edca6ca629077365832953e6e547dc9 Mon Sep 17 00:00:00 2001 From: bergquist Date: Sun, 10 Jun 2018 23:17:18 +0200 Subject: [PATCH 08/16] bus: dont mix ctx/classic handlers --- pkg/api/dashboard.go | 2 +- pkg/bus/bus.go | 14 +------------- pkg/bus/bus_test.go | 21 +++++++++++---------- pkg/services/sqlstore/stats.go | 18 ++---------------- pkg/services/sqlstore/stats_test.go | 3 ++- 5 files changed, 17 insertions(+), 41 deletions(-) diff --git a/pkg/api/dashboard.go b/pkg/api/dashboard.go index 40d855773d0..c2ab6dd9a1a 100644 --- a/pkg/api/dashboard.go +++ b/pkg/api/dashboard.go @@ -103,7 +103,7 @@ func GetDashboard(c *m.ReqContext) Response { } isDashboardProvisioned := &m.IsDashboardProvisionedQuery{DashboardId: dash.Id} - err = bus.DispatchCtx(c.Req.Context(), isDashboardProvisioned) + err = bus.Dispatch(isDashboardProvisioned) if err != nil { return Error(500, "Error while checking if dashboard is provisioned", err) } diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 69f0b95e195..1259211cddc 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -76,25 +76,13 @@ func (b *InProcBus) SetTransactionManager(tm TransactionManager) { func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() - // we prefer to use the handler that support context.Context var handler = b.handlersWithCtx[msgName] - var withCtx = true - - // fallback to use classic handlers - if handler == nil { - withCtx = false - handler = b.handlers[msgName] - } - if handler == nil { return ErrHandlerNotFound } var params = []reflect.Value{} - if withCtx { - params = append(params, reflect.ValueOf(ctx)) - } - + params = append(params, reflect.ValueOf(ctx)) params = append(params, reflect.ValueOf(msg)) ret := reflect.ValueOf(handler).Call(params) diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 4c061900718..83a0d7190ea 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -33,22 +33,23 @@ func TestDispatchCtxCanUseNormalHandlers(t *testing.T) { t.Errorf("expected bus to return HandlerNotFound is no handler is registered") } + bus.AddHandler(handler) + bus.AddHandlerCtx(handlerWithCtx) + t.Run("when a normal handler is registered", func(t *testing.T) { - bus.AddHandler(handler) - bus.DispatchCtx(context.Background(), &testQuery{}) + bus.Dispatch(&testQuery{}) if handlerCallCount != 1 { - t.Errorf("Expected normal handler to be called once") + t.Errorf("Expected normal handler to be called 1 time. was called %d", handlerCallCount) } + }) - t.Run("when a ctx handler is registered", func(t *testing.T) { - bus.AddHandlerCtx(handlerWithCtx) - bus.DispatchCtx(context.Background(), &testQuery{}) + t.Run("when a ctx handler is registered", func(t *testing.T) { + bus.DispatchCtx(context.Background(), &testQuery{}) - if handlerWithCtxCallCount != 1 { - t.Errorf("Expected ctx handler to be called once") - } - }) + if handlerWithCtxCallCount != 1 { + t.Errorf("Expected ctx handler to be called 1 time. was called %d", handlerWithCtxCallCount) + } }) } diff --git a/pkg/services/sqlstore/stats.go b/pkg/services/sqlstore/stats.go index ef0f148bb2a..6db481bf06b 100644 --- a/pkg/services/sqlstore/stats.go +++ b/pkg/services/sqlstore/stats.go @@ -13,8 +13,7 @@ func init() { bus.AddHandler("sql", GetDataSourceStats) bus.AddHandler("sql", GetDataSourceAccessStats) bus.AddHandler("sql", GetAdminStats) - bus.AddHandler("sql", GetSystemUserCountStats) - bus.AddHandlerCtx("sql", GetSystemUserCountStatsCtx) + bus.AddHandlerCtx("sql", GetSystemUserCountStats) } var activeUserTimeLimit = time.Hour * 24 * 30 @@ -135,7 +134,7 @@ func GetAdminStats(query *m.GetAdminStatsQuery) error { return err } -func GetSystemUserCountStatsCtx(ctx context.Context, query *m.GetSystemUserCountStatsQuery) error { +func GetSystemUserCountStats(ctx context.Context, query *m.GetSystemUserCountStatsQuery) error { return withDbSession(ctx, func(sess *DBSession) error { var rawSql = `SELECT COUNT(id) AS Count FROM ` + dialect.Quote("user") @@ -150,16 +149,3 @@ func GetSystemUserCountStatsCtx(ctx context.Context, query *m.GetSystemUserCount return err }) } - -func GetSystemUserCountStats(query *m.GetSystemUserCountStatsQuery) error { - var rawSql = `SELECT COUNT(id) AS Count FROM ` + dialect.Quote("user") - var stats m.SystemUserCountStats - _, err := x.SQL(rawSql).Get(&stats) - if err != nil { - return err - } - - query.Result = &stats - - return err -} diff --git a/pkg/services/sqlstore/stats_test.go b/pkg/services/sqlstore/stats_test.go index 97f0ca0c43e..dae24952d17 100644 --- a/pkg/services/sqlstore/stats_test.go +++ b/pkg/services/sqlstore/stats_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "testing" m "github.com/grafana/grafana/pkg/models" @@ -20,7 +21,7 @@ func TestStatsDataAccess(t *testing.T) { Convey("Get system user count stats should not results in error", func() { query := m.GetSystemUserCountStatsQuery{} - err := GetSystemUserCountStats(&query) + err := GetSystemUserCountStats(context.Background(), &query) So(err, ShouldBeNil) }) From 9ca9a7c30299a5959056ea40d110ac50bd80d06b Mon Sep 17 00:00:00 2001 From: bergquist Date: Tue, 12 Jun 2018 22:58:03 +0200 Subject: [PATCH 09/16] bus: dont start transaction when creating session --- pkg/services/sqlstore/session.go | 1 - pkg/services/sqlstore/transactions.go | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/services/sqlstore/session.go b/pkg/services/sqlstore/session.go index 307d3ee1eeb..fdee2c76b0c 100644 --- a/pkg/services/sqlstore/session.go +++ b/pkg/services/sqlstore/session.go @@ -29,7 +29,6 @@ func startSession(ctx context.Context) *DBSession { if !ok { newSess := newSession() - newSess.Begin() return newSess } diff --git a/pkg/services/sqlstore/transactions.go b/pkg/services/sqlstore/transactions.go index 959d21c0bf1..f72b0bb8500 100644 --- a/pkg/services/sqlstore/transactions.go +++ b/pkg/services/sqlstore/transactions.go @@ -17,6 +17,10 @@ func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx cont sess := startSession(ctx) defer sess.Close() + if err := sess.Begin(); err != nil { + return err + } + withValue := context.WithValue(ctx, ContextSessionName, sess) err := fn(withValue) @@ -59,6 +63,7 @@ func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, var err error sess := startSession(ctx) + defer sess.Close() if err = sess.Begin(); err != nil { From 9c1758b5931f9d55ee3080d943018aee3adbb0e0 Mon Sep 17 00:00:00 2001 From: bergquist Date: Wed, 13 Jun 2018 09:18:53 +0200 Subject: [PATCH 10/16] bus: Dispatch now passes empty ctx if handler require it --- pkg/bus/bus.go | 16 +++++++++++++--- pkg/bus/bus_test.go | 15 ++++++++------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 1259211cddc..9cd3f116172 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -96,13 +96,23 @@ func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { func (b *InProcBus) Dispatch(msg Msg) error { var msgName = reflect.TypeOf(msg).Elem().Name() - var handler = b.handlers[msgName] + var handler = b.handlersWithCtx[msgName] + withCtx := true + + if handler == nil { + withCtx = false + handler = b.handlers[msgName] + } + if handler == nil { return ErrHandlerNotFound } - var params = make([]reflect.Value, 1) - params[0] = reflect.ValueOf(msg) + var params = []reflect.Value{} + if withCtx { + params = append(params, reflect.ValueOf(context.Background())) + } + params = append(params, reflect.ValueOf(msg)) ret := reflect.ValueOf(handler).Call(params) err := ret[0].Interface() diff --git a/pkg/bus/bus_test.go b/pkg/bus/bus_test.go index 83a0d7190ea..9f41a5154df 100644 --- a/pkg/bus/bus_test.go +++ b/pkg/bus/bus_test.go @@ -34,7 +34,6 @@ func TestDispatchCtxCanUseNormalHandlers(t *testing.T) { } bus.AddHandler(handler) - bus.AddHandlerCtx(handlerWithCtx) t.Run("when a normal handler is registered", func(t *testing.T) { bus.Dispatch(&testQuery{}) @@ -42,15 +41,17 @@ func TestDispatchCtxCanUseNormalHandlers(t *testing.T) { if handlerCallCount != 1 { t.Errorf("Expected normal handler to be called 1 time. was called %d", handlerCallCount) } - }) - t.Run("when a ctx handler is registered", func(t *testing.T) { - bus.DispatchCtx(context.Background(), &testQuery{}) + t.Run("when a ctx handler is registered", func(t *testing.T) { + bus.AddHandlerCtx(handlerWithCtx) + bus.Dispatch(&testQuery{}) - if handlerWithCtxCallCount != 1 { - t.Errorf("Expected ctx handler to be called 1 time. was called %d", handlerWithCtxCallCount) - } + if handlerWithCtxCallCount != 1 { + t.Errorf("Expected ctx handler to be called 1 time. was called %d", handlerWithCtxCallCount) + } + }) }) + } func TestQueryHandlerReturnsError(t *testing.T) { From a3ee778ddbfb98a79ce970ec1dc49c4ea66a1adb Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 14 Jun 2018 09:57:49 +0200 Subject: [PATCH 11/16] removes unused code --- pkg/models/transaction.go | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 pkg/models/transaction.go diff --git a/pkg/models/transaction.go b/pkg/models/transaction.go deleted file mode 100644 index e07b2a9e397..00000000000 --- a/pkg/models/transaction.go +++ /dev/null @@ -1,7 +0,0 @@ -package models - -import "context" - -type TransactionManager interface { - InTransaction(ctx context.Context, fn func(ctx context.Context) error) error -} From 03dae10e796cb8f412c386861e634d76555fba18 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 14 Jun 2018 09:59:52 +0200 Subject: [PATCH 12/16] bus: noop should still execute fn --- pkg/bus/bus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 9cd3f116172..9cf930aeb82 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -220,5 +220,5 @@ func ClearBusHandlers() { type noopTransactionManager struct{} func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - return nil + return fn(ctx) } From 09e71e00a36f9a487df69facca139aab160d7f52 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 14 Jun 2018 19:07:33 +0200 Subject: [PATCH 13/16] sql: adds tests for InTransaction --- pkg/services/sqlstore/apikey.go | 7 ++- pkg/services/sqlstore/sqlstore.go | 1 + pkg/services/sqlstore/transactions_test.go | 64 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 3 deletions(-) create mode 100644 pkg/services/sqlstore/transactions_test.go diff --git a/pkg/services/sqlstore/apikey.go b/pkg/services/sqlstore/apikey.go index 9d41b5c809e..775d4cf6447 100644 --- a/pkg/services/sqlstore/apikey.go +++ b/pkg/services/sqlstore/apikey.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "time" "github.com/grafana/grafana/pkg/bus" @@ -11,7 +12,7 @@ func init() { bus.AddHandler("sql", GetApiKeys) bus.AddHandler("sql", GetApiKeyById) bus.AddHandler("sql", GetApiKeyByName) - bus.AddHandler("sql", DeleteApiKey) + bus.AddHandlerCtx("sql", DeleteApiKeyCtx) bus.AddHandler("sql", AddApiKey) } @@ -22,8 +23,8 @@ func GetApiKeys(query *m.GetApiKeysQuery) error { return sess.Find(&query.Result) } -func DeleteApiKey(cmd *m.DeleteApiKeyCommand) error { - return inTransaction(func(sess *DBSession) error { +func DeleteApiKeyCtx(ctx context.Context, cmd *m.DeleteApiKeyCommand) error { + return withDbSession(ctx, func(sess *DBSession) error { var rawSql = "DELETE FROM api_key WHERE id=? and org_id=?" _, err := sess.Exec(rawSql, cmd.Id, cmd.OrgId) return err diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index f97134fd0d5..40101528df5 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -250,6 +250,7 @@ func (ss *SqlStore) readConfig() { } func InitTestDB(t *testing.T) *SqlStore { + t.Helper() sqlstore := &SqlStore{} sqlstore.skipEnsureAdmin = true sqlstore.Bus = bus.New() diff --git a/pkg/services/sqlstore/transactions_test.go b/pkg/services/sqlstore/transactions_test.go new file mode 100644 index 00000000000..2575229aad5 --- /dev/null +++ b/pkg/services/sqlstore/transactions_test.go @@ -0,0 +1,64 @@ +package sqlstore + +import ( + "context" + "errors" + "testing" + + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/models" + + . "github.com/smartystreets/goconvey/convey" +) + +type testQuery struct { + result bool +} + +var ProvokedError = errors.New("testing error.") + +func TestTransaction(t *testing.T) { + InitTestDB(t) + + Convey("InTransaction asdf asdf", t, func() { + ss := SqlStore{log: log.New("test-logger")} + + cmd := &models.AddApiKeyCommand{Key: "secret-key", Name: "key", OrgId: 1} + + err := AddApiKey(cmd) + So(err, ShouldBeNil) + + deleteApiKeyCmd := &models.DeleteApiKeyCommand{Id: cmd.Result.Id, OrgId: 1} + + Convey("can update key", func() { + err := ss.InTransaction(context.Background(), func(ctx context.Context) error { + return DeleteApiKeyCtx(ctx, deleteApiKeyCmd) + }) + + So(err, ShouldBeNil) + + query := &models.GetApiKeyByIdQuery{ApiKeyId: cmd.Result.Id} + err = GetApiKeyById(query) + So(err, ShouldEqual, models.ErrInvalidApiKey) + }) + + Convey("wont update if one handler fails", func() { + err := ss.InTransaction(context.Background(), func(ctx context.Context) error { + err := DeleteApiKeyCtx(ctx, deleteApiKeyCmd) + if err != nil { + return err + } + + return ProvokedError + + }) + + So(err, ShouldEqual, ProvokedError) + + query := &models.GetApiKeyByIdQuery{ApiKeyId: cmd.Result.Id} + err = GetApiKeyById(query) + So(err, ShouldBeNil) + So(query.Result.Id, ShouldEqual, cmd.Result.Id) + }) + }) +} From da91b91b4bf32efdfd1946c419578a767b9b2de8 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 15 Jun 2018 20:49:14 +0200 Subject: [PATCH 14/16] transactions: start sessions and transactions at the same place this make it possible for handler to use `withSession` when transactions is not nedded and `inTransactionCtx` if its needed without knowing who owns the session/transaction --- pkg/services/sqlstore/session.go | 19 ++++++++++++++----- pkg/services/sqlstore/transactions.go | 21 +++++++++------------ pkg/services/sqlstore/transactions_test.go | 6 +----- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pkg/services/sqlstore/session.go b/pkg/services/sqlstore/session.go index fdee2c76b0c..c85346231e4 100644 --- a/pkg/services/sqlstore/session.go +++ b/pkg/services/sqlstore/session.go @@ -22,21 +22,30 @@ func newSession() *DBSession { return &DBSession{Session: x.NewSession()} } -func startSession(ctx context.Context) *DBSession { +func startSession(ctx context.Context, engine *xorm.Engine, beginTran bool) (*DBSession, error) { value := ctx.Value(ContextSessionName) var sess *DBSession sess, ok := value.(*DBSession) if !ok { - newSess := newSession() - return newSess + newSess := &DBSession{Session: engine.NewSession()} + if beginTran { + err := newSess.Begin() + if err != nil { + return nil, err + } + } + return newSess, nil } - return sess + return sess, nil } func withDbSession(ctx context.Context, callback dbTransactionFunc) error { - sess := startSession(ctx) + sess, err := startSession(ctx, x, false) + if err != nil { + return err + } return callback(sess) } diff --git a/pkg/services/sqlstore/transactions.go b/pkg/services/sqlstore/transactions.go index f72b0bb8500..3e7634dc196 100644 --- a/pkg/services/sqlstore/transactions.go +++ b/pkg/services/sqlstore/transactions.go @@ -14,16 +14,16 @@ func (ss *SqlStore) InTransaction(ctx context.Context, fn func(ctx context.Conte } func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error { - sess := startSession(ctx) - defer sess.Close() - - if err := sess.Begin(); err != nil { + sess, err := startSession(ctx, ss.engine, true) + if err != nil { return err } + defer sess.Close() + withValue := context.WithValue(ctx, ContextSessionName, sess) - err := fn(withValue) + err = fn(withValue) // special handling of database locked errors for sqlite, then we can retry 3 times if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { @@ -60,16 +60,13 @@ func inTransactionWithRetry(callback dbTransactionFunc, retry int) error { } func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error { - var err error - - sess := startSession(ctx) - - defer sess.Close() - - if err = sess.Begin(); err != nil { + sess, err := startSession(ctx, x, true) + if err != nil { return err } + defer sess.Close() + err = callback(sess) // special handling of database locked errors for sqlite, then we can retry 3 times diff --git a/pkg/services/sqlstore/transactions_test.go b/pkg/services/sqlstore/transactions_test.go index 2575229aad5..937649921ba 100644 --- a/pkg/services/sqlstore/transactions_test.go +++ b/pkg/services/sqlstore/transactions_test.go @@ -5,7 +5,6 @@ import ( "errors" "testing" - "github.com/grafana/grafana/pkg/log" "github.com/grafana/grafana/pkg/models" . "github.com/smartystreets/goconvey/convey" @@ -18,11 +17,9 @@ type testQuery struct { var ProvokedError = errors.New("testing error.") func TestTransaction(t *testing.T) { - InitTestDB(t) + ss := InitTestDB(t) Convey("InTransaction asdf asdf", t, func() { - ss := SqlStore{log: log.New("test-logger")} - cmd := &models.AddApiKeyCommand{Key: "secret-key", Name: "key", OrgId: 1} err := AddApiKey(cmd) @@ -50,7 +47,6 @@ func TestTransaction(t *testing.T) { } return ProvokedError - }) So(err, ShouldEqual, ProvokedError) From 1181e967992990c119e0194c8e7c55dfed46b0d6 Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 15 Jun 2018 21:23:57 +0200 Subject: [PATCH 15/16] merge create user handlers --- pkg/services/sqlstore/dashboard_test.go | 3 +- pkg/services/sqlstore/org_test.go | 11 +- pkg/services/sqlstore/team_test.go | 3 +- pkg/services/sqlstore/user.go | 130 +++++++++++------------- pkg/services/sqlstore/user_auth_test.go | 3 +- pkg/services/sqlstore/user_test.go | 3 +- 6 files changed, 74 insertions(+), 79 deletions(-) diff --git a/pkg/services/sqlstore/dashboard_test.go b/pkg/services/sqlstore/dashboard_test.go index 6d7c7a93e47..e4aecf0391d 100644 --- a/pkg/services/sqlstore/dashboard_test.go +++ b/pkg/services/sqlstore/dashboard_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "fmt" "testing" "time" @@ -389,7 +390,7 @@ func createUser(name string, role string, isAdmin bool) m.User { setting.AutoAssignOrgRole = role currentUserCmd := m.CreateUserCommand{Login: name, Email: name + "@test.com", Name: "a " + name, IsAdmin: isAdmin} - err := CreateUser(¤tUserCmd) + err := CreateUser(context.Background(), ¤tUserCmd) So(err, ShouldBeNil) q1 := m.GetUserOrgListQuery{UserId: currentUserCmd.Result.Id} diff --git a/pkg/services/sqlstore/org_test.go b/pkg/services/sqlstore/org_test.go index 63b20aa6e86..f41b449de96 100644 --- a/pkg/services/sqlstore/org_test.go +++ b/pkg/services/sqlstore/org_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "testing" "time" @@ -22,9 +23,9 @@ func TestAccountDataAccess(t *testing.T) { ac1cmd := m.CreateUserCommand{Login: "ac1", Email: "ac1@test.com", Name: "ac1 name"} ac2cmd := m.CreateUserCommand{Login: "ac2", Email: "ac2@test.com", Name: "ac2 name"} - err := CreateUser(&ac1cmd) + err := CreateUser(context.Background(), &ac1cmd) So(err, ShouldBeNil) - err = CreateUser(&ac2cmd) + err = CreateUser(context.Background(), &ac2cmd) So(err, ShouldBeNil) q1 := m.GetUserOrgListQuery{UserId: ac1cmd.Result.Id} @@ -43,8 +44,8 @@ func TestAccountDataAccess(t *testing.T) { ac1cmd := m.CreateUserCommand{Login: "ac1", Email: "ac1@test.com", Name: "ac1 name"} ac2cmd := m.CreateUserCommand{Login: "ac2", Email: "ac2@test.com", Name: "ac2 name", IsAdmin: true} - err := CreateUser(&ac1cmd) - err = CreateUser(&ac2cmd) + err := CreateUser(context.Background(), &ac1cmd) + err = CreateUser(context.Background(), &ac2cmd) So(err, ShouldBeNil) ac1 := ac1cmd.Result @@ -182,7 +183,7 @@ func TestAccountDataAccess(t *testing.T) { Convey("Given an org user with dashboard permissions", func() { ac3cmd := m.CreateUserCommand{Login: "ac3", Email: "ac3@test.com", Name: "ac3 name", IsAdmin: false} - err := CreateUser(&ac3cmd) + err := CreateUser(context.Background(), &ac3cmd) So(err, ShouldBeNil) ac3 := ac3cmd.Result diff --git a/pkg/services/sqlstore/team_test.go b/pkg/services/sqlstore/team_test.go index f4b022906da..abaa973957d 100644 --- a/pkg/services/sqlstore/team_test.go +++ b/pkg/services/sqlstore/team_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "fmt" "testing" @@ -22,7 +23,7 @@ func TestTeamCommandsAndQueries(t *testing.T) { Name: fmt.Sprint("user", i), Login: fmt.Sprint("loginuser", i), } - err := CreateUser(userCmd) + err := CreateUser(context.Background(), userCmd) So(err, ShouldBeNil) userIds = append(userIds, userCmd.Result.Id) } diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index 252499d5fdc..4448e973e99 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -16,7 +16,7 @@ import ( ) func init() { - bus.AddHandler("sql", CreateUser) + //bus.AddHandler("sql", CreateUser) bus.AddHandler("sql", GetUserById) bus.AddHandler("sql", UpdateUser) bus.AddHandler("sql", ChangeUserPassword) @@ -31,7 +31,7 @@ func init() { bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", SetUserHelpFlag) - bus.AddHandlerCtx("sql", CreateUserCtx) + bus.AddHandlerCtx("sql", CreateUser) } func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error) { @@ -81,90 +81,80 @@ func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error return org.Id, nil } -func internalCreateUser(sess *DBSession, cmd *m.CreateUserCommand) error { - orgId, err := getOrgIdForNewUser(cmd, sess) - if err != nil { - return err - } +func CreateUser(ctx context.Context, cmd *m.CreateUserCommand) error { + return inTransactionWithRetryCtx(ctx, func(sess *DBSession) error { + orgId, err := getOrgIdForNewUser(cmd, sess) + if err != nil { + return err + } - if cmd.Email == "" { - cmd.Email = cmd.Login - } + if cmd.Email == "" { + cmd.Email = cmd.Login + } - // create user - user := m.User{ - Email: cmd.Email, - Name: cmd.Name, - Login: cmd.Login, - Company: cmd.Company, - IsAdmin: cmd.IsAdmin, - OrgId: orgId, - EmailVerified: cmd.EmailVerified, - Created: time.Now(), - Updated: time.Now(), - LastSeenAt: time.Now().AddDate(-10, 0, 0), - } + // create user + user := m.User{ + Email: cmd.Email, + Name: cmd.Name, + Login: cmd.Login, + Company: cmd.Company, + IsAdmin: cmd.IsAdmin, + OrgId: orgId, + EmailVerified: cmd.EmailVerified, + Created: time.Now(), + Updated: time.Now(), + LastSeenAt: time.Now().AddDate(-10, 0, 0), + } - if len(cmd.Password) > 0 { - user.Salt = util.GetRandomString(10) - user.Rands = util.GetRandomString(10) - user.Password = util.EncodePassword(cmd.Password, user.Salt) - } + if len(cmd.Password) > 0 { + user.Salt = util.GetRandomString(10) + user.Rands = util.GetRandomString(10) + user.Password = util.EncodePassword(cmd.Password, user.Salt) + } - sess.UseBool("is_admin") + sess.UseBool("is_admin") - if _, err := sess.Insert(&user); err != nil { - return err - } + if _, err := sess.Insert(&user); err != nil { + return err + } - sess.publishAfterCommit(&events.UserCreated{ - Timestamp: user.Created, - Id: user.Id, - Name: user.Name, - Login: user.Login, - Email: user.Email, - }) + sess.publishAfterCommit(&events.UserCreated{ + Timestamp: user.Created, + Id: user.Id, + Name: user.Name, + Login: user.Login, + Email: user.Email, + }) - cmd.Result = user + cmd.Result = user - // create org user link - if !cmd.SkipOrgSetup { - orgUser := m.OrgUser{ - OrgId: orgId, - UserId: user.Id, - Role: m.ROLE_ADMIN, - Created: time.Now(), - Updated: time.Now(), - } + // create org user link + if !cmd.SkipOrgSetup { + orgUser := m.OrgUser{ + OrgId: orgId, + UserId: user.Id, + Role: m.ROLE_ADMIN, + Created: time.Now(), + Updated: time.Now(), + } - if setting.AutoAssignOrg && !user.IsAdmin { - if len(cmd.DefaultOrgRole) > 0 { - orgUser.Role = m.RoleType(cmd.DefaultOrgRole) - } else { - orgUser.Role = m.RoleType(setting.AutoAssignOrgRole) + if setting.AutoAssignOrg && !user.IsAdmin { + if len(cmd.DefaultOrgRole) > 0 { + orgUser.Role = m.RoleType(cmd.DefaultOrgRole) + } else { + orgUser.Role = m.RoleType(setting.AutoAssignOrgRole) + } } - } - if _, err = sess.Insert(&orgUser); err != nil { - return err + if _, err = sess.Insert(&orgUser); err != nil { + return err + } } - } - return nil -} - -func CreateUserCtx(ctx context.Context, cmd *m.CreateUserCommand) error { - return inTransactionWithRetryCtx(ctx, func(sess *DBSession) error { - return internalCreateUser(sess, cmd) + return nil }, 0) } -func CreateUser(cmd *m.CreateUserCommand) error { - return inTransaction(func(sess *DBSession) error { - return internalCreateUser(sess, cmd) - }) -} - func GetUserById(query *m.GetUserByIdQuery) error { user := new(m.User) has, err := x.Id(query.Id).Get(user) diff --git a/pkg/services/sqlstore/user_auth_test.go b/pkg/services/sqlstore/user_auth_test.go index 882e0c7afa5..5ad93dc7a3b 100644 --- a/pkg/services/sqlstore/user_auth_test.go +++ b/pkg/services/sqlstore/user_auth_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "fmt" "testing" @@ -22,7 +23,7 @@ func TestUserAuth(t *testing.T) { Name: fmt.Sprint("user", i), Login: fmt.Sprint("loginuser", i), } - err = CreateUser(cmd) + err = CreateUser(context.Background(), cmd) So(err, ShouldBeNil) users = append(users, cmd.Result) } diff --git a/pkg/services/sqlstore/user_test.go b/pkg/services/sqlstore/user_test.go index 2830733c96a..3597b6ad0c1 100644 --- a/pkg/services/sqlstore/user_test.go +++ b/pkg/services/sqlstore/user_test.go @@ -1,6 +1,7 @@ package sqlstore import ( + "context" "fmt" "testing" @@ -24,7 +25,7 @@ func TestUserDataAccess(t *testing.T) { Name: fmt.Sprint("user", i), Login: fmt.Sprint("loginuser", i), } - err = CreateUser(cmd) + err = CreateUser(context.Background(), cmd) So(err, ShouldBeNil) users = append(users, cmd.Result) } From 4c5fe68e7ef8c78f1572cb30730317358390d2bb Mon Sep 17 00:00:00 2001 From: bergquist Date: Fri, 15 Jun 2018 21:57:13 +0200 Subject: [PATCH 16/16] adds inTransactionCtx that calls inTransactionWithRetryCtx --- pkg/services/sqlstore/transactions.go | 4 ++++ pkg/services/sqlstore/user.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/services/sqlstore/transactions.go b/pkg/services/sqlstore/transactions.go index 3e7634dc196..eccd37f9a43 100644 --- a/pkg/services/sqlstore/transactions.go +++ b/pkg/services/sqlstore/transactions.go @@ -100,3 +100,7 @@ func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, func inTransaction(callback dbTransactionFunc) error { return inTransactionWithRetry(callback, 0) } + +func inTransactionCtx(ctx context.Context, callback dbTransactionFunc) error { + return inTransactionWithRetryCtx(ctx, callback, 0) +} diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index 4448e973e99..d32d51e0d0c 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -82,7 +82,7 @@ func getOrgIdForNewUser(cmd *m.CreateUserCommand, sess *DBSession) (int64, error } func CreateUser(ctx context.Context, cmd *m.CreateUserCommand) error { - return inTransactionWithRetryCtx(ctx, func(sess *DBSession) error { + return inTransactionCtx(ctx, func(sess *DBSession) error { orgId, err := getOrgIdForNewUser(cmd, sess) if err != nil { return err @@ -152,7 +152,7 @@ func CreateUser(ctx context.Context, cmd *m.CreateUserCommand) error { } return nil - }, 0) + }) } func GetUserById(query *m.GetUserByIdQuery) error {