|
|
|
@ -2,7 +2,6 @@ package live |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"fmt" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
|
|
|
|
|
|
"github.com/centrifugal/centrifuge" |
|
|
|
@ -60,7 +59,7 @@ type GrafanaLive struct { |
|
|
|
|
// Init initializes the instance.
|
|
|
|
|
// Required to implement the registry.Service interface.
|
|
|
|
|
func (g *GrafanaLive) Init() error { |
|
|
|
|
logger.Debug("GrafanaLive initing") |
|
|
|
|
logger.Debug("GrafanaLive initialization") |
|
|
|
|
|
|
|
|
|
if !g.IsEnabled() { |
|
|
|
|
logger.Debug("GrafanaLive feature not enabled, skipping initialization") |
|
|
|
@ -113,9 +112,9 @@ func (g *GrafanaLive) Init() error { |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// Called when a client writes to the websocket channel.
|
|
|
|
|
// Called when a client publishes to the websocket channel.
|
|
|
|
|
// In general, we should prefer writing to the HTTP API, but this
|
|
|
|
|
// allows some simple prototypes to work quickly
|
|
|
|
|
// allows some simple prototypes to work quickly.
|
|
|
|
|
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { |
|
|
|
|
handler, err := g.GetChannelHandler(e.Channel) |
|
|
|
|
if err != nil { |
|
|
|
@ -131,15 +130,7 @@ func (g *GrafanaLive) Init() error { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SockJS will find the best protocol possible for the browser
|
|
|
|
|
sockJsPrefix := "/live/sockjs" |
|
|
|
|
sockjsHandler := centrifuge.NewSockjsHandler(node, centrifuge.SockjsConfig{ |
|
|
|
|
HandlerPrefix: sockJsPrefix, |
|
|
|
|
WebsocketReadBufferSize: 1024, |
|
|
|
|
WebsocketWriteBufferSize: 1024, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// Use a direct websocket from go clients
|
|
|
|
|
// Use a pure websocket transport.
|
|
|
|
|
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{ |
|
|
|
|
ReadBufferSize: 1024, |
|
|
|
|
WriteBufferSize: 1024, |
|
|
|
@ -152,31 +143,19 @@ func (g *GrafanaLive) Init() error { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Centrifuge expects Credentials in context with a current user ID.
|
|
|
|
|
cred := ¢rifuge.Credentials{ |
|
|
|
|
UserID: fmt.Sprintf("%d", user.UserId), |
|
|
|
|
} |
|
|
|
|
newCtx := centrifuge.SetCredentials(ctx.Req.Context(), cred) |
|
|
|
|
|
|
|
|
|
r := ctx.Req.Request |
|
|
|
|
r = r.WithContext(newCtx) // Set a user ID
|
|
|
|
|
|
|
|
|
|
// Check if this is a direct websocket connection
|
|
|
|
|
path := ctx.Req.URL.Path |
|
|
|
|
if strings.Contains(path, "live/ws") { |
|
|
|
|
wsHandler.ServeHTTP(ctx.Resp, r) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if strings.Contains(path, sockJsPrefix) { |
|
|
|
|
sockjsHandler.ServeHTTP(ctx.Resp, r) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
r = r.WithContext(newCtx) // Set a user ID.
|
|
|
|
|
|
|
|
|
|
// Unknown path
|
|
|
|
|
ctx.Resp.WriteHeader(404) |
|
|
|
|
wsHandler.ServeHTTP(ctx.Resp, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
g.RouteRegister.Any("/live/*", g.WebsocketHandler) |
|
|
|
|
g.RouteRegister.Get("/live/ws", g.WebsocketHandler) |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -286,7 +265,7 @@ func handleLog(msg centrifuge.LogEntry) { |
|
|
|
|
loggerCF.Error(msg.Message, arr...) |
|
|
|
|
case centrifuge.LogLevelInfo: |
|
|
|
|
loggerCF.Info(msg.Message, arr...) |
|
|
|
|
case centrifuge.LogLevelNone: |
|
|
|
|
default: |
|
|
|
|
loggerCF.Debug(msg.Message, arr...) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|