|
|
|
@ -27,7 +27,6 @@ type TransactionManager interface { |
|
|
|
|
|
|
|
|
|
// Bus type defines the bus interface structure
|
|
|
|
|
type Bus interface { |
|
|
|
|
Dispatch(msg Msg) error |
|
|
|
|
DispatchCtx(ctx context.Context, msg Msg) error |
|
|
|
|
|
|
|
|
|
PublishCtx(ctx context.Context, msg Msg) error |
|
|
|
@ -38,7 +37,6 @@ type Bus interface { |
|
|
|
|
// callback returns an error.
|
|
|
|
|
InTransaction(ctx context.Context, fn func(ctx context.Context) error) error |
|
|
|
|
|
|
|
|
|
AddHandler(handler HandlerFunc) |
|
|
|
|
AddHandlerCtx(handler HandlerFunc) |
|
|
|
|
|
|
|
|
|
AddEventListenerCtx(handler HandlerFunc) |
|
|
|
@ -128,37 +126,6 @@ func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { |
|
|
|
|
return err.(error) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Dispatch function dispatch a message to the bus.
|
|
|
|
|
func (b *InProcBus) Dispatch(msg Msg) error { |
|
|
|
|
var msgName = reflect.TypeOf(msg).Elem().Name() |
|
|
|
|
|
|
|
|
|
withCtx := true |
|
|
|
|
handler := b.handlersWithCtx[msgName] |
|
|
|
|
if handler == nil { |
|
|
|
|
withCtx = false |
|
|
|
|
handler = b.handlers[msgName] |
|
|
|
|
if handler == nil { |
|
|
|
|
return ErrHandlerNotFound |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var params = []reflect.Value{} |
|
|
|
|
if withCtx { |
|
|
|
|
if setting.Env == setting.Dev { |
|
|
|
|
b.logger.Warn("Dispatch called with message handler registered using AddHandlerCtx and should be changed to use DispatchCtx", "msgName", msgName) |
|
|
|
|
} |
|
|
|
|
params = append(params, reflect.ValueOf(context.Background())) |
|
|
|
|
} |
|
|
|
|
params = append(params, reflect.ValueOf(msg)) |
|
|
|
|
|
|
|
|
|
ret := reflect.ValueOf(handler).Call(params) |
|
|
|
|
err := ret[0].Interface() |
|
|
|
|
if err == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return err.(error) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// PublishCtx function publish a message to the bus listener.
|
|
|
|
|
func (b *InProcBus) PublishCtx(ctx context.Context, msg Msg) error { |
|
|
|
|
var msgName = reflect.TypeOf(msg).Elem().Name() |
|
|
|
@ -205,12 +172,6 @@ func callListeners(listeners []HandlerFunc, params []reflect.Value) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *InProcBus) AddHandler(handler HandlerFunc) { |
|
|
|
|
handlerType := reflect.TypeOf(handler) |
|
|
|
|
queryTypeName := handlerType.In(0).Elem().Name() |
|
|
|
|
b.handlers[queryTypeName] = handler |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) { |
|
|
|
|
handlerType := reflect.TypeOf(handler) |
|
|
|
|
queryTypeName := handlerType.In(1).Elem().Name() |
|
|
|
@ -232,12 +193,6 @@ func (b *InProcBus) AddEventListenerCtx(handler HandlerFunc) { |
|
|
|
|
b.listenersWithCtx[eventName] = append(b.listenersWithCtx[eventName], handler) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AddHandler attaches a handler function to the global bus.
|
|
|
|
|
// Package level function.
|
|
|
|
|
func AddHandler(implName string, handler HandlerFunc) { |
|
|
|
|
globalBus.AddHandler(handler) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// AddHandlerCtx attaches a handler function to the global bus context.
|
|
|
|
|
// Package level function.
|
|
|
|
|
func AddHandlerCtx(implName string, handler HandlerFunc) { |
|
|
|
@ -250,10 +205,6 @@ func AddEventListenerCtx(handler HandlerFunc) { |
|
|
|
|
globalBus.AddEventListenerCtx(handler) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func Dispatch(msg Msg) error { |
|
|
|
|
return globalBus.Dispatch(msg) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func DispatchCtx(ctx context.Context, msg Msg) error { |
|
|
|
|
return globalBus.DispatchCtx(ctx, msg) |
|
|
|
|
} |
|
|
|
|