The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/ngalert/state/historian/loki.go

146 lines
4.2 KiB

package historian
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
)
// Names of the labels that statesToStreams adds to every stream it builds.
// Their values are taken from the metadata of the rule being recorded.
const (
// OrgIDLabel is the label that carries the rule's OrgID.
OrgIDLabel = "orgID"
// RuleUIDLabel is the label that carries the rule's UID.
RuleUIDLabel = "ruleUID"
// GroupLabel is the label that carries the rule's Group.
GroupLabel = "group"
// FolderUIDLabel is the label that carries the rule's NamespaceUID.
FolderUIDLabel = "folderUID"
)
// remoteLokiClient is the set of client operations RemoteLokiBackend depends on.
type remoteLokiClient interface {
// ping is invoked by TestConnection; a non-nil error is returned to the caller as is.
ping(context.Context) error
// push is invoked by recordStreams to send a batch of streams.
push(context.Context, []stream) error
}
// RemoteLokiBackend records alert state history by pushing streams through a remoteLokiClient.
type RemoteLokiBackend struct {
// client performs the ping and push calls.
client remoteLokiClient
// externalLabels are merged into the labels of every stream built by statesToStreams.
externalLabels map[string]string
// log is the base logger; per-call loggers are derived from it with FromContext.
log log.Logger
}
// NewRemoteLokiBackend builds a RemoteLokiBackend from cfg.
//
// The backend and the client created by newLokiClient share a single logger,
// and the backend's external labels are taken from cfg.ExternalLabels.
func NewRemoteLokiBackend(cfg LokiConfig) *RemoteLokiBackend {
	backendLog := log.New("ngalert.state.historian", "backend", "loki")
	lokiClient := newLokiClient(cfg, backendLog)

	return &RemoteLokiBackend{
		log:            backendLog,
		externalLabels: cfg.ExternalLabels,
		client:         lokiClient,
	}
}
// TestConnection pings the underlying client and returns whatever error the
// ping reports, or nil when the ping succeeds.
func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
	pingErr := h.client.ping(ctx)
	return pingErr
}
// RecordStatesAsync converts the given state transitions of rule into streams
// and pushes them in a background goroutine.
//
// The conversion happens synchronously, before this method returns. The
// returned channel receives at most one error (when the push fails) and is
// closed once the push has finished, so a receive always unblocks.
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
	ctxLogger := h.log.FromContext(ctx)
	batch := statesToStreams(rule, states, h.externalLabels, ctxLogger)

	// Capacity of one lets the goroutine finish even when nobody reads the channel.
	result := make(chan error, 1)
	go func() {
		defer close(result)

		err := h.recordStreams(ctx, batch, ctxLogger)
		if err == nil {
			return
		}
		ctxLogger.Error("Failed to save alert state history batch", "error", err)
		result <- fmt.Errorf("failed to save alert state history batch: %w", err)
	}()

	return result
}
// QueryStates returns an empty frame named "states" and a nil error.
// Neither ctx nor query is consulted.
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
	emptyFrame := data.NewFrame("states")
	return emptyFrame, nil
}
// statesToStreams converts the state transitions of rule into streams.
//
// Transitions for which shouldRecord reports false are dropped. For every
// remaining transition the labels are built from the state's labels (passed
// through removePrivateLabels) merged with externalLabels via mergeLabels,
// plus the rule's org ID, UID, group and namespace UID. Transitions whose
// label sets have the same string representation are grouped into one stream;
// each contributes a row holding a JSON-encoded lokiEntry, timestamped with
// the state's LastEvaluationTime.
//
// Nothing is returned as an error: a transition whose entry cannot be
// marshalled, or a bucket whose labels cannot be parsed back, is logged on
// logger and skipped.
func statesToStreams(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) []stream {
	buckets := make(map[string][]row) // label repr -> entries
	// The loop variable is deliberately not called "state" so that it does not
	// shadow the imported state package.
	for _, transition := range states {
		if !shouldRecord(transition) {
			continue
		}

		labels := mergeLabels(removePrivateLabels(transition.State.Labels), externalLabels)
		labels[OrgIDLabel] = fmt.Sprint(rule.OrgID)
		labels[RuleUIDLabel] = fmt.Sprint(rule.UID)
		labels[GroupLabel] = fmt.Sprint(rule.Group)
		labels[FolderUIDLabel] = fmt.Sprint(rule.NamespaceUID)
		repr := labels.String()

		entry := lokiEntry{
			SchemaVersion: 1,
			Previous:      transition.PreviousFormatted(),
			Current:       transition.Formatted(),
			Values:        valuesAsDataBlob(transition.State),
			DashboardUID:  rule.DashboardUID,
			PanelID:       rule.PanelID,
		}
		// An Error state without an attached error value must not panic the
		// historian; the entry is then recorded without an error message.
		if transition.State.State == eval.Error && transition.Error != nil {
			entry.Error = transition.Error.Error()
		}

		jsn, err := json.Marshal(entry)
		if err != nil {
			logger.Error("Failed to construct history record for state, skipping", "error", err)
			continue
		}

		buckets[repr] = append(buckets[repr], row{
			At:  transition.State.LastEvaluationTime,
			Val: string(jsn),
		})
	}

	result := make([]stream, 0, len(buckets))
	for repr, rows := range buckets {
		labels, err := data.LabelsFromString(repr)
		if err != nil {
			// The logger takes a message followed by key/value pairs; it does
			// not expand printf verbs, so the error is passed under "error".
			logger.Error("Failed to parse frame labels, skipping state history batch", "error", err)
			continue
		}
		result = append(result, stream{
			Stream: labels,
			Values: rows,
		})
	}

	return result
}
// recordStreams pushes streams through the client. A push error is returned
// unchanged; on success a debug message is written to logger and nil is returned.
func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []stream, logger log.Logger) error {
	pushErr := h.client.push(ctx, streams)
	if pushErr == nil {
		logger.Debug("Done saving alert state history batch")
	}
	return pushErr
}
// lokiEntry is the JSON document written as the value of each row built by statesToStreams.
type lokiEntry struct {
// SchemaVersion is set to 1 by statesToStreams.
SchemaVersion int `json:"schemaVersion"`
// Previous is the transition's PreviousFormatted() value.
Previous string `json:"previous"`
// Current is the transition's Formatted() value.
Current string `json:"current"`
// Error is filled in only when the state is eval.Error; it is omitted from the JSON when empty.
Error string `json:"error,omitempty"`
// Values is the blob produced by valuesAsDataBlob for the state.
Values *simplejson.Json `json:"values"`
// DashboardUID is copied from the rule metadata.
DashboardUID string `json:"dashboardUID"`
// PanelID is copied from the rule metadata.
PanelID int64 `json:"panelID"`
}
// valuesAsDataBlob returns the JSON blob stored as an entry's values.
//
// For states that are eval.Error or eval.NoData it returns a freshly created
// simplejson value; for every other state it returns jsonifyValues(state.Values).
func valuesAsDataBlob(state *state.State) *simplejson.Json {
	switch state.State {
	case eval.Error, eval.NoData:
		return simplejson.New()
	default:
		return jsonifyValues(state.Values)
	}
}