The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/services/ngalert/writer/prom.go

321 lines
8.6 KiB

package writer
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/grafana/dataplane/sdata/numeric"
	"github.com/m3db/prometheus_remote_client_golang/promremote"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/setting"

	"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
	"github.com/grafana/grafana-plugin-sdk-go/data"
)
// backendType is the backend label value that PrometheusWriter.Write attaches
// to the write duration and write count metrics it records.
const backendType = "prometheus"
// Substrings that checkWriteError looks for in the message of a failed remote
// write with status 400 in order to classify the failure.
const (
// Fixed error messages
// MimirDuplicateTimestampError is listed in DuplicateTimestampErrors, so a
// write failing with it is ignored; the other Mimir identifiers below mark the
// write as rejected (ErrRejectedWrite).
MimirDuplicateTimestampError = "err-mimir-sample-duplicate-timestamp"
MimirInvalidLabelError = "err-mimir-label-invalid"
MimirLabelValueTooLongError = "err-mimir-label-value-too-long"
MimirMaxLabelNamesPerSeriesError = "err-mimir-max-label-names-per-series"
MimirMaxSeriesPerUserError = "err-mimir-max-series-per-user"
// Best effort error messages
// PrometheusDuplicateTimestampError is free-form message text rather than a
// fixed identifier; it is also listed in DuplicateTimestampErrors.
PrometheusDuplicateTimestampError = "duplicate sample for timestamp"
)
// Sentinel errors returned (joined with the underlying cause via errors.Join)
// by the writer, so callers can classify failures with errors.Is.
var (
// Unexpected, 500-like write errors.
// checkWriteError uses this for 5xx responses, for 400 responses whose message
// is not recognized, and for every other failed status.
ErrUnexpectedWriteFailure = errors.New("failed to write time series")
// Expected, user-level write errors like trying to write an invalid series.
// ErrRejectedWrite is used by checkWriteError for 400 responses carrying one of
// the known Mimir validation identifiers.
ErrRejectedWrite = errors.New("series was rejected")
// ErrBadFrame is used by PrometheusWriter.Write when the input frames cannot
// be converted into points.
ErrBadFrame = errors.New("failed to read dataframe")
)
// DuplicateTimestampErrors lists the message substrings that identify a
// duplicate-timestamp rejection. checkWriteError reports a 400 response whose
// message contains any of them as ignorable instead of as an error.
var DuplicateTimestampErrors = [...]string{
MimirDuplicateTimestampError,
PrometheusDuplicateTimestampError,
}
// Metric represents a Prometheus time series metric.
type Metric struct {
// T is the sample timestamp; Write sends it as the datapoint timestamp.
T time.Time
// V is the sample value; Write sends it as the datapoint value.
V float64
}
// Point is a logical representation of a single point in time for a Prometheus time series.
type Point struct {
// Name is the series name; promremoteLabelsFromPoint emits it as the
// "__name__" label.
Name string
// Labels holds the remaining labels of the series, keyed by label name.
Labels map[string]string
// Metric is the timestamp/value pair of the sample.
Metric Metric
}
// PointsFromFrames converts numeric data frames into one Point per reference
// of the collection read from frames.
//
// Every returned point is named name and stamped with t. Its labels are a copy
// of the reference's labels with any "__name__" entry removed, after which the
// entries of extraLabels are applied on top (overriding labels of the same
// name).
//
// An error is returned when the frames cannot be read as a numeric collection,
// or as soon as any reference has an unreadable, empty or nil value; no partial
// result is returned in that case.
func PointsFromFrames(name string, t time.Time, frames data.Frames, extraLabels map[string]string) ([]Point, error) {
	reader, err := numeric.CollectionReaderFromFrames(frames)
	if err != nil {
		return nil, err
	}

	collection, err := reader.GetCollection(false)
	if err != nil {
		return nil, err
	}

	result := make([]Point, 0, len(collection.Refs))
	for _, ref := range collection.Refs {
		value, isEmpty, readErr := ref.NullableFloat64Value()
		switch {
		case readErr != nil:
			return nil, fmt.Errorf("unable to read float64 value: %w", readErr)
		case isEmpty:
			return nil, fmt.Errorf("empty frame")
		case value == nil:
			return nil, fmt.Errorf("nil frame")
		}

		// Work on a copy so the labels owned by the frame are never mutated.
		seriesLabels := ref.GetLabels().Copy()
		if seriesLabels == nil {
			seriesLabels = data.Labels{}
		}
		// The series name comes from the name argument, not from the frame.
		delete(seriesLabels, "__name__")
		for key, val := range extraLabels {
			seriesLabels[key] = val
		}

		result = append(result, Point{
			Name:   name,
			Labels: seriesLabels,
			Metric: Metric{T: t, V: *value},
		})
	}

	return result, nil
}
// HttpClientProvider builds HTTP clients from httpclient options.
// NewPrometheusWriter uses it to create the HTTP client that is handed to the
// remote write client.
type HttpClientProvider interface {
// New returns an HTTP client configured with the given options, or an error.
New(options ...httpclient.Options) (*http.Client, error)
}
// PrometheusWriter sends data frames as time series to a Prometheus remote
// write endpoint. Construct it with NewPrometheusWriter or
// NewPrometheusWriterWithSettings.
type PrometheusWriter struct {
// client performs the remote write requests.
client promremote.Client
// clock is used to measure how long each write takes.
clock clock.Clock
// logger is the base logger; methods derive a per-call logger from the context.
logger log.Logger
// metrics receives the write duration and write count observations.
metrics *metrics.RemoteWriter
}
// PrometheusWriterConfig holds the options NewPrometheusWriter needs to build
// a PrometheusWriter.
type PrometheusWriterConfig struct {
// URL is the remote write endpoint passed to the remote write client.
URL string
// HTTPOptions are passed to the HttpClientProvider when creating the HTTP client.
HTTPOptions httpclient.Options
// Timeout is passed to the remote write client as its HTTP client timeout.
Timeout time.Duration
}
// NewPrometheusWriterWithSettings builds a PrometheusWriter from recording
// rule settings.
//
// The settings are validated first (see validateSettings) and a validation
// failure is returned as-is. The settings' URL and timeout are copied into a
// PrometheusWriterConfig, the custom headers become HTTP headers, and basic
// auth is enabled only when a username is configured. Construction is then
// delegated to NewPrometheusWriter.
func NewPrometheusWriterWithSettings(
	settings setting.RecordingRuleSettings,
	httpClientProvider HttpClientProvider,
	clock clock.Clock,
	l log.Logger,
	metrics *metrics.RemoteWriter,
) (*PrometheusWriter, error) {
	if err := validateSettings(settings); err != nil {
		return nil, err
	}

	header := http.Header{}
	for name, value := range settings.CustomHeaders {
		header.Add(name, value)
	}

	httpOpts := httpclient.Options{
		BasicAuth: createAuthOpts(settings.BasicAuthUsername, settings.BasicAuthPassword),
		Header:    header,
	}

	return NewPrometheusWriter(
		PrometheusWriterConfig{
			URL:         settings.URL,
			HTTPOptions: httpOpts,
			Timeout:     settings.Timeout,
		},
		httpClientProvider,
		clock,
		l,
		metrics,
	)
}
// NewPrometheusWriter builds a PrometheusWriter from an explicit config.
//
// It asks httpClientProvider for an HTTP client configured with
// cfg.HTTPOptions and wraps it in a remote write client that targets cfg.URL,
// uses cfg.Timeout and identifies itself with the "grafana-recording-rule"
// user agent. An error from either step is returned unchanged.
func NewPrometheusWriter(
	cfg PrometheusWriterConfig,
	httpClientProvider HttpClientProvider,
	clock clock.Clock,
	l log.Logger,
	metrics *metrics.RemoteWriter,
) (*PrometheusWriter, error) {
	httpClient, err := httpClientProvider.New(cfg.HTTPOptions)
	if err != nil {
		return nil, err
	}

	remoteClient, err := promremote.NewClient(promremote.NewConfig(
		promremote.UserAgent("grafana-recording-rule"),
		promremote.WriteURLOption(cfg.URL),
		promremote.HTTPClientTimeoutOption(cfg.Timeout),
		promremote.HTTPClientOption(httpClient),
	))
	if err != nil {
		return nil, err
	}

	writer := &PrometheusWriter{
		client:  remoteClient,
		clock:   clock,
		logger:  l,
		metrics: metrics,
	}
	return writer, nil
}
// validateSettings checks the recording rule settings needed to build a
// writer and returns the first problem found, or nil.
//
// Checks, in order:
//   - a basic auth username must come with a non-empty password;
//   - the URL must be parseable by url.Parse (which is lenient: it only
//     rejects syntactically malformed input);
//   - the timeout must be strictly positive.
func validateSettings(settings setting.RecordingRuleSettings) error {
	hasUsername := settings.BasicAuthUsername != ""
	hasPassword := settings.BasicAuthPassword != ""
	if hasUsername && !hasPassword {
		return fmt.Errorf("basic auth password is required if username is set")
	}

	_, parseErr := url.Parse(settings.URL)
	if parseErr != nil {
		return fmt.Errorf("invalid URL: %w", parseErr)
	}

	if settings.Timeout <= 0 {
		return fmt.Errorf("timeout must be greater than 0")
	}

	return nil
}
// createAuthOpts returns the basic auth options for the given credentials, or
// nil when username is empty. With an empty username basic auth is disabled
// and password is ignored.
func createAuthOpts(username, password string) *httpclient.BasicAuthOptions {
	if username != "" {
		return &httpclient.BasicAuthOptions{User: username, Password: password}
	}
	// No username: do not use basic auth at all.
	return nil
}
// WriteDatasource writes the given frames to the Prometheus remote write
// endpoint, like Write, but takes the UID of a target data source.
//
// This writer does not support writing to a specific data source: a non-empty
// dsUID is logged and rejected with an error. With an empty dsUID the call is
// forwarded to Write unchanged.
func (w PrometheusWriter) WriteDatasource(ctx context.Context, dsUID string, name string, t time.Time, frames data.Frames, orgID int64, extraLabels map[string]string) error {
	l := w.logger.FromContext(ctx)

	if dsUID == "" {
		return w.Write(ctx, name, t, frames, orgID, extraLabels)
	}

	l.Error("Writing to specific data sources is not enabled", "org_id", orgID, "datasource_uid", dsUID)
	return errors.New("writing to specific data sources is not enabled")
}
// Write converts the given frames into Prometheus time series and sends them
// to the configured remote write endpoint.
//
// Every series is named name and stamped with t; extraLabels are merged into
// the labels of each series (see PointsFromFrames). orgID is only used as a
// label value on the recorded metrics.
//
// The write duration and a per-status-code write counter are recorded for
// every attempt, successful or not.
//
// It returns ErrBadFrame joined with the cause when the frames cannot be
// converted, or the error produced by checkWriteError when the remote write
// fails. Failures that checkWriteError classifies as ignorable are logged at
// debug level and nil is returned.
func (w PrometheusWriter) Write(ctx context.Context, name string, t time.Time, frames data.Frames, orgID int64, extraLabels map[string]string) error {
	l := w.logger.FromContext(ctx)
	lvs := []string{fmt.Sprint(orgID), backendType}

	points, err := PointsFromFrames(name, t, frames, extraLabels)
	if err != nil {
		return errors.Join(ErrBadFrame, err)
	}

	series := make([]promremote.TimeSeries, 0, len(points))
	for _, p := range points {
		series = append(series, promremote.TimeSeries{
			Labels: promremoteLabelsFromPoint(p),
			Datapoint: promremote.Datapoint{
				Timestamp: p.Metric.T,
				Value:     p.Metric.V,
			},
		})
	}

	l.Debug("Writing metric", "name", name)
	writeStart := w.clock.Now()
	res, writeErr := w.client.WriteTimeSeries(ctx, series, promremote.WriteOptions{})
	w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(w.clock.Now().Sub(writeStart).Seconds())

	// The counter carries the response status code as an additional label.
	lvs = append(lvs, fmt.Sprint(res.StatusCode))
	w.metrics.WritesTotal.WithLabelValues(lvs...).Inc()

	if writeErr == nil {
		return nil
	}

	classified, ignored := checkWriteError(writeErr)
	if classified != nil {
		return classified
	}
	if ignored {
		// Log the original write error: the classified error is nil on this
		// path, so logging it would hide what was actually ignored.
		l.Debug("Ignored write error", "error", writeErr, "status_code", res.StatusCode)
	}
	return nil
}
// promremoteLabelsFromPoint builds the remote write label set for point: the
// series name under the reserved "__name__" label plus one label per entry of
// point.Labels.
//
// The result is sorted by label name. Go randomizes map iteration order, so
// without sorting the same point could be sent with a different label order on
// every write, and the Prometheus remote write specification requires label
// names to be sorted lexicographically.
func promremoteLabelsFromPoint(point Point) []promremote.Label {
	// +1 reserves room for the "__name__" label so the appends below never
	// have to grow the slice.
	labels := make([]promremote.Label, 0, len(point.Labels)+1)
	labels = append(labels, promremote.Label{
		Name:  "__name__",
		Value: point.Name,
	})
	for k, v := range point.Labels {
		labels = append(labels, promremote.Label{
			Name:  k,
			Value: v,
		})
	}

	// Stable, so that the name taken from point.Name stays ahead of any
	// same-named entry that might be present in point.Labels.
	sort.SliceStable(labels, func(i, j int) bool {
		return labels[i].Name < labels[j].Name
	})
	return labels
}
// checkWriteError classifies a failed remote write.
//
// It returns:
//   - (nil, false) when writeErr is nil;
//   - (nil, true) when the failure can be ignored: a 400 response whose
//     message contains one of DuplicateTimestampErrors;
//   - ErrRejectedWrite joined with writeErr for a 400 response carrying one of
//     the known Mimir validation identifiers (a problem with the written data);
//   - ErrUnexpectedWriteFailure joined with writeErr in every other case,
//     including all 5xx responses and unrecognized 400 responses.
func checkWriteError(writeErr promremote.WriteError) (err error, ignored bool) {
	if writeErr == nil {
		return nil, false
	}

	// Only a 400 needs a closer look. 5xx statuses are never the fault of the
	// data, and anything else that is not a 400 is unexpected as well.
	if writeErr.StatusCode() != 400 {
		return errors.Join(ErrUnexpectedWriteFailure, writeErr), false
	}

	// 400s may be ignorable in the event of HA writers, or be the fault of the
	// written data; the message tells them apart.
	message := writeErr.Error()

	// HA may potentially write different values for the same timestamp, so
	// that error is ignored.
	// TODO: this may not be needed, further testing needed
	for _, duplicate := range DuplicateTimestampErrors {
		if strings.Contains(message, duplicate) {
			return nil, true
		}
	}

	// Expected user errors.
	userErrors := [...]string{
		MimirInvalidLabelError,
		MimirMaxSeriesPerUserError,
		MimirMaxLabelNamesPerSeriesError,
		MimirLabelValueTooLongError,
	}
	for _, userError := range userErrors {
		if strings.Contains(message, userError) {
			return errors.Join(ErrRejectedWrite, writeErr), false
		}
	}

	// For now, all 400s that are not previously known are considered unexpected.
	// TODO: Consider blanket-converting all 400s to be known errors. This should only be done once we are confident this is not a problem with this client.
	return errors.Join(ErrUnexpectedWriteFailure, writeErr), false
}