The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/bus/bus.go

86 lines
2.0 KiB

package bus
import (
"context"
"errors"
"fmt"
"reflect"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/tracing"
)
// HandlerFunc is the type accepted for bus listeners. It is declared as any
// because listeners are inspected and invoked through reflection:
// AddEventListener reads the type of the handler's second parameter to pick
// the event name, Publish calls the handler with (ctx, msg), and
// callListeners treats the handler's first return value as its error result.
type HandlerFunc any
// Msg is a message published on the bus. It is declared as any; Publish
// derives the message name with reflect.TypeOf(msg).Elem().Name(), so msg is
// expected to be a pointer to a named type (Elem panics for a plain struct
// value or a nil msg).
type Msg any
// ErrHandlerNotFound is a sentinel error describing a missing handler.
// NOTE(review): nothing in the code visible in this file returns it; Publish
// returns nil when no listener is registered for a message.
var ErrHandlerNotFound = errors.New("handler not found")
// Bus is the message bus interface: listeners are registered with
// AddEventListener and messages are delivered to them with Publish.
// InProcBus is the implementation declared in this file.
type Bus interface {
// Publish delivers msg to the listeners registered for it. In the
// InProcBus implementation the first error reported by a listener is
// returned and the remaining listeners are not called.
Publish(ctx context.Context, msg Msg) error
// AddEventListener registers handler as a listener. In the InProcBus
// implementation the event is identified by the type of the handler's
// second parameter.
AddEventListener(handler HandlerFunc)
}
// InProcBus is an in-process bus that keeps its listeners in a map and calls
// them one after another, via reflection, from Publish.
type InProcBus struct {
// listeners maps a message type name to the handlers registered for it,
// in registration order.
listeners map[string][]HandlerFunc
// tracer is used by Publish to start a tracing span named after the
// published message.
tracer tracing.Tracer
}
// ProvideBus constructs an InProcBus that uses tracer for its spans and
// starts with an empty, ready-to-use listener registry.
func ProvideBus(tracer tracing.Tracer) *InProcBus {
	bus := new(InProcBus)
	bus.tracer = tracer
	bus.listeners = map[string][]HandlerFunc{}
	return bus
}
// Publish delivers msg to every listener registered for its type name and
// returns the first error a listener reports; later listeners are not called
// after a failure. When no listener is registered for the message, Publish
// returns nil.
//
// The message name is reflect.TypeOf(msg).Elem().Name(), so msg must be a
// non-nil pointer to a named type; anything else panics inside reflect.
//
// A span named "bus - <message name>" wraps the whole dispatch, and the
// context carrying that span is the one handed to the listeners.
func (b *InProcBus) Publish(ctx context.Context, msg Msg) error {
	msgName := reflect.TypeOf(msg).Elem().Name()

	// Start the span before dispatching: it then covers the time spent in the
	// listeners and is also recorded when one of them fails. Starting it
	// after the calls (as before) produced an empty span on success and no
	// span at all on error.
	ctx, span := b.tracer.Start(ctx, "bus - "+msgName)
	defer span.End()
	span.SetAttributes(attribute.String("msg", msgName))

	listeners, exists := b.listeners[msgName]
	if !exists {
		return nil
	}

	params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(msg)}
	return callListeners(listeners, params)
}
// callListeners invokes each listener in order with params, via reflection,
// and stops at the first failure.
//
// The first return value of every listener is inspected: nil means success,
// an error is returned unchanged, and any other non-nil value is reported as
// an error naming the unexpected type. A listener that returns nothing is
// reported as an error as well, instead of panicking on the missing result.
//
// A listener that is not a function, or whose parameters do not match params,
// still panics inside reflect.Value.Call.
func callListeners(listeners []HandlerFunc, params []reflect.Value) error {
	for _, listener := range listeners {
		results := reflect.ValueOf(listener).Call(params)
		if len(results) == 0 {
			// Indexing results[0] below would panic; surface the contract
			// violation the same way as a wrong return type.
			return errors.New("expected listener to return an error, got no return value")
		}

		switch result := results[0].Interface().(type) {
		case nil:
			// Listener succeeded; move on to the next one.
		case error:
			return result
		default:
			return fmt.Errorf("expected listener to return an error, got '%T'", result)
		}
	}
	return nil
}
// AddEventListener registers handler for the event identified by its second
// parameter. That parameter's type is dereferenced with Elem, and the name of
// the resulting type is the key under which the handler is stored, after any
// handlers registered earlier for the same event.
//
// handler must be a function with at least two parameters whose second
// parameter type supports Elem (e.g. a pointer); otherwise reflect panics.
func (b *InProcBus) AddEventListener(handler HandlerFunc) {
	key := reflect.TypeOf(handler).In(1).Elem().Name()
	// append on a missing (nil) entry creates the slice, so no explicit
	// initialisation is required.
	b.listeners[key] = append(b.listeners[key], handler)
}