refactor(http): refactoring http server

pull/7347/head
Torkel Ödegaard 9 years ago
parent 8bccbdafd2
commit fad07f0d15
  1. 10
      pkg/api/api.go
  2. 138
      pkg/api/http_server.go
  3. 10
      pkg/api/live/conn.go
  4. 26
      pkg/api/live/hub.go
  5. 40
      pkg/api/live/live.go
  6. 41
      pkg/api/live/stream_manager.go
  7. 45
      pkg/cmd/grafana-server/server.go
  8. 107
      pkg/cmd/grafana-server/web.go

@ -4,14 +4,13 @@ import (
"github.com/go-macaron/binding"
"github.com/grafana/grafana/pkg/api/avatar"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/live"
"github.com/grafana/grafana/pkg/middleware"
m "github.com/grafana/grafana/pkg/models"
"gopkg.in/macaron.v1"
)
// Register adds http routes
func Register(r *macaron.Macaron) {
func (hs *HttpServer) registerRoutes() {
r := hs.macaron
reqSignedIn := middleware.Auth(&middleware.AuthOptions{ReqSignedIn: true})
reqGrafanaAdmin := middleware.Auth(&middleware.AuthOptions{ReqSignedIn: true, ReqGrafanaAdmin: true})
reqEditorRole := middleware.RoleAuth(m.ROLE_EDITOR, m.ROLE_ADMIN)
@ -303,11 +302,10 @@ func Register(r *macaron.Macaron) {
r.Get("/avatar/:hash", avt.ServeHTTP)
// Websocket
liveConn := live.New()
r.Any("/ws", liveConn.Serve)
r.Any("/ws", hs.streamManager.Serve)
// streams
r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
//r.Post("/api/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream)
InitAppPluginRoutes(r)

@ -0,0 +1,138 @@
package api
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path"
macaron "gopkg.in/macaron.v1"
"github.com/grafana/grafana/pkg/api/live"
httpstatic "github.com/grafana/grafana/pkg/api/static"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/setting"
)
type HttpServer struct {
log log.Logger
macaron *macaron.Macaron
context context.Context
streamManager *live.StreamManager
}
func NewHttpServer() *HttpServer {
return &HttpServer{
log: log.New("http.server"),
}
}
func (hs *HttpServer) Start(ctx context.Context) error {
var err error
hs.context = ctx
hs.streamManager = live.NewStreamManager()
hs.macaron = hs.newMacaron()
hs.registerRoutes()
hs.streamManager.Run(ctx)
listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort)
hs.log.Info("Initializing HTTP Server", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl)
switch setting.Protocol {
case setting.HTTP:
err = http.ListenAndServe(listenAddr, hs.macaron)
case setting.HTTPS:
err = hs.listenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile)
default:
hs.log.Error("Invalid protocol", "protocol", setting.Protocol)
err = errors.New("Invalid Protocol")
}
return err
}
func (hs *HttpServer) listenAndServeTLS(listenAddr, certfile, keyfile string) error {
if certfile == "" {
return fmt.Errorf("cert_file cannot be empty when using HTTPS")
}
if keyfile == "" {
return fmt.Errorf("cert_key cannot be empty when using HTTPS")
}
if _, err := os.Stat(setting.CertFile); os.IsNotExist(err) {
return fmt.Errorf(`Cannot find SSL cert_file at %v`, setting.CertFile)
}
if _, err := os.Stat(setting.KeyFile); os.IsNotExist(err) {
return fmt.Errorf(`Cannot find SSL key_file at %v`, setting.KeyFile)
}
return http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, hs.macaron)
}
func (hs *HttpServer) newMacaron() *macaron.Macaron {
macaron.Env = setting.Env
m := macaron.New()
m.Use(middleware.Logger())
m.Use(middleware.Recovery())
if setting.EnableGzip {
m.Use(middleware.Gziper())
}
for _, route := range plugins.StaticRoutes {
pluginRoute := path.Join("/public/plugins/", route.PluginId)
logger.Debug("Plugins: Adding route", "route", pluginRoute, "dir", route.Directory)
hs.mapStatic(m, route.Directory, "", pluginRoute)
}
hs.mapStatic(m, setting.StaticRootPath, "", "public")
hs.mapStatic(m, setting.StaticRootPath, "robots.txt", "robots.txt")
m.Use(macaron.Renderer(macaron.RenderOptions{
Directory: path.Join(setting.StaticRootPath, "views"),
IndentJSON: macaron.Env != macaron.PROD,
Delims: macaron.Delims{Left: "[[", Right: "]]"},
}))
m.Use(middleware.GetContextHandler())
m.Use(middleware.Sessioner(&setting.SessionOptions))
m.Use(middleware.RequestMetrics())
// needs to be after context handler
if setting.EnforceDomain {
m.Use(middleware.ValidateHostHeader(setting.Domain))
}
return m
}
func (hs *HttpServer) mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) {
headers := func(c *macaron.Context) {
c.Resp.Header().Set("Cache-Control", "public, max-age=3600")
}
if setting.Env == setting.DEV {
headers = func(c *macaron.Context) {
c.Resp.Header().Set("Cache-Control", "max-age=0, must-revalidate, no-cache")
}
}
m.Use(httpstatic.Static(
path.Join(rootDir, dir),
httpstatic.StaticOptions{
SkipLogging: true,
Prefix: prefix,
AddHeaders: headers,
},
))
}

@ -32,12 +32,14 @@ var upgrader = websocket.Upgrader{
}
type connection struct {
hub *hub
ws *websocket.Conn
send chan []byte
}
func newConnection(ws *websocket.Conn) *connection {
func newConnection(ws *websocket.Conn, hub *hub) *connection {
return &connection{
hub: hub,
send: make(chan []byte, 256),
ws: ws,
}
@ -45,7 +47,7 @@ func newConnection(ws *websocket.Conn) *connection {
func (c *connection) readPump() {
defer func() {
h.unregister <- c
c.hub.unregister <- c
c.ws.Close()
}()
@ -81,9 +83,9 @@ func (c *connection) handleMessage(message []byte) {
switch msgType {
case "subscribe":
h.subChannel <- &streamSubscription{name: streamName, conn: c}
c.hub.subChannel <- &streamSubscription{name: streamName, conn: c}
case "unsubscribe":
h.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
c.hub.subChannel <- &streamSubscription{name: streamName, conn: c, remove: true}
}
}

@ -1,6 +1,8 @@
package live
import (
"context"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/log"
@ -23,22 +25,26 @@ type streamSubscription struct {
remove bool
}
var h = hub{
connections: make(map[*connection]bool),
streams: make(map[string]map[*connection]bool),
register: make(chan *connection),
unregister: make(chan *connection),
streamChannel: make(chan *dtos.StreamMessage),
subChannel: make(chan *streamSubscription),
log: log.New("live.hub"),
func newHub() *hub {
return &hub{
connections: make(map[*connection]bool),
streams: make(map[string]map[*connection]bool),
register: make(chan *connection),
unregister: make(chan *connection),
streamChannel: make(chan *dtos.StreamMessage),
subChannel: make(chan *streamSubscription),
log: log.New("stream.hub"),
}
}
func (h *hub) removeConnection() {
}
func (h *hub) run() {
func (h *hub) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case c := <-h.register:
h.connections[c] = true
h.log.Info("New connection", "total", len(h.connections))
@ -49,7 +55,7 @@ func (h *hub) run() {
delete(h.connections, c)
close(c.send)
}
// hand stream subscriptions
// hand stream subscriptions
case sub := <-h.subChannel:
h.log.Info("Subscribing", "channel", sub.name, "remove", sub.remove)
subscribers, exists := h.streams[sub.name]

@ -1,40 +0,0 @@
package live
import (
"net/http"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/middleware"
)
type LiveConn struct {
log log.Logger
}
func New() *LiveConn {
go h.run()
return &LiveConn{log: log.New("live.server")}
}
func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) {
lc.log.Info("Upgrading to WebSocket")
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(3, "Live: Failed to upgrade connection to WebSocket", err)
return
}
c := newConnection(ws)
h.register <- c
go c.writePump()
c.readPump()
}
func (lc *LiveConn) PushToStream(c *middleware.Context, message dtos.StreamMessage) {
h.streamChannel <- &message
c.JsonOK("Message recevived")
}

@ -1,6 +1,8 @@
package live
import (
"context"
"net/http"
"sync"
"github.com/grafana/grafana/pkg/components/simplejson"
@ -8,21 +10,48 @@ import (
m "github.com/grafana/grafana/pkg/models"
)
type StreamManagerImpl struct {
type StreamManager struct {
log log.Logger
streams map[string]*Stream
streamRWMutex *sync.RWMutex
hub *hub
}
func NewStreamManager() m.StreamManager {
return &StreamManagerImpl{
log: log.New("live.stream.manager"),
func NewStreamManager() *StreamManager {
return &StreamManager{
hub: newHub(),
log: log.New("stream.manager"),
streams: make(map[string]*Stream),
streamRWMutex: &sync.RWMutex{},
}
}
func (s *StreamManagerImpl) GetStreamList() m.StreamList {
func (sm *StreamManager) Run(context context.Context) {
log.Info("Initializing Stream Manager")
go func() {
sm.hub.run(context)
log.Info("Stopped Stream Manager")
}()
}
func (sm *StreamManager) Serve(w http.ResponseWriter, r *http.Request) {
sm.log.Info("Upgrading to WebSocket")
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
sm.log.Error("Failed to upgrade connection to WebSocket", "error", err)
return
}
c := newConnection(ws, sm.hub)
sm.hub.register <- c
go c.writePump()
c.readPump()
}
func (s *StreamManager) GetStreamList() m.StreamList {
list := make(m.StreamList, 0)
for _, stream := range s.streams {
@ -34,7 +63,7 @@ func (s *StreamManagerImpl) GetStreamList() m.StreamList {
return list
}
func (s *StreamManagerImpl) Push(packet *m.StreamPacket) {
func (s *StreamManager) Push(packet *m.StreamPacket) {
stream, exist := s.streams[packet.Stream]
if !exist {

@ -2,13 +2,9 @@ package main
import (
"context"
"fmt"
"net/http"
"os"
"time"
"gopkg.in/macaron.v1"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api"
@ -78,24 +74,9 @@ func (g *GrafanaServerImpl) Start() {
}
func (g *GrafanaServerImpl) startHttpServer() {
logger = log.New("http.server")
var err error
m := newMacaron()
api.Register(m)
listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort)
g.log.Info("Initializing HTTP Server", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl)
switch setting.Protocol {
case setting.HTTP:
err = http.ListenAndServe(listenAddr, m)
case setting.HTTPS:
err = ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
default:
g.log.Error("Invalid protocol", "protocol", setting.Protocol)
g.Shutdown(1, "Startup failed")
}
httpServer := api.NewHttpServer()
err := httpServer.Start(g.context)
if err != nil {
g.log.Error("Fail to start server", "error", err)
@ -115,26 +96,6 @@ func (g *GrafanaServerImpl) Shutdown(code int, reason string) {
os.Exit(code)
}
func ListenAndServeTLS(listenAddr, certfile, keyfile string, m *macaron.Macaron) error {
if certfile == "" {
return fmt.Errorf("cert_file cannot be empty when using HTTPS")
}
if keyfile == "" {
return fmt.Errorf("cert_key cannot be empty when using HTTPS")
}
if _, err := os.Stat(setting.CertFile); os.IsNotExist(err) {
return fmt.Errorf(`Cannot find SSL cert_file at %v`, setting.CertFile)
}
if _, err := os.Stat(setting.KeyFile); os.IsNotExist(err) {
return fmt.Errorf(`Cannot find SSL key_file at %v`, setting.KeyFile)
}
return http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
}
// implement context.Context
func (g *GrafanaServerImpl) Deadline() (deadline time.Time, ok bool) {
return g.context.Deadline()

@ -1,107 +0,0 @@
// Copyright 2014 Unknwon
// Copyright 2014 Torkel Ödegaard
package main
import (
"fmt"
"net/http"
"path"
"gopkg.in/macaron.v1"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/api/static"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/setting"
)
var logger log.Logger
func newMacaron() *macaron.Macaron {
macaron.Env = setting.Env
m := macaron.New()
m.Use(middleware.Logger())
m.Use(middleware.Recovery())
if setting.EnableGzip {
m.Use(middleware.Gziper())
}
for _, route := range plugins.StaticRoutes {
pluginRoute := path.Join("/public/plugins/", route.PluginId)
logger.Debug("Plugins: Adding route", "route", pluginRoute, "dir", route.Directory)
mapStatic(m, route.Directory, "", pluginRoute)
}
mapStatic(m, setting.StaticRootPath, "", "public")
mapStatic(m, setting.StaticRootPath, "robots.txt", "robots.txt")
m.Use(macaron.Renderer(macaron.RenderOptions{
Directory: path.Join(setting.StaticRootPath, "views"),
IndentJSON: macaron.Env != macaron.PROD,
Delims: macaron.Delims{Left: "[[", Right: "]]"},
}))
m.Use(middleware.GetContextHandler())
m.Use(middleware.Sessioner(&setting.SessionOptions))
m.Use(middleware.RequestMetrics())
// needs to be after context handler
if setting.EnforceDomain {
m.Use(middleware.ValidateHostHeader(setting.Domain))
}
return m
}
func mapStatic(m *macaron.Macaron, rootDir string, dir string, prefix string) {
headers := func(c *macaron.Context) {
c.Resp.Header().Set("Cache-Control", "public, max-age=3600")
}
if setting.Env == setting.DEV {
headers = func(c *macaron.Context) {
c.Resp.Header().Set("Cache-Control", "max-age=0, must-revalidate, no-cache")
}
}
m.Use(httpstatic.Static(
path.Join(rootDir, dir),
httpstatic.StaticOptions{
SkipLogging: true,
Prefix: prefix,
AddHeaders: headers,
},
))
}
func StartServer() int {
logger = log.New("server")
var err error
m := newMacaron()
api.Register(m)
listenAddr := fmt.Sprintf("%s:%s", setting.HttpAddr, setting.HttpPort)
logger.Info("Server Listening", "address", listenAddr, "protocol", setting.Protocol, "subUrl", setting.AppSubUrl)
switch setting.Protocol {
case setting.HTTP:
err = http.ListenAndServe(listenAddr, m)
case setting.HTTPS:
err = http.ListenAndServeTLS(listenAddr, setting.CertFile, setting.KeyFile, m)
default:
logger.Error("Invalid protocol", "protocol", setting.Protocol)
return 1
}
if err != nil {
logger.Error("Fail to start server", "error", err)
return 1
}
return 0
}
Loading…
Cancel
Save