The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/live/pipeline/frame_output_loki.go

144 lines
3.5 KiB

package pipeline
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
// lokiFlushInterval is the period of the ticker that drives
// lokiWriter.flushPeriodically, i.e. how often buffered streams are pushed.
const lokiFlushInterval = 15 * time.Second
// LokiFrameOutput can output frame encoded to JSON to Loki.
// It does not talk to Loki directly: every frame is handed to the embedded
// lokiWriter, which owns the endpoint configuration and the HTTP client.
type LokiFrameOutput struct {
// lokiWriter receives the encoded frames and holds the target endpoint.
lokiWriter *lokiWriter
}
// NewLokiFrameOutput creates a LokiFrameOutput backed by a new lokiWriter
// configured with the given endpoint and (optional) basic auth credentials.
func NewLokiFrameOutput(endpoint string, basicAuth *BasicAuth) *LokiFrameOutput {
	writer := newLokiWriter(endpoint, basicAuth)
	output := new(LokiFrameOutput)
	output.lokiWriter = writer
	return output
}
// FrameOutputTypeLoki is the type identifier reported by LokiFrameOutput.Type.
const FrameOutputTypeLoki = "loki"
// Type reports the identifier of this frame output kind, which is always
// FrameOutputTypeLoki.
func (out *LokiFrameOutput) Type() string {
	return FrameOutputTypeLoki
}
// LokiStreamsEntry is the top-level JSON document marshaled by
// lokiWriter.flush and used as the body of the push request.
type LokiStreamsEntry struct {
// Streams is the batch of streams sent in one request, encoded under "streams".
Streams []LokiStream `json:"streams"`
}
// LokiStream is a single stream inside a LokiStreamsEntry: a label set plus
// the entries that belong to it.
type LokiStream struct {
// Stream holds the label name/value pairs, encoded under "stream".
Stream map[string]string `json:"stream"`
// Values holds the entries, encoded under "values". OutputFrame stores each
// entry as a two-element slice: a timestamp followed by the log line.
Values []interface{} `json:"values"`
}
// OutputFrame encodes frame as JSON and queues it for delivery to Loki as a
// single log entry labeled with the frame name ("frame") and vars.Channel
// ("channel").
//
// The entry is only handed to the lokiWriter here, so a nil error does not
// mean Loki has accepted the frame. When no endpoint is configured the frame
// is skipped and nil is returned. The returned channel frames are always nil;
// the error is non-nil when JSON encoding of the frame or the write fails.
func (out *LokiFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error) {
	if out.lokiWriter.endpoint == "" {
		logger.Debug("Skip sending to Loki: no url")
		return nil, nil
	}

	frameJSON, err := data.FrameToJSON(frame, data.IncludeAll)
	if err != nil {
		return nil, err
	}

	// Loki's JSON push format documents every value as a pair of strings:
	// the timestamp in Unix nanoseconds followed by the log line. Format the
	// timestamp as a decimal string rather than letting it marshal as a
	// JSON number.
	timestamp := fmt.Sprintf("%d", time.Now().UnixNano())

	err = out.lokiWriter.write(LokiStream{
		Stream: map[string]string{"frame": frame.Name, "channel": vars.Channel},
		Values: []interface{}{
			[]interface{}{timestamp, string(frameJSON)},
		},
	})
	return nil, err
}
// lokiWriter accumulates Loki streams in memory and pushes them to an HTTP
// endpoint in batches.
type lokiWriter struct {
// mu guards buffer.
mu sync.RWMutex
// httpClient performs the push requests.
httpClient *http.Client
// buffer holds streams queued by write that have not been flushed
// successfully yet.
buffer []LokiStream
// Endpoint to send streaming frames to.
endpoint string
// basicAuth, when non-nil, supplies the HTTP basic auth credentials set on
// each push request.
basicAuth *BasicAuth
}
// newLokiWriter builds a lokiWriter for the given endpoint and optional basic
// auth credentials, using an HTTP client with a 2 second timeout. It also
// starts the writer's flushPeriodically loop in a new goroutine before
// returning.
func newLokiWriter(endpoint string, basicAuth *BasicAuth) *lokiWriter {
	client := &http.Client{Timeout: 2 * time.Second}
	writer := &lokiWriter{
		httpClient: client,
		endpoint:   endpoint,
		basicAuth:  basicAuth,
	}
	go writer.flushPeriodically()
	return writer
}
// flushPeriodically loops over a ticker firing every lokiFlushInterval and, on
// each tick, sends whatever streams are currently buffered via flush. The
// loop has no exit condition.
//
// A batch that fails to send is logged and put back at the front of the
// buffer, ahead of any streams written in the meantime, so it is retried on
// the next tick.
func (w *lokiWriter) flushPeriodically() {
	ticker := time.NewTicker(lokiFlushInterval)
	for range ticker.C {
		// Take ownership of the pending streams instead of copying them:
		// once w.buffer is reset, nothing else references the old backing
		// array, so the lock is held only for the swap and not the HTTP call.
		w.mu.Lock()
		batch := w.buffer
		w.buffer = nil
		w.mu.Unlock()

		if len(batch) == 0 {
			continue
		}

		if err := w.flush(batch); err != nil {
			logger.Error("Error flush to Loki", "error", err)
			w.mu.Lock()
			// TODO: drop in case of large buffer size? Make several attempts only?
			w.buffer = append(batch, w.buffer...)
			w.mu.Unlock()
		}
	}
}
// write appends s to the in-memory buffer while holding the writer's lock.
// Nothing is sent here. The returned error is always nil.
func (w *lokiWriter) write(s LokiStream) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.buffer = append(w.buffer, s)
	return nil
}
// flush marshals streams into a single LokiStreamsEntry JSON document and
// POSTs it to the configured endpoint, adding basic auth credentials when
// they are set.
//
// It returns an error when marshaling fails, the request cannot be built or
// sent, or the response status is anything other than 204 No Content.
// Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func (w *lokiWriter) flush(streams []LokiStream) error {
	logger.Debug("Loki flush", "numStreams", len(streams))

	writeData, err := json.Marshal(LokiStreamsEntry{
		Streams: streams,
	})
	if err != nil {
		return fmt.Errorf("error converting Loki stream entry to bytes: %w", err)
	}

	logger.Debug("Sending to Loki endpoint", "url", w.endpoint, "bodyLength", len(writeData))
	req, err := http.NewRequest(http.MethodPost, w.endpoint, bytes.NewReader(writeData))
	if err != nil {
		return fmt.Errorf("error constructing loki push request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if w.basicAuth != nil {
		req.SetBasicAuth(w.basicAuth.User, w.basicAuth.Password)
	}

	started := time.Now()
	resp, err := w.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("error sending to Loki: %w", err)
	}
	// Only the status code is inspected; a failure to close the body is not
	// actionable here, so it is deliberately ignored.
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != http.StatusNoContent {
		logger.Error("Unexpected response code from Loki endpoint", "code", resp.StatusCode)
		return errors.New("unexpected response code Loki endpoint")
	}

	logger.Debug("Successfully sent to Loki", "elapsed", time.Since(started))
	return nil
}