The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/loki/streaming.go

200 lines
4.6 KiB

package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, err
}
// Expect tail/${key}
if !strings.HasPrefix(req.Path, "tail/") {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("expected tail in channel path")
}
query, err := parseQueryModel(req.Data)
if err != nil {
return nil, err
}
if query.Expr == "" {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("missing expr in channel (subscribe)")
}
dsInfo.streamsMu.RLock()
defer dsInfo.streamsMu.RUnlock()
cache, ok := dsInfo.streams[req.Path]
if ok {
msg, err := backend.NewInitialData(cache.Bytes(data.IncludeAll))
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
InitialData: msg,
}, err
}
// nothing yet
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, err
}
// Single instance for each channel (results are shared with all listeners)
func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return err
}
query, err := parseQueryModel(req.Data)
if err != nil {
return err
}
if query.Expr == "" {
return fmt.Errorf("missing expr in cuannel")
}
count := int64(0)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
params := url.Values{}
params.Add("query", query.Expr)
isV1 := false
wsurl, _ := url.Parse(dsInfo.URL)
// Check if the v2alpha endpoint exists
wsurl.Path = "/loki/api/v2alpha/tail"
if !is400(dsInfo.HTTPClient, wsurl) {
isV1 = true
wsurl.Path = "/loki/api/v1/tail"
}
if wsurl.Scheme == "https" {
wsurl.Scheme = "wss"
} else {
wsurl.Scheme = "ws"
}
wsurl.RawQuery = params.Encode()
s.plog.Info("connecting to websocket", "url", wsurl)
c, r, err := websocket.DefaultDialer.Dial(wsurl.String(), nil)
if err != nil {
s.plog.Error("error connecting to websocket", "err", err)
return fmt.Errorf("error connecting to websocket")
}
defer func() {
dsInfo.streamsMu.Lock()
delete(dsInfo.streams, req.Path)
dsInfo.streamsMu.Unlock()
if r != nil {
_ = r.Body.Close()
}
err = c.Close()
s.plog.Error("closing loki websocket", "err", err)
}()
prev := data.FrameJSONCache{}
// Read all messages
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
s.plog.Error("websocket read:", "err", err)
return
}
frame := &data.Frame{}
if isV1 {
frame, err = lokiBytesToLabeledFrame(message)
} else {
err = json.Unmarshal(message, &frame)
}
if err == nil && frame != nil {
next, _ := data.FrameToJSONCache(frame)
if next.SameSchema(&prev) {
err = sender.SendBytes(next.Bytes(data.IncludeDataOnly))
} else {
err = sender.SendFrame(frame, data.IncludeAll)
}
prev = next
// Cache the initial data
dsInfo.streamsMu.Lock()
dsInfo.streams[req.Path] = prev
dsInfo.streamsMu.Unlock()
}
if err != nil {
s.plog.Error("websocket write:", "err", err, "raw", message)
return
}
}
}()
ticker := time.NewTicker(time.Second * 60) //.Step)
defer ticker.Stop()
for {
select {
case <-done:
s.plog.Info("socket done")
return nil
case <-ctx.Done():
s.plog.Info("stop streaming (context canceled)")
return nil
case t := <-ticker.C:
count++
s.plog.Error("loki websocket ping?", "time", t, "count", count)
}
}
}
func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}
// if the v2 endpoint exists it will give a 400 rather than 404/500
func is400(client *http.Client, url *url.URL) bool {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return false
}
rsp, err := client.Do(req)
if err != nil {
return false
}
defer func() {
_ = rsp.Body.Close()
}()
return rsp.StatusCode == 400 // will be true
}