The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/metrics/graphitebridge/graphite.go

396 lines
9.4 KiB

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package graphitebridge provides a bridge to push Prometheus metrics to a
// Graphite server.
package graphitebridge
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"sort"
"strings"
"time"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/client_golang/prometheus"
)
// Defaults and unit conversions used by the bridge.
const (
// defaultInterval is the fallback NewBridge uses for both the push interval
// and the dial timeout when the corresponding Config field is zero.
defaultInterval = 15 * time.Second
// millisecondsPerSecond is the divisor writeMetrics applies to a sample's
// timestamp before printing it (presumably converting the millisecond-based
// model.Time into whole seconds for Graphite; confirm against model.Time).
millisecondsPerSecond = 1000
)
// HandlerErrorHandling defines how a Handler serving metrics will handle
// errors. In this file it is consulted by Bridge.Push to decide what to do
// when gathering metrics reports a problem.
type HandlerErrorHandling int
// These constants cause handlers serving metrics to behave as described if
// errors are encountered.
const (
// Ignore errors and try to push as many metrics to Graphite as possible.
// This is the zero value, so it applies when Config.ErrorHandling is unset.
ContinueOnError HandlerErrorHandling = iota
// Abort the push to Graphite upon the first error encountered.
AbortOnError
)
// Metric-name rewriting tables consulted by writeMetric.
var (
	// metricCategoryPrefix lists name prefixes treated as categories: the
	// first underscore of a matching prefix is replaced by a space, which
	// the sanitizer later emits as a '.', so the category becomes its own
	// Graphite path segment (e.g. "api_foo" ends up as "api.foo").
	metricCategoryPrefix = []string{
		"proxy_",
		"api_",
		"page_",
		"alerting_",
		"aws_",
		"db_",
		"stat_",
		"go_",
		"process_",
	}

	// trimMetricPrefix lists name prefixes that are stripped from metric
	// names before they are written.
	trimMetricPrefix = []string{"grafana_"}
)
// Config defines the Graphite bridge config. Pass it to NewBridge.
type Config struct {
// The address to push data to. Required. Despite the name, the value is
// handed to net.DialTimeout as a TCP address, so it is expected to be in
// host:port form rather than a full URL.
URL string
// The prefix for the pushed Graphite metrics. Defaults to empty string.
Prefix string
// The interval to use for pushing data to Graphite. Defaults to 15 seconds.
Interval time.Duration
// The timeout for pushing metrics to Graphite. Defaults to 15 seconds.
// In this file it is applied to establishing the connection.
Timeout time.Duration
// The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer.
Gatherer prometheus.Gatherer
// The logger that messages are written to. Defaults to no logging.
Logger Logger
// ErrorHandling defines how errors are handled. Note that errors are
// logged regardless of the configured ErrorHandling provided Logger
// is not nil.
ErrorHandling HandlerErrorHandling
// Graphite does not support ever-increasing counters the same way
// Prometheus does; rollups and ingestion might not handle them. When
// this option is enabled, the bridge sends deltas instead: the last
// collected counter value is kept in memory and subtracted from the
// newly collected value before sending.
CountersAsDelta bool
}
// Bridge pushes metrics to the configured Graphite server.
// Construct it with NewBridge, which fills in defaults and allocates
// lastValue.
type Bridge struct {
// url is the TCP address dialed by Push.
url string
// prefix is written, after sanitizing, in front of every metric path.
prefix string
// countersAsDetlas (sic) mirrors Config.CountersAsDelta; when true,
// counter-like samples are sent as the difference from the previously
// collected value. The spelling is kept because other code refers to it.
countersAsDetlas bool
// interval is the time between pushes performed by Run.
interval time.Duration
// timeout bounds the dial performed by Push.
timeout time.Duration
// errorHandling selects what Push does when gathering reports an error.
errorHandling HandlerErrorHandling
// logger receives error messages; nil disables logging.
logger Logger
// g supplies the metric families to push.
g prometheus.Gatherer
// lastValue remembers the last collected value per metric fingerprint so
// deltas can be computed when countersAsDetlas is enabled.
lastValue map[model.Fingerprint]float64
}
// Logger is the minimal interface Bridge needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
// Println is called by the bridge with a short message followed by the
// error being reported.
Println(v ...interface{})
}
// NewBridge validates c and returns a pointer to a new Bridge configured
// from it.
//
// An error is returned when c is nil or when c.URL is empty. Optional
// fields fall back to defaults:
//   - Gatherer: prometheus.DefaultGatherer when nil.
//   - Interval: defaultInterval when zero or negative. A non-positive
//     interval would otherwise make time.NewTicker panic inside Run.
//   - Timeout: defaultInterval when zero or negative.
//   - Logger: no logging when nil.
//
// The returned Bridge starts with an empty delta history.
func NewBridge(c *Config) (*Bridge, error) {
	if c == nil {
		return nil, errors.New("missing config")
	}
	if c.URL == "" {
		return nil, errors.New("missing URL")
	}

	b := &Bridge{
		url:              c.URL,
		prefix:           c.Prefix,
		countersAsDetlas: c.CountersAsDelta,
		interval:         c.Interval,
		timeout:          c.Timeout,
		errorHandling:    c.ErrorHandling,
		// Assigning the interface value directly keeps a nil Logger nil.
		logger:    c.Logger,
		g:         c.Gatherer,
		lastValue: map[model.Fingerprint]float64{},
	}

	if b.g == nil {
		b.g = prometheus.DefaultGatherer
	}
	if b.interval <= 0 {
		b.interval = defaultInterval
	}
	if b.timeout <= 0 {
		b.timeout = defaultInterval
	}

	return b, nil
}
// Run blocks and pushes Prometheus metrics to Graphite every b.interval
// until ctx is done. A failed push does not stop the loop; the error is
// reported through the logger when one is configured and dropped otherwise.
func (b *Bridge) Run(ctx context.Context) {
	tick := time.NewTicker(b.interval)
	defer tick.Stop()

	for {
		// Wait for either cancellation or the next tick.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		err := b.Push()
		if err == nil || b.logger == nil {
			continue
		}
		b.logger.Println("error pushing to Graphite:", err)
	}
}
// Push gathers metrics once and writes them to the configured Graphite
// server over a fresh TCP connection.
//
// If gathering reports an error, the configured error handling decides what
// happens: AbortOnError returns the error immediately, ContinueOnError logs
// it (when a logger is set) and pushes whatever metric families were still
// returned. Any other value is a programming error and panics.
//
// When there is nothing to push, Push returns nil without opening a
// connection. Otherwise it returns the first dial or write error, or nil on
// success.
func (b *Bridge) Push() error {
	mfs, err := b.g.Gather()
	if err != nil {
		switch b.errorHandling {
		case AbortOnError:
			return err
		case ContinueOnError:
			if b.logger != nil {
				b.logger.Println("continue on error:", err)
			}
		default:
			panic("unrecognized error handling value")
		}
	}

	// An empty result is not an error: skip the dial instead of opening a
	// connection that would carry no data.
	if len(mfs) == 0 {
		return nil
	}

	conn, err := net.DialTimeout("tcp", b.url, b.timeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	return b.writeMetrics(conn, mfs, b.prefix, model.Now())
}
// writeMetrics renders every sample of every metric family in mfs as one
// Graphite plaintext line, "<prefix><metric path> <value> <timestamp>\n",
// and writes the lines to w.
//
// prefix is sanitized and written in front of each metric path. now is used
// as the timestamp for the extracted samples; it is divided by
// millisecondsPerSecond before being printed. When delta mode is enabled on
// the bridge, counter-like values are replaced by their change since the
// previous call.
//
// Output goes through a single buffered writer that is flushed after each
// metric family, so families written before a later failure have already
// been handed to w. The first extraction or write error is returned.
func (b *Bridge) writeMetrics(w io.Writer, mfs []*dto.MetricFamily, prefix string, now model.Time) error {
	// One buffer for the whole push; allocating it per family and flushing
	// per sample would turn every line into a separate write on w.
	buf := bufio.NewWriter(w)
	opts := &expfmt.DecodeOptions{Timestamp: now}

	for _, mf := range mfs {
		vec, err := expfmt.ExtractSamples(opts, mf)
		if err != nil {
			return err
		}

		for _, s := range vec {
			if err := writePrefix(buf, prefix); err != nil {
				return err
			}
			if err := writeMetric(buf, s.Metric, mf); err != nil {
				return err
			}

			value := b.replaceCounterWithDelta(mf, s.Metric, s.Value)
			if _, err := fmt.Fprintf(buf, " %g %d\n", value, int64(s.Timestamp)/millisecondsPerSecond); err != nil {
				return err
			}
		}

		// Flush per family so a failure in a later family cannot hold back
		// lines that were already rendered successfully.
		if err := buf.Flush(); err != nil {
			return err
		}
	}

	return nil
}
// writeMetric writes the Graphite path for metric m to buf.
//
// The path is built from the metric name followed by every other label as
// ".<label>.<value>" segments, sorted so the output is deterministic. Before
// writing, a leading category prefix from metricCategoryPrefix has its
// underscore turned into a separator, and a leading prefix from
// trimMetricPrefix is removed. All text is passed through writeSanitized.
// Finally, the rollup suffix chosen by addExtentionConventionForRollups is
// appended. The first write error is returned.
func writeMetric(buf *bufio.Writer, m model.Metric, mf *dto.MetricFamily) error {
	rawName, hasName := m[model.MetricNameLabel]
	name := string(rawName)

	numLabels := len(m)
	if hasName {
		numLabels--
	}

	// Category prefixes become their own path segment: the space inserted
	// here is emitted as '.' by the sanitizer.
	for _, category := range metricCategoryPrefix {
		if strings.HasPrefix(name, category) {
			name = strings.Replace(category, "_", " ", 1) + name[len(category):]
		}
	}
	for _, trim := range trimMetricPrefix {
		name = strings.TrimPrefix(name, trim)
	}

	pairs := make([]string, 0, numLabels)
	for label, value := range m {
		if label == model.MetricNameLabel {
			continue
		}
		pairs = append(pairs, string(label)+" "+string(value))
	}
	// Map iteration order is random; sort for stable metric paths.
	sort.Strings(pairs)

	if hasName || numLabels > 0 {
		if err := writeSanitized(buf, name); err != nil {
			return err
		}
	}
	for _, pair := range pairs {
		if err := buf.WriteByte('.'); err != nil {
			return err
		}
		if err := writeSanitized(buf, pair); err != nil {
			return err
		}
	}

	return addExtentionConventionForRollups(buf, mf, m)
}
// addExtentionConventionForRollups appends a type-dependent suffix to the
// metric path in buf so that Graphite rollup strategies can be configured
// per kind of series:
//
//   - counters always get ".count";
//   - summary and histogram series whose name ends in "_count" get ".count";
//   - histogram series whose name ends in "_sum" get ".sum".
//
// Everything else is left without a suffix. The write error, if any, is
// returned.
func addExtentionConventionForRollups(buf *bufio.Writer, mf *dto.MetricFamily, m model.Metric) error {
	name := string(m[model.MetricNameLabel])

	var suffix string
	switch mf.GetType() {
	case dto.MetricType_COUNTER:
		suffix = ".count"
	case dto.MetricType_SUMMARY:
		if strings.HasSuffix(name, "_count") {
			suffix = ".count"
		}
	case dto.MetricType_HISTOGRAM:
		// A name cannot end in both suffixes, so at most one case applies.
		switch {
		case strings.HasSuffix(name, "_count"):
			suffix = ".count"
		case strings.HasSuffix(name, "_sum"):
			suffix = ".sum"
		}
	}

	if suffix == "" {
		return nil
	}
	_, err := buf.WriteString(suffix)
	return err
}
// writePrefix writes the configured metric prefix s to buf, mapping each
// rune through replaceInvalid first. Unlike metric names, dots in the prefix
// are kept, so a prefix may span several Graphite path segments. It stops at
// and returns the first write error.
func writePrefix(buf *bufio.Writer, s string) error {
	for _, r := range s {
		mapped := replaceInvalid(r)
		_, err := buf.WriteRune(mapped)
		if err != nil {
			return err
		}
	}
	return nil
}
// writeSanitized writes s to buf after mapping every rune through
// replaceInvalidRune and collapsing each run of consecutive underscores
// (including underscores produced by the mapping) into a single one. It
// stops at and returns the first write error.
func writeSanitized(buf *bufio.Writer, s string) error {
	// last holds the most recent mapped rune; its zero value is never '_',
	// so a leading underscore is always written.
	var last rune
	for _, r := range s {
		r = replaceInvalidRune(r)
		if r == '_' && last == '_' {
			continue
		}
		last = r
		if _, err := buf.WriteRune(r); err != nil {
			return err
		}
	}
	return nil
}
// replaceInvalid maps a rune of the metric prefix to the rune that is
// written out: spaces and dots both become '.', and every other rune is
// handled by replaceInvalidRune.
func replaceInvalid(c rune) rune {
	switch c {
	case ' ', '.':
		return '.'
	default:
		return replaceInvalidRune(c)
	}
}
// replaceInvalidRune maps a rune of a metric name or label to the rune that
// is written out: a space becomes '.', ASCII letters, ASCII digits, '_' and
// ':' are kept as they are, and anything else (including '.') becomes '_'.
func replaceInvalidRune(c rune) rune {
	switch {
	case c == ' ':
		return '.'
	case 'a' <= c && c <= 'z',
		'A' <= c && c <= 'Z',
		'0' <= c && c <= '9',
		c == '_',
		c == ':':
		return c
	default:
		return '_'
	}
}
// replaceCounterWithDelta returns the value to send for a sample. With delta
// mode disabled it is the sample value itself. With delta mode enabled,
// samples of counter families, and samples of summary families whose name
// ends in "_count", are replaced by their change since the previously
// collected value; every other sample is passed through unchanged.
func (b *Bridge) replaceCounterWithDelta(mf *dto.MetricFamily, metric model.Metric, value model.SampleValue) float64 {
	if b.countersAsDetlas {
		switch mf.GetType() {
		case dto.MetricType_COUNTER:
			return b.returnDelta(metric, value)
		case dto.MetricType_SUMMARY:
			if strings.HasSuffix(string(metric[model.MetricNameLabel]), "_count") {
				return b.returnDelta(metric, value)
			}
		}
	}
	return float64(value)
}
// returnDelta returns how much value differs from the value recorded for the
// same metric (identified by its fingerprint) on the previous call, and
// records value as the new baseline. A metric seen for the first time is
// compared against zero, so its full value is returned.
func (b *Bridge) returnDelta(metric model.Metric, value model.SampleValue) float64 {
	fp := metric.Fingerprint()
	current := float64(value)

	// A missing key reads as 0, which is the baseline for unseen metrics.
	previous := b.lastValue[fp]
	b.lastValue[fp] = current

	return current - previous
}