|
|
|
@ -158,7 +158,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r |
|
|
|
|
redisClient := redis.NewClient(&redis.Options{ |
|
|
|
|
Addr: g.Cfg.LiveHAEngineAddress, |
|
|
|
|
}) |
|
|
|
|
cmd := redisClient.Ping(context.TODO()) |
|
|
|
|
cmd := redisClient.Ping(context.Background()) |
|
|
|
|
if _, err := cmd.Result(); err != nil { |
|
|
|
|
return nil, fmt.Errorf("error pinging Redis: %v", err) |
|
|
|
|
} |
|
|
|
@ -206,7 +206,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r |
|
|
|
|
// This can be unreasonable to have in production scenario with many
|
|
|
|
|
// organizations.
|
|
|
|
|
orgQuery := &models.SearchOrgsQuery{} |
|
|
|
|
err := sqlstore.SearchOrgs(context.TODO(), orgQuery) |
|
|
|
|
err := sqlstore.SearchOrgs(context.Background(), orgQuery) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("can't get org list: %w", err) |
|
|
|
|
} |
|
|
|
@ -278,7 +278,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r |
|
|
|
|
// Called when client subscribes to the channel.
|
|
|
|
|
client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { |
|
|
|
|
err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { |
|
|
|
|
cb(g.handleOnSubscribe(client, e)) |
|
|
|
|
cb(g.handleOnSubscribe(context.Background(), client, e)) |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
cb(centrifuge.SubscribeReply{}, err) |
|
|
|
@ -290,7 +290,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r |
|
|
|
|
// allows some simple prototypes to work quickly.
|
|
|
|
|
client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { |
|
|
|
|
err := runConcurrentlyIfNeeded(client.Context(), semaphore, func() { |
|
|
|
|
cb(g.handleOnPublish(client, e)) |
|
|
|
|
cb(g.handleOnPublish(context.Background(), client, e)) |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
cb(centrifuge.PublishReply{}, err) |
|
|
|
@ -429,8 +429,8 @@ type GrafanaLive struct { |
|
|
|
|
usageStats usageStats |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) { |
|
|
|
|
plugin, exists := g.pluginStore.Plugin(context.TODO(), pluginID) |
|
|
|
|
func (g *GrafanaLive) getStreamPlugin(ctx context.Context, pluginID string) (backend.StreamHandler, error) { |
|
|
|
|
plugin, exists := g.pluginStore.Plugin(ctx, pluginID) |
|
|
|
|
if !exists { |
|
|
|
|
return nil, fmt.Errorf("plugin not found: %s", pluginID) |
|
|
|
|
} |
|
|
|
@ -604,7 +604,7 @@ func (g *GrafanaLive) handleOnRPC(client *centrifuge.Client, e centrifuge.RPCEve |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { |
|
|
|
|
func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) { |
|
|
|
|
logger.Debug("Client wants to subscribe", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) |
|
|
|
|
|
|
|
|
|
user, ok := livecontext.GetContextSignedUser(client.Context()) |
|
|
|
@ -668,7 +668,7 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if !ruleFound { |
|
|
|
|
handler, addr, err := g.GetChannelHandler(user, channel) |
|
|
|
|
handler, addr, err := g.GetChannelHandler(ctx, user, channel) |
|
|
|
|
if err != nil { |
|
|
|
|
if errors.Is(err, live.ErrInvalidChannelID) { |
|
|
|
|
logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) |
|
|
|
@ -704,7 +704,7 @@ func (g *GrafanaLive) handleOnSubscribe(client *centrifuge.Client, e centrifuge. |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { |
|
|
|
|
func (g *GrafanaLive) handleOnPublish(ctx context.Context, client *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) { |
|
|
|
|
logger.Debug("Client wants to publish", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) |
|
|
|
|
|
|
|
|
|
user, ok := livecontext.GetContextSignedUser(client.Context()) |
|
|
|
@ -761,7 +761,7 @@ func (g *GrafanaLive) handleOnPublish(client *centrifuge.Client, e centrifuge.Pu |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
handler, addr, err := g.GetChannelHandler(user, channel) |
|
|
|
|
handler, addr, err := g.GetChannelHandler(ctx, user, channel) |
|
|
|
|
if err != nil { |
|
|
|
|
if errors.Is(err, live.ErrInvalidChannelID) { |
|
|
|
|
logger.Info("Invalid channel ID", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) |
|
|
|
@ -831,7 +831,7 @@ func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// GetChannelHandler gives thread-safe access to the channel.
|
|
|
|
|
func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) { |
|
|
|
|
func (g *GrafanaLive) GetChannelHandler(ctx context.Context, user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) { |
|
|
|
|
// Parse the identifier ${scope}/${namespace}/${path}
|
|
|
|
|
addr, err := live.ParseChannel(channel) |
|
|
|
|
if err != nil { |
|
|
|
@ -854,7 +854,7 @@ func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel strin |
|
|
|
|
return c, addr, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
getter, err := g.GetChannelHandlerFactory(user, addr.Scope, addr.Namespace) |
|
|
|
|
getter, err := g.GetChannelHandlerFactory(ctx, user, addr.Scope, addr.Namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, addr, fmt.Errorf("error getting channel handler factory: %w", err) |
|
|
|
|
} |
|
|
|
@ -872,14 +872,14 @@ func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel strin |
|
|
|
|
|
|
|
|
|
// GetChannelHandlerFactory gets a ChannelHandlerFactory for a namespace.
|
|
|
|
|
// It gives thread-safe access to the channel.
|
|
|
|
|
func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
func (g *GrafanaLive) GetChannelHandlerFactory(ctx context.Context, user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
switch scope { |
|
|
|
|
case live.ScopeGrafana: |
|
|
|
|
return g.handleGrafanaScope(user, namespace) |
|
|
|
|
case live.ScopePlugin: |
|
|
|
|
return g.handlePluginScope(user, namespace) |
|
|
|
|
return g.handlePluginScope(ctx, user, namespace) |
|
|
|
|
case live.ScopeDatasource: |
|
|
|
|
return g.handleDatasourceScope(user, namespace) |
|
|
|
|
return g.handleDatasourceScope(ctx, user, namespace) |
|
|
|
|
case live.ScopeStream: |
|
|
|
|
return g.handleStreamScope(user, namespace) |
|
|
|
|
default: |
|
|
|
@ -894,7 +894,7 @@ func (g *GrafanaLive) handleGrafanaScope(_ *models.SignedInUser, namespace strin |
|
|
|
|
return nil, fmt.Errorf("unknown feature: %q", namespace) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
func (g *GrafanaLive) handlePluginScope(ctx context.Context, _ *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
// Temporary hack until we have a more generic solution later on
|
|
|
|
|
if namespace == "cloudwatch" { |
|
|
|
|
return &cloudwatch.LogQueryRunnerSupplier{ |
|
|
|
@ -902,7 +902,7 @@ func (g *GrafanaLive) handlePluginScope(_ *models.SignedInUser, namespace string |
|
|
|
|
Service: g.LogsService, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
streamHandler, err := g.getStreamPlugin(namespace) |
|
|
|
|
streamHandler, err := g.getStreamPlugin(ctx, namespace) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("can't find stream plugin: %s", namespace) |
|
|
|
|
} |
|
|
|
@ -919,12 +919,12 @@ func (g *GrafanaLive) handleStreamScope(u *models.SignedInUser, namespace string |
|
|
|
|
return g.ManagedStreamRunner.GetOrCreateStream(u.OrgId, live.ScopeStream, namespace) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
ds, err := g.DataSourceCache.GetDatasourceByUID(context.TODO(), namespace, user, false) |
|
|
|
|
func (g *GrafanaLive) handleDatasourceScope(ctx context.Context, user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { |
|
|
|
|
ds, err := g.DataSourceCache.GetDatasourceByUID(ctx, namespace, user, false) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("error getting datasource: %w", err) |
|
|
|
|
} |
|
|
|
|
streamHandler, err := g.getStreamPlugin(ds.Type) |
|
|
|
|
streamHandler, err := g.getStreamPlugin(ctx, ds.Type) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("can't find stream plugin: %s", ds.Type) |
|
|
|
|
} |
|
|
|
@ -996,7 +996,7 @@ func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext) response.Respons |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
channelHandler, addr, err := g.GetChannelHandler(ctx.SignedInUser, cmd.Channel) |
|
|
|
|
channelHandler, addr, err := g.GetChannelHandler(ctx.Req.Context(), ctx.SignedInUser, cmd.Channel) |
|
|
|
|
if err != nil { |
|
|
|
|
logger.Error("Error getting channels handler", "error", err, "channel", cmd.Channel) |
|
|
|
|
return response.Error(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), nil) |
|
|
|
|