diff --git a/pkg/services/ngalert/state/historian/encode.go b/pkg/services/ngalert/state/historian/encode.go new file mode 100644 index 00000000000..e2feec2fe09 --- /dev/null +++ b/pkg/services/ngalert/state/historian/encode.go @@ -0,0 +1,106 @@ +package historian + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/grafana/grafana/pkg/components/loki/logproto" + "github.com/prometheus/common/model" + "golang.org/x/exp/slices" +) + +type JsonEncoder struct{} + +func (e JsonEncoder) encode(s []stream) ([]byte, error) { + body := struct { + Streams []stream `json:"streams"` + }{Streams: s} + enc, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to serialize Loki payload: %w", err) + } + return enc, nil +} + +func (e JsonEncoder) headers() map[string]string { + return map[string]string{ + "Content-Type": "application/json", + } +} + +type SnappyProtoEncoder struct{} + +func (e SnappyProtoEncoder) encode(s []stream) ([]byte, error) { + body := logproto.PushRequest{ + Streams: make([]logproto.Stream, 0, len(s)), + } + + for _, str := range s { + entries := make([]logproto.Entry, 0, len(str.Values)) + for _, sample := range str.Values { + entries = append(entries, logproto.Entry{ + Timestamp: sample.T, + Line: sample.V, + }) + } + body.Streams = append(body.Streams, logproto.Stream{ + Labels: labelsMapToString(str.Stream, ""), + Entries: entries, + // Hash seems to be mainly used for query responses. Promtail does not seem to calculate this field on push. + }) + } + + buf, err := proto.Marshal(&body) + if err != nil { + return nil, fmt.Errorf("failed to serialize Loki payload to proto: %w", err) + } + buf = snappy.Encode(nil, buf) + return buf, nil +} + +func (e SnappyProtoEncoder) headers() map[string]string { + return map[string]string{ + "Content-Type": "application/x-protobuf", + "Content-Encoding": "snappy", + } +} + +// Copied from promtail. +// Modified slightly to work in terms of plain map[string]string to avoid some unnecessary copies and type casts. +// TODO: pkg/components/loki/lokihttp/batch.go contains an older (loki 2.7.4 released) version of this. +// TODO: Consider replacing that one, with this one. +func labelsMapToString(ls map[string]string, without model.LabelName) string { + var b strings.Builder + totalSize := 2 + lstrs := make([]string, 0, len(ls)) + + for l, v := range ls { + if l == string(without) { + continue + } + + lstrs = append(lstrs, l) + // guess size increase: 2 for `, ` between labels and 3 for the `=` and quotes around label value + totalSize += len(l) + 2 + len(v) + 3 + } + + b.Grow(totalSize) + b.WriteByte('{') + slices.Sort(lstrs) + for i, l := range lstrs { + if i > 0 { + b.WriteString(", ") + } + + b.WriteString(l) + b.WriteString(`=`) + b.WriteString(strconv.Quote(ls[l])) + } + b.WriteByte('}') + + return b.String() +} diff --git a/pkg/services/ngalert/state/historian/loki_http.go b/pkg/services/ngalert/state/historian/loki_http.go index 5375fdb2164..f40b2c03ccd 100644 --- a/pkg/services/ngalert/state/historian/loki_http.go +++ b/pkg/services/ngalert/state/historian/loki_http.go @@ -25,6 +25,14 @@ func NewRequester() client.Requester { } } +// encoder serializes log streams to some byte format. +type encoder interface { + // encode serializes a set of log streams to bytes. + encode(s []stream) ([]byte, error) + // headers returns a set of HTTP-style headers that describes the encoding scheme used. + headers() map[string]string +} + type LokiConfig struct { ReadPathURL *url.URL WritePathURL *url.URL @@ -32,6 +40,7 @@ type LokiConfig struct { BasicAuthPassword string TenantID string ExternalLabels map[string]string + Encoder encoder } func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) { @@ -65,11 +74,14 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, BasicAuthUser: cfg.LokiBasicAuthUsername, BasicAuthPassword: cfg.LokiBasicAuthPassword, TenantID: cfg.LokiTenantID, + // Snappy-compressed protobuf is the default, same goes for Promtail. + Encoder: SnappyProtoEncoder{}, }, nil } type httpLokiClient struct { client client.Requester + encoder encoder cfg LokiConfig metrics *metrics.Historian log log.Logger @@ -101,6 +113,7 @@ func newLokiClient(cfg LokiConfig, req client.Requester, metrics *metrics.Histor tc := client.NewTimedClient(req, metrics.WriteDuration) return &httpLokiClient{ client: tc, + encoder: cfg.Encoder, cfg: cfg, metrics: metrics, log: logger.New("protocol", "http"), @@ -169,12 +182,9 @@ func (r *sample) UnmarshalJSON(b []byte) error { } func (c *httpLokiClient) push(ctx context.Context, s []stream) error { - body := struct { - Streams []stream `json:"streams"` - }{Streams: s} - enc, err := json.Marshal(body) + enc, err := c.encoder.encode(s) if err != nil { - return fmt.Errorf("failed to serialize Loki payload: %w", err) + return err } uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push") @@ -184,7 +194,9 @@ func (c *httpLokiClient) push(ctx context.Context, s []stream) error { } c.setAuthAndTenantHeaders(req) - req.Header.Add("content-type", "application/json") + for k, v := range c.encoder.headers() { + req.Header.Add(k, v) + } c.metrics.BytesWritten.Add(float64(len(enc))) req = req.WithContext(ctx) diff --git a/pkg/services/ngalert/state/historian/loki_http_test.go b/pkg/services/ngalert/state/historian/loki_http_test.go index 137b84e0b49..aec8685d9b9 100644 --- a/pkg/services/ngalert/state/historian/loki_http_test.go +++ b/pkg/services/ngalert/state/historian/loki_http_test.go @@ -117,6 +117,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) { client := newLokiClient(LokiConfig{ ReadPathURL: url, WritePathURL: url, + Encoder: JsonEncoder{}, }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger()) // Unauthorized request should fail against Grafana Cloud. @@ -144,6 +145,7 @@ func TestLokiHTTPClient_Manual(t *testing.T) { WritePathURL: url, BasicAuthUser: "", BasicAuthPassword: "", + Encoder: JsonEncoder{}, }, NewRequester(), metrics.NewHistorianMetrics(prometheus.NewRegistry()), log.NewNopLogger()) // When running on prem, you might need to set the tenant id, @@ -259,6 +261,7 @@ func createTestLokiClient(req client.Requester) *httpLokiClient { cfg := LokiConfig{ WritePathURL: url, ReadPathURL: url, + Encoder: JsonEncoder{}, } met := metrics.NewHistorianMetrics(prometheus.NewRegistry()) return newLokiClient(cfg, req, met, log.NewNopLogger()) diff --git a/pkg/services/ngalert/state/historian/loki_test.go b/pkg/services/ngalert/state/historian/loki_test.go index 6b848cd56de..a0883460d6b 100644 --- a/pkg/services/ngalert/state/historian/loki_test.go +++ b/pkg/services/ngalert/state/historian/loki_test.go @@ -353,6 +353,7 @@ func createTestLokiBackend(req client.Requester, met *metrics.Historian) *Remote cfg := LokiConfig{ WritePathURL: url, ReadPathURL: url, + Encoder: JsonEncoder{}, } return NewRemoteLokiBackend(cfg, req, met) }