Live: client connection concurrency (#33642)

pull/33656/head
Alexander Emelin 4 years ago committed by GitHub
parent 806761fe70
commit fa866f1154
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 201
      pkg/services/live/live.go
  2. 52
      pkg/services/live/live_test.go

@ -8,11 +8,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response" "github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/middleware" "github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
@ -130,6 +129,8 @@ func (g *GrafanaLive) Run(ctx context.Context) error {
return nil return nil
} }
// clientConcurrency is the capacity of the semaphore created for each client
// connection in the OnConnect handler. When it is 1 or less no semaphore is
// allocated, and runConcurrentlyIfNeeded then invokes handlers inline instead
// of in separate goroutines.
var clientConcurrency = 8
// Init initializes Live service. // Init initializes Live service.
// Required to implement the registry.Service interface. // Required to implement the registry.Service interface.
func (g *GrafanaLive) Init() error { func (g *GrafanaLive) Init() error {
@ -179,103 +180,33 @@ func (g *GrafanaLive) Init() error {
// different goroutines (belonging to different client connections). This is also // different goroutines (belonging to different client connections). This is also
// true for other event handlers. // true for other event handlers.
node.OnConnect(func(client *centrifuge.Client) { node.OnConnect(func(client *centrifuge.Client) {
var semaphore chan struct{}
if clientConcurrency > 1 {
semaphore = make(chan struct{}, clientConcurrency)
}
logger.Debug("Client connected", "user", client.UserID(), "client", client.ID()) logger.Debug("Client connected", "user", client.UserID(), "client", client.ID())
connectedAt := time.Now() connectedAt := time.Now()
// Called when client subscribes to the channel.
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
user, ok := livecontext.GetContextSignedUser(client.Context()) cb(g.handleOnSubscribe(client, e))
if !ok {
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
reply, status, err := handler.OnSubscribe(client.Context(), user, models.SubscribeEvent{
Channel: e.Channel,
Path: addr.Path,
}) })
if err != nil { if err != nil {
logger.Error("Error calling channel handler subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.SubscribeReply{}, err)
cb(centrifuge.SubscribeReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.SubscribeStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := subscribeStatusToHTTPError(status)
logger.Debug("Return custom subscribe error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.SubscribeReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
} }
logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: reply.Presence,
JoinLeave: reply.JoinLeave,
Recover: reply.Recover,
Data: reply.Data,
},
}, nil)
}) })
// Called when a client publishes to the websocket channel. // Called when a client publishes to the channel.
// In general, we should prefer writing to the HTTP API, but this // In general, we should prefer writing to the HTTP API, but this
// allows some simple prototypes to work quickly. // allows some simple prototypes to work quickly.
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() {
user, ok := livecontext.GetContextSignedUser(client.Context()) cb(g.handleOnPublish(client, e))
if !ok {
logger.Error("No user found in context", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
handler, addr, err := g.GetChannelHandler(user, e.Channel)
if err != nil {
logger.Error("Error getting channel handler", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
reply, status, err := handler.OnPublish(client.Context(), user, models.PublishEvent{
Channel: e.Channel,
Path: addr.Path,
Data: e.Data,
}) })
if err != nil { if err != nil {
logger.Error("Error calling channel handler publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err) cb(centrifuge.PublishReply{}, err)
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
if status != backend.PublishStreamStatusOK {
// using HTTP error codes for WS errors too.
code, text := publishStatusToHTTPError(status)
logger.Debug("Return custom publish error", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "code", code)
cb(centrifuge.PublishReply{}, &centrifuge.Error{Code: uint32(code), Message: text})
return
} }
centrifugeReply := centrifuge.PublishReply{
Options: centrifuge.PublishOptions{
HistorySize: reply.HistorySize,
HistoryTTL: reply.HistoryTTL,
},
}
if reply.Data != nil {
// If data is not nil then we published it manually and tell Centrifuge
// publication result so Centrifuge won't publish itself.
result, err := g.node.Publish(e.Channel, reply.Data)
if err != nil {
logger.Error("Error publishing", "user", client.UserID(), "client", client.ID(), "channel", e.Channel, "error", err, "data", string(reply.Data))
cb(centrifuge.PublishReply{}, centrifuge.ErrorInternal)
return
}
centrifugeReply.Result = &result
}
logger.Debug("Publication successful", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
cb(centrifugeReply, nil)
}) })
client.OnDisconnect(func(e centrifuge.DisconnectEvent) { client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
@ -338,6 +269,108 @@ func (g *GrafanaLive) Init() error {
return nil return nil
} }
// runConcurrentlyIfNeeded executes fn either inline or in a new goroutine,
// depending on the capacity of semaphore.
//
// A nil semaphore, or one with capacity of 1 or less, means no concurrency is
// wanted: fn runs synchronously on the caller's goroutine and nil is returned.
//
// Otherwise a slot is acquired from semaphore before fn is started in its own
// goroutine; the slot is released when fn returns. If ctx is done before a
// slot becomes available, fn is not started and ctx.Err() is returned.
func runConcurrentlyIfNeeded(ctx context.Context, semaphore chan struct{}, fn func()) error {
	if cap(semaphore) <= 1 {
		// No separate goroutine is needed.
		fn()
		return nil
	}

	// Wait for a free slot, giving up once the context is done.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case semaphore <- struct{}{}:
	}

	go func() {
		// Release the slot even if fn panics.
		defer func() { <-semaphore }()
		fn()
	}()
	return nil
}
// handleOnSubscribe processes a client's request to subscribe to e.Channel and
// returns the reply/error pair to pass to the Centrifuge subscribe callback.
//
// It takes the signed-in user from the client context, resolves the channel
// handler via GetChannelHandler and delegates to the handler's OnSubscribe.
// A missing user, a handler lookup failure or a handler error is logged and
// reported as centrifuge.ErrorInternal. A non-OK subscribe status is turned
// into a *centrifuge.Error using subscribeStatusToHTTPError.
func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
	// Fields attached to every log line below.
	userID, clientID, channel := client.UserID(), client.ID(), e.Channel
	var emptyReply centrifuge.SubscribeReply

	logger.Debug("Client wants to subscribe", "user", userID, "client", clientID, "channel", channel)

	signedUser, found := livecontext.GetContextSignedUser(client.Context())
	if !found {
		logger.Error("No user found in context", "user", userID, "client", clientID, "channel", channel)
		return emptyReply, centrifuge.ErrorInternal
	}

	channelHandler, address, lookupErr := g.GetChannelHandler(signedUser, channel)
	if lookupErr != nil {
		logger.Error("Error getting channel handler", "user", userID, "client", clientID, "channel", channel, "error", lookupErr)
		return emptyReply, centrifuge.ErrorInternal
	}

	subscribeEvent := models.SubscribeEvent{
		Channel: channel,
		Path:    address.Path,
	}
	handlerReply, status, handlerErr := channelHandler.OnSubscribe(client.Context(), signedUser, subscribeEvent)
	if handlerErr != nil {
		logger.Error("Error calling channel handler subscribe", "user", userID, "client", clientID, "channel", channel, "error", handlerErr)
		return emptyReply, centrifuge.ErrorInternal
	}

	if status != backend.SubscribeStreamStatusOK {
		// HTTP error codes are reused for WS errors too.
		code, text := subscribeStatusToHTTPError(status)
		logger.Debug("Return custom subscribe error", "user", userID, "client", clientID, "channel", channel, "code", code)
		return emptyReply, &centrifuge.Error{Code: uint32(code), Message: text}
	}

	logger.Debug("Client subscribed", "user", userID, "client", clientID, "channel", channel)
	options := centrifuge.SubscribeOptions{
		Presence:  handlerReply.Presence,
		JoinLeave: handlerReply.JoinLeave,
		Recover:   handlerReply.Recover,
		Data:      handlerReply.Data,
	}
	return centrifuge.SubscribeReply{Options: options}, nil
}
// handleOnPublish processes a client's request to publish e.Data into
// e.Channel and returns the reply/error pair to pass to the Centrifuge
// publish callback.
//
// It takes the signed-in user from the client context, resolves the channel
// handler via GetChannelHandler and delegates to the handler's OnPublish.
// A missing user, a handler lookup failure, a handler error or a failed
// node publish is logged and reported as centrifuge.ErrorInternal. A non-OK
// publish status is turned into a *centrifuge.Error using
// publishStatusToHTTPError.
func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
	// Fields attached to every log line below.
	userID, clientID, channel := client.UserID(), client.ID(), e.Channel
	var emptyReply centrifuge.PublishReply

	logger.Debug("Client wants to publish", "user", userID, "client", clientID, "channel", channel)

	signedUser, found := livecontext.GetContextSignedUser(client.Context())
	if !found {
		logger.Error("No user found in context", "user", userID, "client", clientID, "channel", channel)
		return emptyReply, centrifuge.ErrorInternal
	}

	channelHandler, address, lookupErr := g.GetChannelHandler(signedUser, channel)
	if lookupErr != nil {
		logger.Error("Error getting channel handler", "user", userID, "client", clientID, "channel", channel, "error", lookupErr)
		return emptyReply, centrifuge.ErrorInternal
	}

	publishEvent := models.PublishEvent{
		Channel: channel,
		Path:    address.Path,
		Data:    e.Data,
	}
	handlerReply, status, handlerErr := channelHandler.OnPublish(client.Context(), signedUser, publishEvent)
	if handlerErr != nil {
		logger.Error("Error calling channel handler publish", "user", userID, "client", clientID, "channel", channel, "error", handlerErr)
		return emptyReply, centrifuge.ErrorInternal
	}

	if status != backend.PublishStreamStatusOK {
		// HTTP error codes are reused for WS errors too.
		code, text := publishStatusToHTTPError(status)
		logger.Debug("Return custom publish error", "user", userID, "client", clientID, "channel", channel, "code", code)
		return emptyReply, &centrifuge.Error{Code: uint32(code), Message: text}
	}

	out := centrifuge.PublishReply{
		Options: centrifuge.PublishOptions{
			HistorySize: handlerReply.HistorySize,
			HistoryTTL:  handlerReply.HistoryTTL,
		},
	}
	if handlerReply.Data != nil {
		// The handler returned data, so publish it here and hand the
		// publication result to Centrifuge so it won't publish itself.
		published, publishErr := g.node.Publish(channel, handlerReply.Data)
		if publishErr != nil {
			logger.Error("Error publishing", "user", userID, "client", clientID, "channel", channel, "error", publishErr, "data", string(handlerReply.Data))
			return emptyReply, centrifuge.ErrorInternal
		}
		out.Result = &published
	}

	logger.Debug("Publication successful", "user", userID, "client", clientID, "channel", channel)
	return out, nil
}
func subscribeStatusToHTTPError(status backend.SubscribeStreamStatus) (int, string) { func subscribeStatusToHTTPError(status backend.SubscribeStreamStatus) (int, string) {
switch status { switch status {
case backend.SubscribeStreamStatusNotFound: case backend.SubscribeStreamStatusNotFound:

@ -0,0 +1,52 @@
package live
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// Test_runConcurrentlyIfNeeded_Concurrent checks that, given a semaphore with
// capacity greater than one, the supplied function gets executed and no error
// is returned.
func Test_runConcurrentlyIfNeeded_Concurrent(t *testing.T) {
	executed := make(chan struct{})
	sem := make(chan struct{}, 2)

	err := runConcurrentlyIfNeeded(context.Background(), sem, func() {
		close(executed)
	})
	require.NoError(t, err)

	// The function runs in another goroutine, so wait for its signal.
	select {
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for function execution")
	case <-executed:
	}
}
// Test_runConcurrentlyIfNeeded_NoConcurrency checks that with a nil semaphore
// the supplied function is executed inline, i.e. it has already completed by
// the time runConcurrentlyIfNeeded returns.
func Test_runConcurrentlyIfNeeded_NoConcurrency(t *testing.T) {
	doneCh := make(chan struct{})
	f := func() {
		close(doneCh)
	}
	err := runConcurrentlyIfNeeded(context.Background(), nil, f)
	require.NoError(t, err)
	// Non-blocking check: waiting with a timeout would also pass if the
	// function were started in a goroutine, which is exactly what this test
	// must rule out.
	select {
	case <-doneCh:
	default:
		t.Fatal("function was not executed synchronously")
	}
}
// Test_runConcurrentlyIfNeeded_DeadlineExceeded checks that when every
// semaphore slot is taken and the context deadline has already passed, the
// call returns context.DeadlineExceeded.
func Test_runConcurrentlyIfNeeded_DeadlineExceeded(t *testing.T) {
	// Occupy every slot so a new one can never be acquired.
	sem := make(chan struct{}, 2)
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}

	// A deadline in the past makes the context done from the start.
	expired := time.Now().Add(-time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), expired)
	defer cancel()

	noop := func() {}
	err := runConcurrentlyIfNeeded(ctx, sem, noop)
	require.ErrorIs(t, err, context.DeadlineExceeded)
}
Loading…
Cancel
Save