diff --git a/pkg/api/api.go b/pkg/api/api.go index e931c5de601..de9778953d4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -4,14 +4,13 @@ import ( "github.com/go-macaron/binding" "github.com/grafana/grafana/pkg/api/avatar" "github.com/grafana/grafana/pkg/api/dtos" - "github.com/grafana/grafana/pkg/api/live" "github.com/grafana/grafana/pkg/middleware" m "github.com/grafana/grafana/pkg/models" - "gopkg.in/macaron.v1" ) // Register adds http routes -func Register(r *macaron.Macaron) { +func (hs *HttpServer) registerRoutes() { + r := hs.macaron reqSignedIn := middleware.Auth(&middleware.AuthOptions{ReqSignedIn: true}) reqGrafanaAdmin := middleware.Auth(&middleware.AuthOptions{ReqSignedIn: true, ReqGrafanaAdmin: true}) reqEditorRole := middleware.RoleAuth(m.ROLE_EDITOR, m.ROLE_ADMIN) @@ -303,11 +302,10 @@ func Register(r *macaron.Macaron) { r.Get("/avatar/:hash", avt.ServeHTTP) // Websocket - liveConn := live.New() - r.Any("/ws", liveConn.Serve) + r.Any("/ws", hs.streamManager.Serve) // streams - r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream) + //r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream) InitAppPluginRoutes(r) diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go new file mode 100644 index 00000000000..442eae60e02 --- /dev/null +++ b/pkg/api/http_server.go @@ -0,0 +1,138 @@ +package api + +import ( + "context" + "errors" + "fmt" + "net/http" + "os" + "path" + + macaron "gopkg.in/macaron.v1" + + "github.com/grafana/grafana/pkg/api/live" + httpstatic "github.com/grafana/grafana/pkg/api/static" + "github.com/grafana/grafana/pkg/cmd/grafana-cli/logger" + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/middleware" + "github.com/grafana/grafana/pkg/plugins" + "github.com/grafana/grafana/pkg/setting" +) + +type HttpServer struct { + log log.Logger + macaron *macaron.Macaron + context context.Context + streamManager *live.StreamManager +} + +func NewHttpServer() *HttpServer { + return &HttpServer{ + log: log.New("http.server"), + } +} + +func (hs *HttpServer) Start(ctx context.Context) error { + var err error + + hs.context = ctx + hs.streamManager = live.NewStreamManager() + hs.macaron = hs.newMacaron() + hs.registerRoutes() + + hs.streamManager.Run(ctx) + + listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort) + hs.log.Info("Initializing HTTP Server", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl) + + switch setting.Protocol { + case setting.HTTP: + err = http.ListenAndServe(listenAddr, hs.macaron) + case setting.HTTPS: + err = hs.listenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile) + default: + hs.log.Error("Invalid protocol", "protocol", setting.Protocol) + err = errors.New("Invalid Protocol") + } + + return err +} + +func (hs *HttpServer) listenAndServeTLS(listenAddr, certfile, keyfile string) error { + if certfile == "" { + return fmt.Errorf("cert_file cannot be empty when using HTTPS") + } + + if keyfile == "" { + return fmt.Errorf("cert_key cannot be empty when using HTTPS") + } + + if _, err := os.Stat(setting.CertFile); os.IsNotExist(err) { + return fmt.Errorf(`Cannot find SSL cert_file at %v`, setting.CertFile) + } + + if _, err := os.Stat(setting.KeyFile); os.IsNotExist(err) { + return fmt.Errorf(`Cannot find SSL key_file at %v`, setting.KeyFile) + } + + return http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, hs.macaron) +} + +func (hs *HttpServer) newMacaron() *macaron.Macaron { + macaron.Env = setting.Env + m := macaron.New() + + m.Use(middleware.Logger()) + m.Use(middleware.Recovery()) + + if setting.EnableGzip { + m.Use(middleware.Gziper()) + } + + for _, route := range plugins.StaticRoutes { + pluginRoute := path.Join("/public/plugins/", route.PluginId) + logger.Debug("Plugins: Adding route", "route", pluginRoute, "dir", route.Directory) + hs.mapStatic(m, route.Directory, "", pluginRoute) + } + + hs.mapStatic(m, setting.StaticRootPath, "", "public") + hs.mapStatic(m, setting.StaticRootPath, "robots.txt", "robots.txt") + + m.Use(macaron.Renderer(macaron.RenderOptions{ + Directory: path.Join(setting.StaticRootPath, "views"), + IndentJSON: macaron.Env != macaron.PROD, + Delims: macaron.Delims{Left: "[[", Right: "]]"}, + })) + + m.Use(middleware.GetContextHandler()) + m.Use(middleware.Sessioner(&setting.SessionOptions)) + m.Use(middleware.RequestMetrics()) + + // needs to be after context handler + if setting.EnforceDomain { + m.Use(middleware.ValidateHostHeader(setting.Domain)) + } + + return m +} + +func (hs *HttpServer) mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) { + headers := func(c *macaron.Context) { + c.Resp.Header().Set("Cache-Control", "public, max-age=3600") + } + + if setting.Env == setting.DEV { + headers = func(c *macaron.Context) { + c.Resp.Header().Set("Cache-Control", "max-age=0, must-revalidate, no-cache") + } + } + + m.Use(httpstatic.Static( + path.Join(rootDir, dir), + httpstatic.StaticOptions{ + SkipLogging: true, + Prefix: prefix, + AddHeaders: headers, + }, + )) +} diff --git a/pkg/api/live/conn.go b/pkg/api/live/conn.go index 09b66761b42..f2a041d7631 100644 --- a/pkg/api/live/conn.go +++ b/pkg/api/live/conn.go @@ -32,12 +32,14 @@ var upgrader = websocket.Upgrader{ } type connection struct { + hub *hub ws *websocket.Conn send chan []byte } -func newConnection(ws *websocket.Conn) *connection { +func newConnection(ws *websocket.Conn, hub *hub) *connection { return &connection{ + hub: hub, send: make(chan []byte, 256), ws: ws, } @@ -45,7 +47,7 @@ func newConnection(ws *websocket.Conn) *connection { func (c *connection) readPump() { defer func() { - h.unregister <- c + c.hub.unregister <- c c.ws.Close() }() @@ -81,9 +83,9 @@ func (c *connection) handleMessage(message []byte) { switch msgType { case "subscribe": - h.subChannel <- &streamSubscription{name: streamName, conn: c} + c.hub.subChannel <- &streamSubscription{name: streamName, conn: c} case "unsubscribe": - h.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true} + c.hub.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true} } } diff --git a/pkg/api/live/hub.go b/pkg/api/live/hub.go index bca65d57432..37ab5667e55 100644 --- a/pkg/api/live/hub.go +++ b/pkg/api/live/hub.go @@ -1,6 +1,8 @@ package live import ( + "context" + "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/log" @@ -23,22 +25,26 @@ type streamSubscription struct { remove bool } -var h = hub{ - connections: make(map[*connection]bool), - streams: make(map[string]map[*connection]bool), - register: make(chan *connection), - unregister: make(chan *connection), - streamChannel: make(chan *dtos.StreamMessage), - subChannel: make(chan *streamSubscription), - log: log.New("live.hub"), +func newHub() *hub { + return &hub{ + connections: make(map[*connection]bool), + streams: make(map[string]map[*connection]bool), + register: make(chan *connection), + unregister: make(chan *connection), + streamChannel: make(chan *dtos.StreamMessage), + subChannel: make(chan *streamSubscription), + log: log.New("stream.hub"), + } } func (h *hub) removeConnection() { } -func (h *hub) run() { +func (h *hub) run(ctx context.Context) { for { select { + case <-ctx.Done(): + return case c := <-h.register: h.connections[c] = true h.log.Info("New connection", "total", len(h.connections)) @@ -49,7 +55,7 @@ func (h *hub) run() { delete(h.connections, c) close(c.send) } - // hand stream subscriptions + // hand stream subscriptions case sub := <-h.subChannel: h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove) subscribers, exists := h.streams[sub.name] diff --git a/pkg/api/live/live.go b/pkg/api/live/live.go deleted file mode 100644 index a7e73b4b28a..00000000000 --- a/pkg/api/live/live.go +++ /dev/null @@ -1,40 +0,0 @@ -package live - -import ( - "net/http" - - "github.com/grafana/grafana/pkg/api/dtos" - "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/middleware" -) - -type LiveConn struct { - log log.Logger -} - -func New() *LiveConn { - go h.run() - - return &LiveConn{log: log.New("live.server")} -} - -func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) { - lc.log.Info("Upgrading to WebSocket") - - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Error(3, "Live: Failed to upgrade connection to WebSocket", err) - return - } - - c := newConnection(ws) - h.register <- c - - go c.writePump() - c.readPump() -} - -func (lc *LiveConn) PushToStream(c *middleware.Context, message dtos.StreamMessage) { - h.streamChannel <- &message - c.JsonOK("Message recevived") -} diff --git a/pkg/api/live/stream_manager.go b/pkg/api/live/stream_manager.go index ddc57c67cdc..00d98286882 100644 --- a/pkg/api/live/stream_manager.go +++ b/pkg/api/live/stream_manager.go @@ -1,6 +1,8 @@ package live import ( + "context" + "net/http" "sync" "github.com/grafana/grafana/pkg/components/simplejson" @@ -8,21 +10,48 @@ import ( m "github.com/grafana/grafana/pkg/models" ) -type StreamManagerImpl struct { +type StreamManager struct { log log.Logger streams map[string]*Stream streamRWMutex *sync.RWMutex + hub *hub } -func NewStreamManager() m.StreamManager { - return &StreamManagerImpl{ - log: log.New("live.stream.manager"), +func NewStreamManager() *StreamManager { + return &StreamManager{ + hub: newHub(), + log: log.New("stream.manager"), streams: make(map[string]*Stream), streamRWMutex: &sync.RWMutex{}, } } -func (s *StreamManagerImpl) GetStreamList() m.StreamList { +func (sm *StreamManager) Run(context context.Context) { + log.Info("Initializing Stream Manager") + + go func() { + sm.hub.run(context) + log.Info("Stopped Stream Manager") + }() +} + +func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) { + sm.log.Info("Upgrading to WebSocket") + + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + sm.log.Error("Failed to upgrade connection to WebSocket", "error", err) + return + } + + c := newConnection(ws, sm.hub) + sm.hub.register <- c + + go c.writePump() + c.readPump() +} + +func (s *StreamManager) GetStreamList() m.StreamList { list := make(m.StreamList, 0) for _, stream := range s.streams { @@ -34,7 +63,7 @@ func (s *StreamManagerImpl) GetStreamList() m.StreamList { return list } -func (s *StreamManagerImpl) Push(packet *m.StreamPacket) { +func (s *StreamManager) Push(packet *m.StreamPacket) { stream, exist := s.streams[packet.Stream] if !exist { diff --git a/pkg/cmd/grafana-server/server.go b/pkg/cmd/grafana-server/server.go index 2a4682cbb4b..326566b00ef 100644 --- a/pkg/cmd/grafana-server/server.go +++ b/pkg/cmd/grafana-server/server.go @@ -2,13 +2,9 @@ package main import ( "context" - "fmt" - "net/http" "os" "time" - "gopkg.in/macaron.v1" - "golang.org/x/sync/errgroup" "github.com/grafana/grafana/pkg/api" @@ -78,24 +74,9 @@ func (g *GrafanaServerImpl) Start() { } func (g *GrafanaServerImpl) startHttpServer() { - logger = log.New("http.server") - - var err error - m := newMacaron() - api.Register(m) - - listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort) - g.log.Info("Initializing HTTP Server", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl) - - switch setting.Protocol { - case setting.HTTP: - err = http.ListenAndServe(listenAddr, m) - case setting.HTTPS: - err = ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m) - default: - g.log.Error("Invalid protocol", "protocol", setting.Protocol) - g.Shutdown(1, "Startup failed") - } + httpServer := api.NewHttpServer() + + err := httpServer.Start(g.context) if err != nil { g.log.Error("Fail to start server", "error", err) @@ -115,26 +96,6 @@ func (g *GrafanaServerImpl) Shutdown(code int, reason string) { os.Exit(code) } -func ListenAndServeTLS(listenAddr, certfile, keyfile string, m *macaron.Macaron) error { - if certfile == "" { - return fmt.Errorf("cert_file cannot be empty when using HTTPS") - } - - if keyfile == "" { - return fmt.Errorf("cert_key cannot be empty when using HTTPS") - } - - if _, err := os.Stat(setting.CertFile); os.IsNotExist(err) { - return fmt.Errorf(`Cannot find SSL cert_file at %v`, setting.CertFile) - } - - if _, err := os.Stat(setting.KeyFile); os.IsNotExist(err) { - return fmt.Errorf(`Cannot find SSL key_file at %v`, setting.KeyFile) - } - - return http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m) -} - // implement context.Context func (g *GrafanaServerImpl) Deadline() (deadline time.Time, ok bool) { return g.context.Deadline() diff --git a/pkg/cmd/grafana-server/web.go b/pkg/cmd/grafana-server/web.go deleted file mode 100644 index 07a2f82b172..00000000000 --- a/pkg/cmd/grafana-server/web.go +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright 2014 Unknwon -// Copyright 2014 Torkel Ödegaard - -package main - -import ( - "fmt" - "net/http" - "path" - - "gopkg.in/macaron.v1" - - "github.com/grafana/grafana/pkg/api" - "github.com/grafana/grafana/pkg/api/static" - "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/middleware" - "github.com/grafana/grafana/pkg/plugins" - "github.com/grafana/grafana/pkg/setting" -) - -var logger log.Logger - -func newMacaron() *macaron.Macaron { - macaron.Env = setting.Env - m := macaron.New() - - m.Use(middleware.Logger()) - m.Use(middleware.Recovery()) - - if setting.EnableGzip { - m.Use(middleware.Gziper()) - } - - for _, route := range plugins.StaticRoutes { - pluginRoute := path.Join("/public/plugins/", route.PluginId) - logger.Debug("Plugins: Adding route", "route", pluginRoute, "dir", route.Directory) - mapStatic(m, route.Directory, "", pluginRoute) - } - - mapStatic(m, setting.StaticRootPath, "", "public") - mapStatic(m, setting.StaticRootPath, "robots.txt", "robots.txt") - - m.Use(macaron.Renderer(macaron.RenderOptions{ - Directory: path.Join(setting.StaticRootPath, "views"), - IndentJSON: macaron.Env != macaron.PROD, - Delims: macaron.Delims{Left: "[[", Right: "]]"}, - })) - - m.Use(middleware.GetContextHandler()) - m.Use(middleware.Sessioner(&setting.SessionOptions)) - m.Use(middleware.RequestMetrics()) - - // needs to be after context handler - if setting.EnforceDomain { - m.Use(middleware.ValidateHostHeader(setting.Domain)) - } - - return m -} - -func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) { - headers := func(c *macaron.Context) { - c.Resp.Header().Set("Cache-Control", "public, max-age=3600") - } - - if setting.Env == setting.DEV { - headers = func(c *macaron.Context) { - c.Resp.Header().Set("Cache-Control", "max-age=0, must-revalidate, no-cache") - } - } - - m.Use(httpstatic.Static( - path.Join(rootDir, dir), - httpstatic.StaticOptions{ - SkipLogging: true, - Prefix: prefix, - AddHeaders: headers, - }, - )) -} - -func StartServer() int { - logger = log.New("server") - - var err error - m := newMacaron() - api.Register(m) - - listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort) - logger.Info("Server Listening", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl) - switch setting.Protocol { - case setting.HTTP: - err = http.ListenAndServe(listenAddr, m) - case setting.HTTPS: - err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m) - default: - logger.Error("Invalid protocol", "protocol", setting.Protocol) - return 1 - } - - if err != nil { - logger.Error("Fail to start server", "error", err) - return 1 - } - - return 0 -}