adds matchers and metrics to regex and json

pull/640/head
Cyril Tovena 7 years ago committed by Ed
parent fea52f6a37
commit a5e47a50e8
  1. Gopkg.lock (4 lines changed)
  2. pkg/logentry/metric/entries.go (14 lines changed)
  3. pkg/logentry/metric/entries_test.go (68 lines changed)
  4. pkg/logentry/pipeline.go (31 lines changed)
  5. pkg/logentry/pipeline_test.go (6 lines changed)
  6. pkg/logentry/stages/configs.go (58 lines changed)
  7. pkg/logentry/stages/extensions.go (8 lines changed)
  8. pkg/logentry/stages/json.go (102 lines changed)
  9. pkg/logentry/stages/json_test.go (16 lines changed)
  10. pkg/logentry/stages/match.go (28 lines changed)
  11. pkg/logentry/stages/match_test.go (59 lines changed)
  12. pkg/logentry/stages/metrics.go (51 lines changed)
  13. pkg/logentry/stages/metrics_test.go (2 lines changed)
  14. pkg/logentry/stages/mutator.go (29 lines changed)
  15. pkg/logentry/stages/regex.go (110 lines changed)
  16. pkg/logentry/stages/regex_test.go (18 lines changed)
  17. pkg/logentry/stages/stage.go (20 lines changed)
  18. pkg/logql/parser.go (17 lines changed)
  19. pkg/promtail/promtail.go (8 lines changed)
  20. pkg/promtail/server/server.go (6 lines changed)
  21. pkg/promtail/targets/filetargetmanager.go (7 lines changed)
  22. pkg/promtail/targets/manager.go (3 lines changed)
  23. vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go (198 lines changed)
  24. vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator_1_8.go (181 lines changed)
  25. vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator_pre_1_8.go (44 lines changed)
  26. vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go (310 lines changed)
  27. vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client.go (97 lines changed)
  28. vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client_1_8.go (144 lines changed)
  29. vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_server.go (447 lines changed)

Gopkg.lock (generated, 4 lines changed)

@ -703,7 +703,7 @@
[[projects]]
branch = "master"
digest = "1:ad2a544bc4bc4fcaa6f0b1622c4f8652da6e74b101925c263b912232e1b68cf2"
digest = "1:dff66fce6bb8fa6111998bf3575e22aa3d0fd2560712a83de3390236d3cd1f6b"
name = "github.com/prometheus/client_golang"
packages = [
"api",
@ -711,6 +711,7 @@
"prometheus",
"prometheus/internal",
"prometheus/promauto",
"prometheus/promhttp",
"prometheus/testutil",
]
pruneopts = "UT"
@ -1407,6 +1408,7 @@
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promauto",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/prometheus/client_golang/prometheus/testutil",
"github.com/prometheus/common/config",
"github.com/prometheus/common/model",

@ -8,20 +8,26 @@ import (
"github.com/prometheus/common/model"
)
func LogCount(reg prometheus.Registerer) api.EntryHandler {
func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewCounters("log_entries_total", "the total count of log entries")
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if err := next.Handle(labels, time, entry); err != nil {
return err
}
c.With(labels).Inc()
return nil
})
}
func LogSize(reg prometheus.Registerer) api.EntryHandler {
c := NewCounters("log_entries_bytes", "the total count of bytes")
func LogSize(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewHistograms("log_entries_bytes", "the total count of bytes", prometheus.ExponentialBuckets(16, 2, 8))
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
c.With(labels).Add(float64(len(entry)))
if err := next.Handle(labels, time, entry); err != nil {
return err
}
c.With(labels).Observe(float64(len(entry)))
return nil
})
}
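
The reworked constructors in pkg/logentry/metric/entries.go now wrap a downstream api.EntryHandler and only record after it succeeds; filetargetmanager.go further down chains LogSize around LogCount around the client. A minimal sketch of that wiring, with a stand-in sink handler that is not part of this change:

package main

import (
	"time"

	"github.com/grafana/loki/pkg/logentry/metric"
	"github.com/grafana/loki/pkg/promtail/api"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
)

func main() {
	// Stand-in for the promtail client; any api.EntryHandler works.
	sink := api.EntryHandlerFunc(func(labels model.LabelSet, t time.Time, entry string) error {
		return nil
	})
	reg := prometheus.NewRegistry()
	// The count and the size histogram are only updated after the wrapped
	// handler returns without error.
	h := metric.LogSize(reg, metric.LogCount(reg, sink))
	_ = h.Handle(model.LabelSet{"job": "demo"}, time.Now(), "hello world")
}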

@ -1,11 +1,14 @@
package metric
import (
"errors"
"strings"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
testutil "github.com/prometheus/client_golang/prometheus/testutil"
@ -20,10 +23,17 @@ log_entries_total{bar="foo"} 5.0
log_entries_total{bar="foo",foo="bar"} 5.0
`
var errorHandler = api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if entry == "error" {
return errors.New("")
}
return nil
})
func Test_LogCount(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
handler := LogCount(reg)
handler := LogCount(reg, errorHandler)
workerCount := 5
var wg sync.WaitGroup
@ -36,6 +46,8 @@ func Test_LogCount(t *testing.T) {
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar"}), time.Now(), "error")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "error")
}()
}
@ -48,17 +60,57 @@ func Test_LogCount(t *testing.T) {
}
const expectedSize = `# HELP log_entries_bytes the total count of bytes
# TYPE log_entries_bytes counter
log_entries_bytes 35.0
log_entries_bytes{foo="bar"} 15.0
log_entries_bytes{bar="foo"} 10.0
log_entries_bytes{bar="foo",foo="bar"} 15.0
# TYPE log_entries_bytes histogram
log_entries_bytes_bucket{le="16.0"} 10.0
log_entries_bytes_bucket{le="32.0"} 10.0
log_entries_bytes_bucket{le="64.0"} 10.0
log_entries_bytes_bucket{le="128.0"} 10.0
log_entries_bytes_bucket{le="256.0"} 10.0
log_entries_bytes_bucket{le="512.0"} 10.0
log_entries_bytes_bucket{le="1024.0"} 10.0
log_entries_bytes_bucket{le="2048.0"} 10.0
log_entries_bytes_bucket{le="+Inf"} 10.0
log_entries_bytes_sum 35.0
log_entries_bytes_count 10.0
log_entries_bytes_bucket{foo="bar",le="16.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="32.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="64.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="128.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="256.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="512.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="1024.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="2048.0"} 5.0
log_entries_bytes_bucket{foo="bar",le="+Inf"} 5.0
log_entries_bytes_sum{foo="bar"} 15.0
log_entries_bytes_count{foo="bar"} 5.0
log_entries_bytes_bucket{bar="foo",le="16.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="32.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="64.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="128.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="256.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="512.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="1024.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="2048.0"} 5.0
log_entries_bytes_bucket{bar="foo",le="+Inf"} 5.0
log_entries_bytes_sum{bar="foo"} 10.0
log_entries_bytes_count{bar="foo"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="16.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="32.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="64.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="128.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="256.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="512.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="1024.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="2048.0"} 5.0
log_entries_bytes_bucket{bar="foo",foo="bar",le="+Inf"} 5.0
log_entries_bytes_sum{bar="foo",foo="bar"} 15.0
log_entries_bytes_count{bar="foo",foo="bar"} 5.0
`
func Test_LogSize(t *testing.T) {
t.Parallel()
reg := prometheus.NewRegistry()
handler := LogSize(reg)
handler := LogSize(reg, errorHandler)
workerCount := 5
var wg sync.WaitGroup
@ -71,6 +123,8 @@ func Test_LogSize(t *testing.T) {
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "fu")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "baz")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "more")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "error")
_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "error")
}()
}

@ -34,7 +34,7 @@ type Pipeline struct {
}
// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipeline, error) {
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, logRegistry prometheus.Registerer) (*Pipeline, error) {
st := []stages.Stage{}
for _, s := range stgs {
stage, ok := s.(map[interface{}]interface{})
@ -50,32 +50,11 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipel
if !ok {
return nil, errors.New("pipeline stage key must be a string")
}
switch name {
case "json":
json, err := stages.NewJSON(logger, config)
if err != nil {
return nil, errors.Wrap(err, "invalid json stage config")
}
st = append(st, json)
case "regex":
regex, err := stages.NewRegex(logger, config)
if err != nil {
return nil, errors.Wrap(err, "invalid regex stage config")
}
st = append(st, regex)
case "docker":
docker, err := stages.NewDocker(logger)
if err != nil {
return nil, errors.Wrap(err, "invalid docker stage config")
}
st = append(st, docker)
case "cri":
cri, err := stages.NewCRI(logger)
if err != nil {
return nil, errors.Wrap(err, "invalid cri stage config")
}
st = append(st, cri)
newStage, err := stages.New(logger, name, config, logRegistry)
if err != nil {
return nil, errors.Wrapf(err, "invalid %s stage config", name)
}
st = append(st, newStage)
}
}
return &Pipeline{

@ -42,7 +42,7 @@ func loadConfig(yml string) PipelineStages {
func TestNewPipeline(t *testing.T) {
p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test")
p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", nil)
if err != nil {
panic(err)
}
@ -62,7 +62,7 @@ func TestPipeline_MultiStage(t *testing.T) {
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test")
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", nil)
if err != nil {
panic(err)
}
@ -138,7 +138,7 @@ func Benchmark(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
pl, err := NewPipeline(bm.logger, bm.stgs, "test")
pl, err := NewPipeline(bm.logger, bm.stgs, "test", nil)
if err != nil {
panic(err)
}

@ -0,0 +1,58 @@
package stages
import (
"github.com/mitchellh/mapstructure"
)
const (
MetricTypeCounter = "counter"
MetricTypeGauge = "gauge"
MetricTypeHistogram = "histogram"
StageTypeJSON = "json"
StageTypeRegex = "regex"
)
type MetricConfig struct {
MetricType string `mapstructure:"type"`
Description string `mapstructure:"description"`
Source *string `mapstructure:"source"`
Buckets []float64 `mapstructure:"buckets"`
}
type MetricsConfig map[string]MetricConfig
// TimestampConfig configures timestamp extraction
type TimestampConfig struct {
Source *string `mapstructure:"source"`
Format string `mapstructure:"format"`
}
// LabelConfig configures a label value extraction
type LabelConfig struct {
Source *string `mapstructure:"source"`
}
// OutputConfig configures output value extraction
type OutputConfig struct {
Source *string `mapstructure:"source"`
}
// StageConfig is the shared configuration for the json and regex stages
type StageConfig struct {
Timestamp *TimestampConfig `mapstructure:"timestamp"`
Output *OutputConfig `mapstructure:"output"`
Labels map[string]*LabelConfig `mapstructure:"labels"`
Metrics MetricsConfig `mapstructure:"metrics"`
Match string `mapstructure:"match"`
Expression string `mapstructure:"expression"`
}
func NewConfig(config interface{}) (*StageConfig, error) {
cfg := &StageConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
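
StageConfig and NewConfig replace the per-stage JSONConfig/RegexConfig types, so both parsers now decode the same map shape, including the new metrics and match keys. A sketch of decoding such a map through NewConfig, assuming the package import path github.com/grafana/loki/pkg/logentry/stages; the literal field values are made up for the example:

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logentry/stages"
)

func main() {
	// Shape mirrors StageConfig above; values are illustrative only.
	raw := map[string]interface{}{
		"expression": "^(?P<level>\\w+): (?P<msg>.*)$",
		"labels": map[string]interface{}{
			"level": map[string]interface{}{},
		},
		"output": map[string]interface{}{"source": "msg"},
		"match":  `{job="myapp"}`,
		"metrics": map[string]interface{}{
			"lines_total": map[string]interface{}{
				"type":        "counter",
				"description": "lines seen by this stage",
			},
		},
	}
	cfg, err := stages.NewConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Expression, cfg.Match, len(cfg.Metrics))
}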

@ -6,7 +6,7 @@ import (
// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger) (Stage, error) {
config := map[string]interface{}{
cfg := map[string]interface{}{
"timestamp": map[string]interface{}{
"source": "time",
"format": "RFC3339",
@ -20,12 +20,12 @@ func NewDocker(logger log.Logger) (Stage, error) {
"source": "log",
},
}
return NewJSON(logger, config, nil)
return New(logger, StageTypeJSON, cfg, nil)
}
// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger) (Stage, error) {
config := map[string]interface{}{
cfg := map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
"timestamp": map[string]interface{}{
"source": "time",
@ -40,5 +40,5 @@ func NewCRI(logger log.Logger) (Stage, error) {
"source": "content",
},
}
return NewRegex(logger, config)
return New(logger, StageTypeRegex, cfg, nil)
}

@ -8,47 +8,12 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/jmespath/go-jmespath"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
// JSONTimestamp configures timestamp extraction
type JSONTimestamp struct {
Source *string `mapstructure:"source"`
Format string `mapstructure:"format"`
}
// JSONLabel configures a labels value extraction
type JSONLabel struct {
Source *string `mapstructure:"source"`
}
// JSONOutput configures output value extraction
type JSONOutput struct {
Source *string `mapstructure:"source"`
}
// JSONConfig configures the log entry parser to extract value from json
type JSONConfig struct {
Timestamp *JSONTimestamp `mapstructure:"timestamp"`
Output *JSONOutput `mapstructure:"output"`
Labels map[string]*JSONLabel `mapstructure:"labels"`
Metrics MetricsConfig `mapstructure:"metrics"`
}
func newJSONConfig(config interface{}) (*JSONConfig, error) {
cfg := &JSONConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// validate the config and returns a map of necessary jmespath expressions.
func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) {
// validateJsonConfig validates a json config and returns a map of necessary jmespath expressions.
func validateJsonConfig(c *StageConfig) (map[string]*jmespath.JMESPath, error) {
if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil && len(c.Metrics) == 0 {
return nil, errors.New("empty json parser configuration")
}
@ -105,44 +70,40 @@ func (c *JSONConfig) validate() (map[string]*jmespath.JMESPath, error) {
return expressions, nil
}
type jsonStage struct {
cfg *JSONConfig
type jsonMutator struct {
cfg *StageConfig
expressions map[string]*jmespath.JMESPath
logger log.Logger
}
// NewJSON creates a new json stage from a config.
func NewJSON(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg, err := newJSONConfig(config)
// newJSONMutator creates a new json stage from a config.
func newJSONMutator(logger log.Logger, cfg *StageConfig) (*jsonMutator, error) {
expressions, err := validateJsonConfig(cfg)
if err != nil {
return nil, err
}
expressions, err := cfg.validate()
if err != nil {
return nil, err
}
return withMetric(&jsonStage{
return &jsonMutator{
cfg: cfg,
expressions: expressions,
logger: log.With(logger, "component", "parser", "type", "json"),
}, cfg.Metrics, registerer), nil
logger: log.With(logger, "component", "mutator", "type", "json"),
}, nil
}
// Process implement a pipeline stage
func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) Valuer {
// Process implements Mutator
func (j *jsonMutator) Process(labels model.LabelSet, t *time.Time, entry *string) Extractor {
if entry == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
return nil
}
valuer, err := newJSONValuer(*entry, j.expressions)
extractor, err := newJSONExtractor(*entry, j.expressions)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to create json valuer", "err", err)
level.Debug(j.logger).Log("msg", "failed to create json extractor", "err", err)
return nil
}
// parsing ts
if j.cfg.Timestamp != nil {
ts, err := parseTimestamp(valuer, j.cfg.Timestamp)
ts, err := parseTimestamp(extractor, j.cfg.Timestamp)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to parse timestamp", "err", err)
} else {
@ -156,7 +117,7 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string)
if lSrc != nil {
src = lSrc.Source
}
lValue, err := valuer.getJSONString(src, lName)
lValue, err := extractor.getJSONString(src, lName)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to get json string", "err", err)
continue
@ -172,47 +133,48 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string)
// parsing output
if j.cfg.Output != nil {
output, err := parseOutput(valuer, j.cfg.Output)
output, err := parseOutput(extractor, j.cfg.Output)
if err != nil {
level.Debug(j.logger).Log("msg", "failed to parse output", "err", err)
} else {
*entry = output
}
}
return valuer
return extractor
}
type jsonValuer struct {
type jsonExtractor struct {
exprmap map[string]*jmespath.JMESPath
data map[string]interface{}
}
func newJSONValuer(entry string, exprmap map[string]*jmespath.JMESPath) (*jsonValuer, error) {
func newJSONExtractor(entry string, exprmap map[string]*jmespath.JMESPath) (*jsonExtractor, error) {
var data map[string]interface{}
if err := json.Unmarshal([]byte(entry), &data); err != nil {
return nil, err
}
return &jsonValuer{
return &jsonExtractor{
data: data,
exprmap: exprmap,
}, nil
}
func (v *jsonValuer) Value(expr *string) (interface{}, error) {
return v.exprmap[*expr].Search(v.data)
// Value implements Extractor
func (e *jsonExtractor) Value(expr *string) (interface{}, error) {
return e.exprmap[*expr].Search(e.data)
}
func (v *jsonValuer) getJSONString(expr *string, fallback string) (result string, err error) {
func (e *jsonExtractor) getJSONString(expr *string, fallback string) (result string, err error) {
var ok bool
if expr == nil {
result, ok = v.data[fallback].(string)
result, ok = e.data[fallback].(string)
if !ok {
return result, fmt.Errorf("%s is not a string but %T", fallback, v.data[fallback])
return result, fmt.Errorf("%s is not a string but %T", fallback, e.data[fallback])
}
return
}
var searchResult interface{}
if searchResult, err = v.Value(expr); err != nil {
if searchResult, err = e.Value(expr); err != nil {
return
}
if result, ok = searchResult.(string); !ok {
@ -222,8 +184,8 @@ func (v *jsonValuer) getJSONString(expr *string, fallback string) (result string
return
}
func parseOutput(v Valuer, cfg *JSONOutput) (string, error) {
jsonObj, err := v.Value(cfg.Source)
func parseOutput(e Extractor, cfg *OutputConfig) (string, error) {
jsonObj, err := e.Value(cfg.Source)
if err != nil {
return "", errors.Wrap(err, "failed to fetch json value")
}
@ -240,8 +202,8 @@ func parseOutput(v Valuer, cfg *JSONOutput) (string, error) {
return string(b), nil
}
func parseTimestamp(v *jsonValuer, cfg *JSONTimestamp) (time.Time, error) {
ts, err := v.getJSONString(cfg.Source, "timestamp")
func parseTimestamp(e *jsonExtractor, cfg *TimestampConfig) (time.Time, error) {
ts, err := e.getJSONString(cfg.Source, "timestamp")
if err != nil {
return time.Time{}, err
}
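
The extractor returned by the json mutator in pkg/logentry/stages/json.go is a thin wrapper over pre-compiled jmespath expressions, and withMetric consumes it through Value. A rough sketch of the same lookup done directly with github.com/jmespath/go-jmespath; the expression and the payload are examples, not from this diff:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/jmespath/go-jmespath"
)

func main() {
	var data map[string]interface{}
	if err := json.Unmarshal([]byte(`{"app":{"name":"loki","latency_ms":42}}`), &data); err != nil {
		panic(err)
	}
	// jsonExtractor keeps a map of compiled expressions like this one, keyed by source.
	expr, err := jmespath.Compile("app.latency_ms")
	if err != nil {
		panic(err)
	}
	v, err := expr.Search(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(v) // 42, ready to be fed into a counter, gauge or histogram
}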

@ -33,18 +33,18 @@ func TestYamlMapStructure(t *testing.T) {
if !ok {
t.Fatalf("could not read parser %+v", mapstruct["json"])
}
got, err := newJSONConfig(p)
got, err := NewConfig(p)
if err != nil {
t.Fatalf("could not create parser from yaml: %s", err)
}
want := &JSONConfig{
Labels: map[string]*JSONLabel{
want := &StageConfig{
Labels: map[string]*LabelConfig{
"stream": {
Source: String("json_key_name.json_sub_key_name"),
},
},
Output: &JSONOutput{Source: String("log")},
Timestamp: &JSONTimestamp{
Output: &OutputConfig{Source: String("log")},
Timestamp: &TimestampConfig{
Format: "RFC3339",
Source: String("time"),
},
@ -167,11 +167,11 @@ func TestJSONConfig_validate(t *testing.T) {
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
c, err := newJSONConfig(tt.config)
c, err := NewConfig(tt.config)
if err != nil {
t.Fatalf("failed to create config: %s", err)
}
got, err := c.validate()
got, err := validateJsonConfig(c)
if (err != nil) != tt.wantErr {
t.Errorf("JSONConfig.validate() error = %v, wantErr %v", err, tt.wantErr)
return
@ -317,7 +317,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewJSON(util.Logger, tt.config, nil)
p, err := New(util.Logger, StageTypeJSON, tt.config, nil)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
}

@ -0,0 +1,28 @@
package stages
import (
"time"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
)
func withMatcher(s Stage, matcher string) (Stage, error) {
if matcher == "" {
return s, nil
}
matchers, err := logql.ParseMatchers(matcher)
if err != nil {
return nil, err
}
return StageFunc(func(labels model.LabelSet, t *time.Time, entry *string) {
for _, filter := range matchers {
if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
return
}
}
s.Process(labels, t, entry)
}), nil
}
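
withMatcher gates a stage behind a LogQL label selector: every matcher must pass, otherwise the wrapped stage is skipped for that entry. An in-package sketch (withMatcher is unexported), reusing the StageFunc adapter from stage.go; the selector and the stage body are illustrative:

package stages

import (
	"time"

	"github.com/prometheus/common/model"
)

// exampleMatcherGate wraps a trivial stage so it only runs for app="nginx".
func exampleMatcherGate() (Stage, error) {
	inner := StageFunc(func(labels model.LabelSet, t *time.Time, entry *string) {
		// Only reached when the entry's labels satisfy {app="nginx"}.
	})
	return withMatcher(inner, `{app="nginx"}`)
}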

@ -0,0 +1,59 @@
package stages
import (
"fmt"
"testing"
"time"
"github.com/prometheus/common/model"
)
type hasRunStage bool
func (h *hasRunStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
*h = true
}
func Test_withMatcher(t *testing.T) {
t.Parallel()
tests := []struct {
matcher string
labels map[string]string
shouldRun bool
wantErr bool
}{
{"{foo=\"bar\"} |= \"foo\"", map[string]string{"foo": "bar"}, false, true},
{"{foo=\"bar\"} |~ \"foo\"", map[string]string{"foo": "bar"}, false, true},
{"foo", map[string]string{"foo": "bar"}, false, true},
{"{}", map[string]string{"foo": "bar"}, false, true},
{"{", map[string]string{"foo": "bar"}, false, true},
{"", map[string]string{"foo": "bar"}, true, false},
{"{foo=\"bar\"}", map[string]string{"foo": "bar"}, true, false},
{"{foo=\"\"}", map[string]string{"foo": "bar"}, false, false},
{"{foo=\"\"}", map[string]string{}, true, false},
{"{foo!=\"bar\"}", map[string]string{"foo": "bar"}, false, false},
{"{foo=\"bar\",bar!=\"test\"}", map[string]string{"foo": "bar"}, true, false},
{"{foo=\"bar\",bar!=\"test\"}", map[string]string{"foo": "bar", "bar": "test"}, false, false},
{"{foo=\"bar\",bar=~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, true, false},
{"{foo=\"bar\",bar!~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, false, false},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s/%s", tt.matcher, tt.labels), func(t *testing.T) {
hasRun := hasRunStage(false)
s, err := withMatcher(&hasRun, tt.matcher)
if (err != nil) != tt.wantErr {
t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
return
}
if s != nil {
ts, entry := time.Now(), ""
s.Process(toLabelSet(tt.labels), &ts, &entry)
if bool(hasRun) != tt.shouldRun {
t.Error("stage ran but should have not")
}
}
})
}
}

@ -9,34 +9,10 @@ import (
"github.com/grafana/loki/pkg/logentry/metric"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
MetricTypeCounter = "counter"
MetricTypeGauge = "gauge"
MetricTypeHistogram = "histogram"
)
type MetricConfig struct {
MetricType string `mapstructure:"type"`
Description string `mapstructure:"description"`
Source *string `mapstructure:"source"`
Buckets []float64 `mapstructure:"buckets"`
}
type MetricsConfig map[string]MetricConfig
type Valuer interface {
Value(source *string) (interface{}, error)
}
type StageValuer interface {
Process(labels model.LabelSet, time *time.Time, entry *string) Valuer
}
func withMetric(s StageValuer, cfg MetricsConfig, registry prometheus.Registerer) Stage {
func withMetric(s Mutator, cfg MetricsConfig, registry prometheus.Registerer) Stage {
if registry == nil {
return StageFunc(func(labels model.LabelSet, time *time.Time, entry *string) {
_ = s.Process(labels, time, entry)
@ -53,16 +29,16 @@ func withMetric(s StageValuer, cfg MetricsConfig, registry prometheus.Registerer
func newMetric(cfgs MetricsConfig, registry prometheus.Registerer) *metricStage {
metrics := map[string]prometheus.Collector{}
for name, config := range cfgs {
for name, cfg := range cfgs {
var collector prometheus.Collector
switch strings.ToLower(config.MetricType) {
switch strings.ToLower(cfg.MetricType) {
case MetricTypeCounter:
collector = metric.NewCounters(name, config.Description)
collector = metric.NewCounters(name, cfg.Description)
case MetricTypeGauge:
collector = metric.NewGauges(name, config.Description)
collector = metric.NewGauges(name, cfg.Description)
case MetricTypeHistogram:
collector = metric.NewHistograms(name, config.Description, config.Buckets)
collector = metric.NewHistograms(name, cfg.Description, cfg.Buckets)
}
if collector != nil {
registry.MustRegister(collector)
@ -80,7 +56,7 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}
func (m *metricStage) process(v Valuer, labels model.LabelSet) {
func (m *metricStage) process(v Extractor, labels model.LabelSet) {
for name, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
@ -93,8 +69,8 @@ func (m *metricStage) process(v Valuer, labels model.LabelSet) {
}
}
func recordCounter(counter prometheus.Counter, v Valuer, cfg MetricConfig) {
unk, err := v.Value(cfg.Source)
func recordCounter(counter prometheus.Counter, e Extractor, cfg MetricConfig) {
unk, err := e.Value(cfg.Source)
if err != nil {
return
}
@ -105,8 +81,8 @@ func recordCounter(counter prometheus.Counter, v Valuer, cfg MetricConfig) {
counter.Add(f)
}
func recordGauge(gauge prometheus.Gauge, v Valuer, cfg MetricConfig) {
unk, err := v.Value(cfg.Source)
func recordGauge(gauge prometheus.Gauge, e Extractor, cfg MetricConfig) {
unk, err := e.Value(cfg.Source)
if err != nil {
return
}
@ -114,11 +90,12 @@ func recordGauge(gauge prometheus.Gauge, v Valuer, cfg MetricConfig) {
if err != nil {
return
}
// TODO: gauges should support add, inc, dec and set operations
gauge.Add(f)
}
func recordHistogram(histogram prometheus.Histogram, v Valuer, cfg MetricConfig) {
unk, err := v.Value(cfg.Source)
func recordHistogram(histogram prometheus.Histogram, e Extractor, cfg MetricConfig) {
unk, err := e.Value(cfg.Source)
if err != nil {
return
}

@ -61,7 +61,7 @@ func Test_withMetric(t *testing.T) {
},
}
registry := prometheus.NewRegistry()
metricStage, err := NewJSON(util.Logger, cfg, registry)
metricStage, err := New(util.Logger, StageTypeJSON, cfg, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}

@ -0,0 +1,29 @@
package stages
import (
"fmt"
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
)
type Extractor interface {
Value(source *string) (interface{}, error)
}
// Mutator mutates labels, timestamp and entry, and returns an Extractor for later parts of the stage.
type Mutator interface {
Process(labels model.LabelSet, time *time.Time, entry *string) Extractor
}
func newMutator(logger log.Logger, stageType string, cfg *StageConfig) (Mutator, error) {
switch stageType {
case StageTypeJSON:
return newJSONMutator(logger, cfg)
case StageTypeRegex:
return newRegexMutator(logger, cfg)
default:
return nil, fmt.Errorf("unknown mutator for stage type: %v", stageType)
}
}
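
The Mutator/Extractor split is the core of this refactor: Process may rewrite labels, timestamp and entry, and hands back an Extractor so the metric wrapper can read typed values from the same parse. A small in-package sketch of that contract; "status" is a hypothetical source field:

package stages

import (
	"time"

	"github.com/prometheus/common/model"
)

// exampleMutatorFlow shows how a wrapper such as withMetric consumes the
// Extractor returned by a Mutator.
func exampleMutatorFlow(m Mutator, labels model.LabelSet, ts *time.Time, entry *string) {
	ex := m.Process(labels, ts, entry)
	if ex == nil {
		return // nil entry or failed parse: nothing to extract
	}
	src := "status" // hypothetical source field
	if v, err := ex.Value(&src); err == nil {
		_ = v // fed into a counter, gauge or histogram by the metric stage
	}
}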

@ -7,44 +7,10 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// RegexTimestamp configures timestamp extraction
type RegexTimestamp struct {
Source *string `mapstructure:"source"`
Format string `mapstructure:"format"`
}
// RegexLabel configures a labels value extraction
type RegexLabel struct {
Source *string `mapstructure:"source"`
}
// RegexOutput configures output value extraction
type RegexOutput struct {
Source *string `mapstructure:"source"`
}
// RegexConfig configures the log entry parser to extract value from regex
type RegexConfig struct {
Timestamp *RegexTimestamp `mapstructure:"timestamp"`
Expression string `mapstructure:"expression"`
Output *RegexOutput `mapstructure:"output"`
Labels map[string]*RegexLabel `mapstructure:"labels"`
}
func newRegexConfig(config interface{}) (*RegexConfig, error) {
cfg := &RegexConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Config Errors
const (
ErrExpressionRequired = "expression is required"
@ -58,7 +24,7 @@ const (
)
// validateRegexConfig validates the config and returns the compiled expression.
func (c *RegexConfig) validate() (*regexp.Regexp, error) {
func validateRegexConfig(c *StageConfig) (*regexp.Regexp, error) {
if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil {
return nil, errors.New(ErrEmptyRegexStageConfig)
@ -102,7 +68,7 @@ func (c *RegexConfig) validate() (*regexp.Regexp, error) {
}
if labelSrc == nil || *labelSrc.Source == "" {
lName := labelName
c.Labels[labelName] = &RegexLabel{
c.Labels[labelName] = &LabelConfig{
&lName,
}
}
@ -111,51 +77,41 @@ func (c *RegexConfig) validate() (*regexp.Regexp, error) {
return expr, nil
}
type regexStage struct {
cfg *RegexConfig
type regexMutator struct {
cfg *StageConfig
expression *regexp.Regexp
logger log.Logger
}
// NewRegex creates a new regular expression based pipeline processing stage.
func NewRegex(logger log.Logger, config interface{}) (Stage, error) {
cfg, err := newRegexConfig(config)
func newRegexMutator(logger log.Logger, cfg *StageConfig) (Mutator, error) {
expression, err := validateRegexConfig(cfg)
if err != nil {
return nil, err
}
expression, err := cfg.validate()
if err != nil {
return nil, err
}
return &regexStage{
return &regexMutator{
cfg: cfg,
expression: expression,
logger: log.With(logger, "component", "parser", "type", "regex"),
logger: log.With(logger, "component", "mutator", "type", "regex"),
}, nil
}
// Process implements a pipeline stage
func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
// Process implements Mutator
func (r *regexMutator) Process(labels model.LabelSet, t *time.Time, entry *string) Extractor {
if entry == nil {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
return
return nil
}
match := r.expression.FindStringSubmatch(*entry)
if match == nil {
level.Debug(r.logger).Log("msg", "regex failed to match")
return
}
groups := make(map[string]string)
for i, name := range r.expression.SubexpNames() {
if i != 0 && name != "" {
groups[name] = match[i]
}
extractor, err := newRegexExtractor(*entry, r.expression)
if err != nil {
level.Debug(r.logger).Log("msg", "failed to create regex extractor", "err", err)
return nil
}
// Parsing timestamp.
if r.cfg.Timestamp != nil {
if ts, ok := groups[*r.cfg.Timestamp.Source]; ok {
if ts, ok := extractor.groups[*r.cfg.Timestamp.Source]; ok {
parsedTs, err := time.Parse(r.cfg.Timestamp.Format, ts)
if err != nil {
level.Debug(r.logger).Log("msg", "failed to parse time", "err", err, "format", r.cfg.Timestamp.Format, "value", ts)
@ -169,7 +125,7 @@ func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string)
// Parsing labels.
for lName, lSrc := range r.cfg.Labels {
lValue, ok := groups[*lSrc.Source]
lValue, ok := extractor.groups[*lSrc.Source]
if !ok {
continue
}
@ -183,11 +139,41 @@ func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string)
// Parsing output.
if r.cfg.Output != nil {
if o, ok := groups[*r.cfg.Output.Source]; ok {
if o, ok := extractor.groups[*r.cfg.Output.Source]; ok {
*entry = o
} else {
level.Debug(r.logger).Log("msg", "regex didn't match output source")
}
}
return extractor
}
type regexExtractor struct {
groups map[string]string
}
func newRegexExtractor(entry string, expression *regexp.Regexp) (*regexExtractor, error) {
match := expression.FindStringSubmatch(entry)
if match == nil {
return nil, errors.New("regex failed to match")
}
groups := make(map[string]string)
for i, name := range expression.SubexpNames() {
if i != 0 && name != "" {
groups[name] = match[i]
}
}
return &regexExtractor{groups: groups}, nil
}
// Value implements Extractor
func (e *regexExtractor) Value(expr *string) (interface{}, error) {
if expr == nil {
return len(e.groups), nil
}
if value, ok := e.groups[*expr]; ok {
return value, nil
}
return nil, fmt.Errorf("expression not matched: %s", *expr)
}
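
The regex stage now splits matching out into a regexExtractor whose named capture groups back the Extractor interface (a nil source returns the group count). An in-package sketch with an illustrative expression and line:

package stages

import "regexp"

// exampleRegexExtract pulls one named group out of a sample line.
func exampleRegexExtract() (interface{}, error) {
	re := regexp.MustCompile(`^(?P<method>\S+) (?P<status>\d+)$`)
	ex, err := newRegexExtractor("GET 200", re)
	if err != nil {
		return nil, err // the expression did not match the line
	}
	src := "status"
	return ex.Value(&src) // "200" as a string
}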

@ -36,18 +36,18 @@ func TestRegexMapStructure(t *testing.T) {
if !ok {
t.Fatalf("could not read parser %+v", mapstruct["regex"])
}
got, err := newRegexConfig(p)
got, err := NewConfig(p)
if err != nil {
t.Fatalf("could not create parser from yaml: %s", err)
}
want := &RegexConfig{
Labels: map[string]*RegexLabel{
want := &StageConfig{
Labels: map[string]*LabelConfig{
"stream": {
Source: String("stream"),
},
},
Output: &RegexOutput{Source: String("log")},
Timestamp: &RegexTimestamp{
Output: &OutputConfig{Source: String("log")},
Timestamp: &TimestampConfig{
Format: "RFC3339",
Source: String("time"),
},
@ -181,11 +181,11 @@ func TestRegexConfig_validate(t *testing.T) {
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
c, err := newRegexConfig(tt.config)
c, err := NewConfig(tt.config)
if err != nil {
t.Fatalf("failed to create config: %s", err)
}
_, err = c.validate()
_, err = validateRegexConfig(c)
if (err != nil) != (tt.err != nil) {
t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err)
return
@ -358,7 +358,7 @@ func TestRegexParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewRegex(util.Logger, tt.config)
p, err := New(util.Logger, StageTypeRegex, tt.config, nil)
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
}
@ -402,7 +402,7 @@ func Benchmark(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
stage, err := NewRegex(util.Logger, bm.config)
stage, err := New(util.Logger, StageTypeRegex, bm.config, nil)
if err != nil {
panic(err)
}

@ -3,6 +3,8 @@ package stages
import (
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
@ -19,3 +21,21 @@ type StageFunc func(labels model.LabelSet, time *time.Time, entry *string)
func (s StageFunc) Process(labels model.LabelSet, time *time.Time, entry *string) {
s(labels, time, entry)
}
func New(logger log.Logger, stageType string, cfg interface{}, registerer prometheus.Registerer) (Stage, error) {
switch stageType {
case "docker":
return NewDocker(logger)
case "cri":
return NewCRI(logger)
}
stageConf, err := NewConfig(cfg)
if err != nil {
return nil, err
}
m, err := newMutator(logger, stageType, stageConf)
if err != nil {
return nil, err
}
return withMatcher(withMetric(m, stageConf.Metrics, registerer), stageConf.Match)
}
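
stages.New is now the single entry point the pipeline uses: it builds the json or regex mutator (or the docker/cri presets), wraps it with any configured metrics, and finally gates it with the optional match selector. A sketch of driving it directly; the expression, label names and selector are illustrative, not taken from this commit:

package main

import (
	"time"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/grafana/loki/pkg/logentry/stages"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
)

func main() {
	// Illustrative regex stage: extract a label, rewrite the output and only
	// run for entries matching {job="nginx"}.
	cfg := map[string]interface{}{
		"expression": "^(?P<status>\\d{3}) (?P<msg>.*)$",
		"labels": map[string]interface{}{
			"status": map[string]interface{}{},
		},
		"output": map[string]interface{}{"source": "msg"},
		"match":  `{job="nginx"}`,
	}
	stage, err := stages.New(util.Logger, stages.StageTypeRegex, cfg, prometheus.NewRegistry())
	if err != nil {
		panic(err)
	}
	ts, line := time.Now(), "200 GET /index.html"
	labels := model.LabelSet{"job": "nginx"}
	stage.Process(labels, &ts, &line)
	// labels now also carries status="200" and line has been rewritten to the msg group.
}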

@ -1,10 +1,13 @@
package logql
import (
"errors"
"fmt"
"strconv"
"strings"
"text/scanner"
"github.com/prometheus/prometheus/pkg/labels"
)
func init() {
@ -32,6 +35,20 @@ func ParseExpr(input string) (Expr, error) {
return l.expr, nil
}
// ParseMatchers parses a string and returns labels matchers, if the expression contains
// anything else it will return an error.
func ParseMatchers(input string) ([]*labels.Matcher, error) {
expr, err := ParseExpr(input)
if err != nil {
return nil, err
}
matcherExpr, ok := expr.(*matchersExpr)
if !ok {
return nil, errors.New("only label matchers is supported")
}
return matcherExpr.matchers, nil
}
var tokens = map[string]int{
",": COMMA,
".": DOT,

@ -2,6 +2,7 @@ package promtail
import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/config"
@ -35,12 +36,15 @@ func New(cfg config.Config) (*Promtail, error) {
return nil, err
}
tms, err := targets.NewTargetManagers(util.Logger, positions, client, cfg.ScrapeConfig, &cfg.TargetConfig)
// metrics from logs will be collected inside this registry
logRegistry := prometheus.NewRegistry()
tms, err := targets.NewTargetManagers(util.Logger, positions, client, cfg.ScrapeConfig, &cfg.TargetConfig, logRegistry)
if err != nil {
return nil, err
}
server, err := server.New(cfg.ServerConfig, tms)
server, err := server.New(cfg.ServerConfig, tms, logRegistry)
if err != nil {
return nil, err
}

@ -13,6 +13,8 @@ import (
logutil "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
serverww "github.com/weaveworks/common/server"
@ -39,7 +41,7 @@ type Config struct {
}
// New makes a new Server
func New(cfg Config, tms *targets.TargetManagers) (*Server, error) {
func New(cfg Config, tms *targets.TargetManagers, logRegistry prometheus.Gatherer) (*Server, error) {
wws, err := serverww.New(cfg.Config)
if err != nil {
return nil, err
@ -59,6 +61,8 @@ func New(cfg Config, tms *targets.TargetManagers) (*Server, error) {
serv.HTTP.PathPrefix("/static/").Handler(http.FileServer(ui.Assets))
serv.HTTP.Path("/service-discovery").Handler(http.HandlerFunc(serv.serviceDiscovery))
serv.HTTP.Path("/targets").Handler(http.HandlerFunc(serv.targets))
serv.HTTP.Path("/logmetrics").Handler(promhttp.HandlerFor(logRegistry, promhttp.HandlerOpts{}))
return serv, nil
}

@ -7,6 +7,8 @@ import (
"strings"
"sync"
"github.com/grafana/loki/pkg/logentry/metric"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -59,6 +61,7 @@ func NewFileTargetManager(
client api.EntryHandler,
scrapeConfigs []scrape.Config,
targetConfig *Config,
logRegistry prometheus.Registerer,
) (*FileTargetManager, error) {
ctx, quit := context.WithCancel(context.Background())
tm := &FileTargetManager{
@ -75,7 +78,7 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName)
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName, logRegistry)
if err != nil {
return nil, err
}
@ -111,7 +114,7 @@ func NewFileTargetManager(
targets: map[string]*FileTarget{},
droppedTargets: []Target{},
hostname: hostname,
entryHandler: pipeline.Wrap(client),
entryHandler: pipeline.Wrap(metric.LogSize(logRegistry, metric.LogCount(logRegistry, client))),
targetConfig: targetConfig,
}
tm.syncers[cfg.JobName] = s

@ -3,6 +3,7 @@ package targets
import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
@ -28,6 +29,7 @@ func NewTargetManagers(
client api.EntryHandler,
scrapeConfigs []scrape.Config,
targetConfig *Config,
logRegistry prometheus.Registerer,
) (*TargetManagers, error) {
var targetManagers []targetManager
var fileScrapeConfigs []scrape.Config
@ -40,6 +42,7 @@ func NewTargetManagers(
client,
fileScrapeConfigs,
targetConfig,
logRegistry,
)
if err != nil {
return nil, errors.Wrap(err, "failed to make file target manager")

@ -0,0 +1,198 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"bufio"
"io"
"net"
"net/http"
)
const (
closeNotifier = 1 << iota
flusher
hijacker
readerFrom
pusher
)
type delegator interface {
http.ResponseWriter
Status() int
Written() int64
}
type responseWriterDelegator struct {
http.ResponseWriter
status int
written int64
wroteHeader bool
observeWriteHeader func(int)
}
func (r *responseWriterDelegator) Status() int {
return r.status
}
func (r *responseWriterDelegator) Written() int64 {
return r.written
}
func (r *responseWriterDelegator) WriteHeader(code int) {
r.status = code
r.wroteHeader = true
r.ResponseWriter.WriteHeader(code)
if r.observeWriteHeader != nil {
r.observeWriteHeader(code)
}
}
func (r *responseWriterDelegator) Write(b []byte) (int, error) {
if !r.wroteHeader {
r.WriteHeader(http.StatusOK)
}
n, err := r.ResponseWriter.Write(b)
r.written += int64(n)
return n, err
}
type closeNotifierDelegator struct{ *responseWriterDelegator }
type flusherDelegator struct{ *responseWriterDelegator }
type hijackerDelegator struct{ *responseWriterDelegator }
type readerFromDelegator struct{ *responseWriterDelegator }
func (d closeNotifierDelegator) CloseNotify() <-chan bool {
return d.ResponseWriter.(http.CloseNotifier).CloseNotify()
}
func (d flusherDelegator) Flush() {
d.ResponseWriter.(http.Flusher).Flush()
}
func (d hijackerDelegator) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return d.ResponseWriter.(http.Hijacker).Hijack()
}
func (d readerFromDelegator) ReadFrom(re io.Reader) (int64, error) {
if !d.wroteHeader {
d.WriteHeader(http.StatusOK)
}
n, err := d.ResponseWriter.(io.ReaderFrom).ReadFrom(re)
d.written += n
return n, err
}
var pickDelegator = make([]func(*responseWriterDelegator) delegator, 32)
func init() {
// TODO(beorn7): Code generation would help here.
pickDelegator[0] = func(d *responseWriterDelegator) delegator { // 0
return d
}
pickDelegator[closeNotifier] = func(d *responseWriterDelegator) delegator { // 1
return closeNotifierDelegator{d}
}
pickDelegator[flusher] = func(d *responseWriterDelegator) delegator { // 2
return flusherDelegator{d}
}
pickDelegator[flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 3
return struct {
*responseWriterDelegator
http.Flusher
http.CloseNotifier
}{d, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[hijacker] = func(d *responseWriterDelegator) delegator { // 4
return hijackerDelegator{d}
}
pickDelegator[hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 5
return struct {
*responseWriterDelegator
http.Hijacker
http.CloseNotifier
}{d, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 6
return struct {
*responseWriterDelegator
http.Hijacker
http.Flusher
}{d, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 7
return struct {
*responseWriterDelegator
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom] = func(d *responseWriterDelegator) delegator { // 8
return readerFromDelegator{d}
}
pickDelegator[readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 9
return struct {
*responseWriterDelegator
io.ReaderFrom
http.CloseNotifier
}{d, readerFromDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 10
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Flusher
}{d, readerFromDelegator{d}, flusherDelegator{d}}
}
pickDelegator[readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 11
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Flusher
http.CloseNotifier
}{d, readerFromDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 12
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
}{d, readerFromDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 13
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.CloseNotifier
}{d, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 14
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.Flusher
}{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 15
return struct {
*responseWriterDelegator
io.ReaderFrom
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
}

@ -0,0 +1,181 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.8
package promhttp
import (
"io"
"net/http"
)
type pusherDelegator struct{ *responseWriterDelegator }
func (d pusherDelegator) Push(target string, opts *http.PushOptions) error {
return d.ResponseWriter.(http.Pusher).Push(target, opts)
}
func init() {
pickDelegator[pusher] = func(d *responseWriterDelegator) delegator { // 16
return pusherDelegator{d}
}
pickDelegator[pusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 17
return struct {
*responseWriterDelegator
http.Pusher
http.CloseNotifier
}{d, pusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+flusher] = func(d *responseWriterDelegator) delegator { // 18
return struct {
*responseWriterDelegator
http.Pusher
http.Flusher
}{d, pusherDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 19
return struct {
*responseWriterDelegator
http.Pusher
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+hijacker] = func(d *responseWriterDelegator) delegator { // 20
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
}{d, pusherDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[pusher+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 21
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.CloseNotifier
}{d, pusherDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 22
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.Flusher
}{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { //23
return struct {
*responseWriterDelegator
http.Pusher
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom] = func(d *responseWriterDelegator) delegator { // 24
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
}{d, pusherDelegator{d}, readerFromDelegator{d}}
}
pickDelegator[pusher+readerFrom+closeNotifier] = func(d *responseWriterDelegator) delegator { // 25
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+flusher] = func(d *responseWriterDelegator) delegator { // 26
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Flusher
}{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+readerFrom+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 27
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker] = func(d *responseWriterDelegator) delegator { // 28
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+closeNotifier] = func(d *responseWriterDelegator) delegator { // 29
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, closeNotifierDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+flusher] = func(d *responseWriterDelegator) delegator { // 30
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.Flusher
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}}
}
pickDelegator[pusher+readerFrom+hijacker+flusher+closeNotifier] = func(d *responseWriterDelegator) delegator { // 31
return struct {
*responseWriterDelegator
http.Pusher
io.ReaderFrom
http.Hijacker
http.Flusher
http.CloseNotifier
}{d, pusherDelegator{d}, readerFromDelegator{d}, hijackerDelegator{d}, flusherDelegator{d}, closeNotifierDelegator{d}}
}
}
func newDelegator(w http.ResponseWriter, observeWriteHeaderFunc func(int)) delegator {
d := &responseWriterDelegator{
ResponseWriter: w,
observeWriteHeader: observeWriteHeaderFunc,
}
id := 0
if _, ok := w.(http.CloseNotifier); ok {
id += closeNotifier
}
if _, ok := w.(http.Flusher); ok {
id += flusher
}
if _, ok := w.(http.Hijacker); ok {
id += hijacker
}
if _, ok := w.(io.ReaderFrom); ok {
id += readerFrom
}
if _, ok := w.(http.Pusher); ok {
id += pusher
}
return pickDelegator[id](d)
}

@ -0,0 +1,44 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !go1.8
package promhttp
import (
"io"
"net/http"
)
func newDelegator(w http.ResponseWriter, observeWriteHeaderFunc func(int)) delegator {
d := &responseWriterDelegator{
ResponseWriter: w,
observeWriteHeader: observeWriteHeaderFunc,
}
id := 0
if _, ok := w.(http.CloseNotifier); ok {
id += closeNotifier
}
if _, ok := w.(http.Flusher); ok {
id += flusher
}
if _, ok := w.(http.Hijacker); ok {
id += hijacker
}
if _, ok := w.(io.ReaderFrom); ok {
id += readerFrom
}
return pickDelegator[id](d)
}

@ -0,0 +1,310 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package promhttp provides tooling around HTTP servers and clients.
//
// First, the package allows the creation of http.Handler instances to expose
// Prometheus metrics via HTTP. promhttp.Handler acts on the
// prometheus.DefaultGatherer. With HandlerFor, you can create a handler for a
// custom registry or anything that implements the Gatherer interface. It also
// allows the creation of handlers that act differently on errors or allow to
// log errors.
//
// Second, the package provides tooling to instrument instances of http.Handler
// via middleware. Middleware wrappers follow the naming scheme
// InstrumentHandlerX, where X describes the intended use of the middleware.
// See each function's doc comment for specific details.
//
// Finally, the package allows for an http.RoundTripper to be instrumented via
// middleware. Middleware wrappers follow the naming scheme
// InstrumentRoundTripperX, where X describes the intended use of the
// middleware. See each function's doc comment for specific details.
package promhttp
import (
"compress/gzip"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/client_golang/prometheus"
)
const (
contentTypeHeader = "Content-Type"
contentEncodingHeader = "Content-Encoding"
acceptEncodingHeader = "Accept-Encoding"
)
var gzipPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
// Handler returns an http.Handler for the prometheus.DefaultGatherer, using
// default HandlerOpts, i.e. it reports the first error as an HTTP error, it has
// no error logging, and it applies compression if requested by the client.
//
// The returned http.Handler is already instrumented using the
// InstrumentMetricHandler function and the prometheus.DefaultRegisterer. If you
// create multiple http.Handlers by separate calls of the Handler function, the
// metrics used for instrumentation will be shared between them, providing
// global scrape counts.
//
// This function is meant to cover the bulk of basic use cases. If you are doing
// anything that requires more customization (including using a non-default
// Gatherer, different instrumentation, and non-default HandlerOpts), use the
// HandlerFor function. See there for details.
func Handler() http.Handler {
return InstrumentMetricHandler(
prometheus.DefaultRegisterer, HandlerFor(prometheus.DefaultGatherer, HandlerOpts{}),
)
}
// HandlerFor returns an uninstrumented http.Handler for the provided
// Gatherer. The behavior of the Handler is defined by the provided
// HandlerOpts. Thus, HandlerFor is useful to create http.Handlers for custom
// Gatherers, with non-default HandlerOpts, and/or with custom (or no)
// instrumentation. Use the InstrumentMetricHandler function to apply the same
// kind of instrumentation as it is used by the Handler function.
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
var inFlightSem chan struct{}
if opts.MaxRequestsInFlight > 0 {
inFlightSem = make(chan struct{}, opts.MaxRequestsInFlight)
}
h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if inFlightSem != nil {
select {
case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }()
default:
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent requests reached (%d), try again later.", opts.MaxRequestsInFlight,
), http.StatusServiceUnavailable)
return
}
}
mfs, err := reg.Gather()
if err != nil {
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error gathering metrics:", err)
}
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
if len(mfs) == 0 {
// Still report the error if no metrics have been gathered.
httpError(rsp, err)
return
}
case HTTPErrorOnError:
httpError(rsp, err)
return
}
}
contentType := expfmt.Negotiate(req.Header)
header := rsp.Header()
header.Set(contentTypeHeader, string(contentType))
w := io.Writer(rsp)
if !opts.DisableCompression && gzipAccepted(req.Header) {
header.Set(contentEncodingHeader, "gzip")
gz := gzipPool.Get().(*gzip.Writer)
defer gzipPool.Put(gz)
gz.Reset(w)
defer gz.Close()
w = gz
}
enc := expfmt.NewEncoder(w, contentType)
var lastErr error
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
lastErr = err
if opts.ErrorLog != nil {
opts.ErrorLog.Println("error encoding and sending metric family:", err)
}
switch opts.ErrorHandling {
case PanicOnError:
panic(err)
case ContinueOnError:
// Handled later.
case HTTPErrorOnError:
httpError(rsp, err)
return
}
}
}
if lastErr != nil {
httpError(rsp, lastErr)
}
})
if opts.Timeout <= 0 {
return h
}
return http.TimeoutHandler(h, opts.Timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n",
opts.Timeout,
))
}
// InstrumentMetricHandler is usually used with an http.Handler returned by the
// HandlerFor function. It instruments the provided http.Handler with two
// metrics: A counter vector "promhttp_metric_handler_requests_total" to count
// scrapes partitioned by HTTP status code, and a gauge
// "promhttp_metric_handler_requests_in_flight" to track the number of
// simultaneous scrapes. This function idempotently registers collectors for
// both metrics with the provided Registerer. It panics if the registration
// fails. The provided metrics are useful to see how many scrapes hit the
// monitored target (which could be from different Prometheus servers or other
// scrapers), and how often they overlap (which would result in more than one
// scrape in flight at the same time). Note that the scrapes-in-flight gauge
// will contain the scrape by which it is exposed, while the scrape counter will
// only get incremented after the scrape is complete (as only then the status
// code is known). For tracking scrape durations, use the
// "scrape_duration_seconds" gauge created by the Prometheus server upon each
// scrape.
func InstrumentMetricHandler(reg prometheus.Registerer, handler http.Handler) http.Handler {
cnt := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "promhttp_metric_handler_requests_total",
Help: "Total number of scrapes by HTTP status code.",
},
[]string{"code"},
)
// Initialize the most likely HTTP status codes.
cnt.WithLabelValues("200")
cnt.WithLabelValues("500")
cnt.WithLabelValues("503")
if err := reg.Register(cnt); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
cnt = are.ExistingCollector.(*prometheus.CounterVec)
} else {
panic(err)
}
}
gge := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "promhttp_metric_handler_requests_in_flight",
Help: "Current number of scrapes being served.",
})
if err := reg.Register(gge); err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
gge = are.ExistingCollector.(prometheus.Gauge)
} else {
panic(err)
}
}
return InstrumentHandlerCounter(cnt, InstrumentHandlerInFlight(gge, handler))
}
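// exampleInstrumentedHandler is a hypothetical sketch added for illustration
// (not part of the upstream library): it reproduces what Handler does, but
// against a caller-supplied private registry, so the scrape-counting metrics
// registered by InstrumentMetricHandler also land in that registry.
func exampleInstrumentedHandler(reg *prometheus.Registry) http.Handler {
	// *prometheus.Registry satisfies both Registerer and Gatherer, so the
	// same value can be used for instrumentation and for gathering.
	return InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
}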
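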
// HandlerErrorHandling defines how a Handler serving metrics will handle
// errors.
type HandlerErrorHandling int
// These constants cause handlers serving metrics to behave as described if
// errors are encountered.
const (
// Serve an HTTP status code 500 upon the first error
// encountered. Report the error message in the body.
HTTPErrorOnError HandlerErrorHandling = iota
// Ignore errors and try to serve as many metrics as possible. However,
// if no metrics can be served, serve an HTTP status code 500 and the
// last error message in the body. Only use this in deliberate "best
// effort" metrics collection scenarios. It is recommended to at least
// log errors (by providing an ErrorLog in HandlerOpts) to not mask
// errors completely.
ContinueOnError
// Panic upon the first error encountered (useful for "crash only" apps).
PanicOnError
)
// Logger is the minimal interface HandlerOpts needs for logging. Note that
// log.Logger from the standard library implements this interface, and it is
// easy to implement by custom loggers, if they don't do so already anyway.
type Logger interface {
Println(v ...interface{})
}
// HandlerOpts specifies options for how to serve metrics via an http.Handler. The
// zero value of HandlerOpts is a reasonable default.
type HandlerOpts struct {
// ErrorLog specifies an optional logger for errors collecting and
// serving metrics. If nil, errors are not logged at all.
ErrorLog Logger
// ErrorHandling defines how errors are handled. Note that errors are
// logged regardless of the configured ErrorHandling, provided ErrorLog
// is not nil.
ErrorHandling HandlerErrorHandling
// If DisableCompression is true, the handler will never compress the
// response, even if requested by the client.
DisableCompression bool
// The number of concurrent HTTP requests is limited to
// MaxRequestsInFlight. Additional requests are responded to with 503
// Service Unavailable and a suitable message in the body. If
// MaxRequestsInFlight is 0 or negative, no limit is applied.
MaxRequestsInFlight int
// If handling a request takes longer than Timeout, it is responded to
// with 503 Service Unavailable and a suitable message. No timeout is
// applied if Timeout is 0 or negative. Note that with the current
// implementation, reaching the timeout simply ends the HTTP request as
// described above (and even that only if sending of the body hasn't
// started yet), while the bulk of the work of gathering all the metrics
// keeps running in the background (with the eventual result thrown
// away). Until the implementation is improved, it is recommended to
// implement a separate timeout in potentially slow Collectors.
Timeout time.Duration
}
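// exampleCustomHandler is a hypothetical sketch added for illustration (not
// part of the upstream library) of serving a private registry with explicit
// HandlerOpts. The concurrency limit and timeout values are assumptions.
func exampleCustomHandler(reg *prometheus.Registry) http.Handler {
	return HandlerFor(reg, HandlerOpts{
		ErrorHandling:       ContinueOnError,  // serve whatever could be gathered
		MaxRequestsInFlight: 5,                // respond 503 beyond 5 concurrent scrapes
		Timeout:             10 * time.Second, // respond 503 if a scrape exceeds 10s
	})
}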
// gzipAccepted returns whether the client will accept gzip-encoded content.
func gzipAccepted(header http.Header) bool {
a := header.Get(acceptEncodingHeader)
parts := strings.Split(a, ",")
for _, part := range parts {
part = strings.TrimSpace(part)
if part == "gzip" || strings.HasPrefix(part, "gzip;") {
return true
}
}
return false
}
// httpError removes any content-encoding header and then calls http.Error with
// the provided error and http.StatusInternalServerError. Error contents are
// supposed to be uncompressed plain text. However, as with a plain
// http.Error, any header settings will be void if the header has already been
// sent. The error message will still be written to the writer, but it will
// probably be of limited use.
func httpError(rsp http.ResponseWriter, err error) {
rsp.Header().Del(contentEncodingHeader)
http.Error(
rsp,
"An error has occurred while serving metrics:\n\n"+err.Error(),
http.StatusInternalServerError,
)
}

@ -0,0 +1,97 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// The RoundTripperFunc type is an adapter to allow the use of ordinary
// functions as RoundTrippers. If f is a function with the appropriate
// signature, RoundTripperFunc(f) is a RoundTripper that calls f.
type RoundTripperFunc func(req *http.Request) (*http.Response, error)
// RoundTrip implements the RoundTripper interface.
func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return rt(r)
}
// InstrumentRoundTripperInFlight is a middleware that wraps the provided
// http.RoundTripper. It sets the provided prometheus.Gauge to the number of
// requests currently handled by the wrapped http.RoundTripper.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperInFlight(gauge prometheus.Gauge, next http.RoundTripper) RoundTripperFunc {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
gauge.Inc()
defer gauge.Dec()
return next.RoundTrip(r)
})
}
// InstrumentRoundTripperCounter is a middleware that wraps the provided
// http.RoundTripper to observe the request result with the provided CounterVec.
// The CounterVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. Partitioning of the CounterVec happens by HTTP status code
// and/or HTTP method if the respective instance label names are present in the
// CounterVec. For unpartitioned counting, use a CounterVec with zero labels.
//
// If the wrapped RoundTripper panics or returns a non-nil error, the Counter
// is not incremented.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperCounter(counter *prometheus.CounterVec, next http.RoundTripper) RoundTripperFunc {
code, method := checkLabels(counter)
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
resp, err := next.RoundTrip(r)
if err == nil {
counter.With(labels(code, method, r.Method, resp.StatusCode)).Inc()
}
return resp, err
})
}
// InstrumentRoundTripperDuration is a middleware that wraps the provided
// http.RoundTripper to observe the request duration with the provided
// ObserverVec. The ObserverVec must have zero, one, or two non-const
// non-curried labels. For those, the only allowed label names are "code" and
// "method". The function panics otherwise. The Observe method of the Observer
// in the ObserverVec is called with the request duration in
// seconds. Partitioning happens by HTTP status code and/or HTTP method if the
// respective instance label names are present in the ObserverVec. For
// unpartitioned observations, use an ObserverVec with zero labels. Note that
// partitioning of Histograms is expensive and should be used judiciously.
//
// If the wrapped RoundTripper panics or returns a non-nil error, no values are
// reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
func InstrumentRoundTripperDuration(obs prometheus.ObserverVec, next http.RoundTripper) RoundTripperFunc {
code, method := checkLabels(obs)
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
resp, err := next.RoundTrip(r)
if err == nil {
obs.With(labels(code, method, r.Method, resp.StatusCode)).Observe(time.Since(start).Seconds())
}
return resp, err
})
}
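// exampleInstrumentedClient is a hypothetical sketch added for illustration
// (not part of the upstream library) showing how the client middlewares
// compose: an in-flight gauge, a code/method counter, and a duration
// histogram wrapped around http.DefaultTransport. Metric names, label sets,
// and buckets are assumptions.
func exampleInstrumentedClient(reg prometheus.Registerer) *http.Client {
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "client_in_flight_requests",
		Help: "Outgoing HTTP requests currently in flight.",
	})
	counter := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "client_requests_total",
			Help: "Outgoing HTTP requests partitioned by status code and method.",
		},
		[]string{"code", "method"},
	)
	duration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "client_request_duration_seconds",
			Help:    "Outgoing HTTP request latencies in seconds.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"method"},
	)
	reg.MustRegister(inFlight, counter, duration)
	// The wrappers nest inside out: the outermost middleware observes first.
	return &http.Client{
		Transport: InstrumentRoundTripperInFlight(inFlight,
			InstrumentRoundTripperCounter(counter,
				InstrumentRoundTripperDuration(duration, http.DefaultTransport),
			),
		),
	}
}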

@ -0,0 +1,144 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.8
package promhttp
import (
"context"
"crypto/tls"
"net/http"
"net/http/httptrace"
"time"
)
// InstrumentTrace is used to offer flexibility in instrumenting the available
// httptrace.ClientTrace hook functions. Each function is passed a float64
// representing the time in seconds since the start of the HTTP request. A user
// may choose to use separately bucketed Histograms, or implement custom
// instance labels on a per-function basis.
type InstrumentTrace struct {
GotConn func(float64)
PutIdleConn func(float64)
GotFirstResponseByte func(float64)
Got100Continue func(float64)
DNSStart func(float64)
DNSDone func(float64)
ConnectStart func(float64)
ConnectDone func(float64)
TLSHandshakeStart func(float64)
TLSHandshakeDone func(float64)
WroteHeaders func(float64)
Wait100Continue func(float64)
WroteRequest func(float64)
}
// InstrumentRoundTripperTrace is a middleware that wraps the provided
// RoundTripper and reports times to hook functions provided in the
// InstrumentTrace struct. Hook functions that are not present in the provided
// InstrumentTrace struct are ignored. Times reported to the hook functions are
// times since the start of the request. Only with Go 1.9+ are those times
// guaranteed to never be negative (earlier Go versions do not use a
// monotonic clock). Note that partitioning of Histograms is expensive and
// should be used judiciously.
//
// For hook functions that receive an error as an argument, no observations are
// made in the event of a non-nil error value.
//
// See the example for InstrumentRoundTripperDuration for example usage.
func InstrumentRoundTripperTrace(it *InstrumentTrace, next http.RoundTripper) RoundTripperFunc {
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
start := time.Now()
trace := &httptrace.ClientTrace{
GotConn: func(_ httptrace.GotConnInfo) {
if it.GotConn != nil {
it.GotConn(time.Since(start).Seconds())
}
},
PutIdleConn: func(err error) {
if err != nil {
return
}
if it.PutIdleConn != nil {
it.PutIdleConn(time.Since(start).Seconds())
}
},
DNSStart: func(_ httptrace.DNSStartInfo) {
if it.DNSStart != nil {
it.DNSStart(time.Since(start).Seconds())
}
},
DNSDone: func(_ httptrace.DNSDoneInfo) {
if it.DNSDone != nil {
it.DNSDone(time.Since(start).Seconds())
}
},
ConnectStart: func(_, _ string) {
if it.ConnectStart != nil {
it.ConnectStart(time.Since(start).Seconds())
}
},
ConnectDone: func(_, _ string, err error) {
if err != nil {
return
}
if it.ConnectDone != nil {
it.ConnectDone(time.Since(start).Seconds())
}
},
GotFirstResponseByte: func() {
if it.GotFirstResponseByte != nil {
it.GotFirstResponseByte(time.Since(start).Seconds())
}
},
Got100Continue: func() {
if it.Got100Continue != nil {
it.Got100Continue(time.Since(start).Seconds())
}
},
TLSHandshakeStart: func() {
if it.TLSHandshakeStart != nil {
it.TLSHandshakeStart(time.Since(start).Seconds())
}
},
TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
if err != nil {
return
}
if it.TLSHandshakeDone != nil {
it.TLSHandshakeDone(time.Since(start).Seconds())
}
},
WroteHeaders: func() {
if it.WroteHeaders != nil {
it.WroteHeaders(time.Since(start).Seconds())
}
},
Wait100Continue: func() {
if it.Wait100Continue != nil {
it.Wait100Continue(time.Since(start).Seconds())
}
},
WroteRequest: func(_ httptrace.WroteRequestInfo) {
if it.WroteRequest != nil {
it.WroteRequest(time.Since(start).Seconds())
}
},
}
r = r.WithContext(httptrace.WithClientTrace(context.Background(), trace))
return next.RoundTrip(r)
})
}
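// exampleTracedTransport is a hypothetical sketch added for illustration (not
// part of the upstream library): it wires two of the InstrumentTrace hooks to
// caller-supplied observation functions (for instance the Observe method of
// two Histograms) and wraps http.DefaultTransport with the resulting trace.
// The choice of hooks is an assumption; any subset of the struct fields works.
func exampleTracedTransport(observeDNSDone, observeTLSDone func(float64)) http.RoundTripper {
	trace := &InstrumentTrace{
		DNSDone:          observeDNSDone, // seconds from request start to DNS resolution
		TLSHandshakeDone: observeTLSDone, // seconds from request start to TLS handshake completion
	}
	return InstrumentRoundTripperTrace(trace, http.DefaultTransport)
}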

@ -0,0 +1,447 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promhttp
import (
"errors"
"net/http"
"strconv"
"strings"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/client_golang/prometheus"
)
// magicString is used for the hacky label test in checkLabels. Remove once fixed.
const magicString = "zZgWfBxLqvG8kc8IMv3POi2Bb0tZI3vAnBx+gBaFi9FyPzB/CzKUer1yufDa"
// InstrumentHandlerInFlight is a middleware that wraps the provided
// http.Handler. It sets the provided prometheus.Gauge to the number of
// requests currently handled by the wrapped http.Handler.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerInFlight(g prometheus.Gauge, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
g.Inc()
defer g.Dec()
next.ServeHTTP(w, r)
})
}
// InstrumentHandlerDuration is a middleware that wraps the provided
// http.Handler to observe the request duration with the provided ObserverVec.
// The ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the request duration in seconds. Partitioning happens by HTTP
// status code and/or HTTP method if the respective instance label names are
// present in the ObserverVec. For unpartitioned observations, use an
// ObserverVec with zero labels. Note that partitioning of Histograms is
// expensive and should be used judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
func InstrumentHandlerDuration(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(time.Since(now).Seconds())
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
next.ServeHTTP(w, r)
obs.With(labels(code, method, r.Method, 0)).Observe(time.Since(now).Seconds())
})
}
// InstrumentHandlerCounter is a middleware that wraps the provided http.Handler
// to observe the request result with the provided CounterVec. The CounterVec
// must have zero, one, or two non-const non-curried labels. For those, the only
// allowed label names are "code" and "method". The function panics
// otherwise. Partitioning of the CounterVec happens by HTTP status code and/or
// HTTP method if the respective instance label names are present in the
// CounterVec. For unpartitioned counting, use a CounterVec with zero labels.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, the Counter is not incremented.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerCounter(counter *prometheus.CounterVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(counter)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
counter.With(labels(code, method, r.Method, d.Status())).Inc()
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
counter.With(labels(code, method, r.Method, 0)).Inc()
})
}
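// exampleInstrumentedMux is a hypothetical sketch added for illustration (not
// part of the upstream library) chaining the server-side middlewares around an
// application handler: an in-flight gauge, a duration histogram, and a request
// counter, the latter two partitioned by code and method. Metric names and
// buckets are assumptions.
func exampleInstrumentedMux(reg prometheus.Registerer, app http.Handler) http.Handler {
	inFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "http_in_flight_requests",
		Help: "HTTP requests currently being served.",
	})
	duration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "http_request_duration_seconds",
			Help:    "HTTP request latencies in seconds.",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"code", "method"},
	)
	counter := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "http_requests_total",
			Help: "HTTP requests partitioned by status code and method.",
		},
		[]string{"code", "method"},
	)
	reg.MustRegister(inFlight, duration, counter)
	return InstrumentHandlerInFlight(inFlight,
		InstrumentHandlerDuration(duration,
			InstrumentHandlerCounter(counter, app),
		),
	)
}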
// InstrumentHandlerTimeToWriteHeader is a middleware that wraps the provided
// http.Handler to observe with the provided ObserverVec the request duration
// until the response headers are written. The ObserverVec must have zero, one,
// or two non-const non-curried labels. For those, the only allowed label names
// are "code" and "method". The function panics otherwise. The Observe method of
// the Observer in the ObserverVec is called with the request duration in
// seconds. Partitioning happens by HTTP status code and/or HTTP method if the
// respective instance label names are present in the ObserverVec. For
// unpartitioned observations, use an ObserverVec with zero labels. Note that
// partitioning of Histograms is expensive and should be used judiciously.
//
// If the wrapped Handler panics before calling WriteHeader, no value is
// reported.
//
// Note that this method is only guaranteed to never observe negative durations
// if used with Go1.9+.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerTimeToWriteHeader(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
now := time.Now()
d := newDelegator(w, func(status int) {
obs.With(labels(code, method, r.Method, status)).Observe(time.Since(now).Seconds())
})
next.ServeHTTP(d, r)
})
}
// InstrumentHandlerRequestSize is a middleware that wraps the provided
// http.Handler to observe the request size with the provided ObserverVec. The
// ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the request size in bytes. Partitioning happens by HTTP status
// code and/or HTTP method if the respective instance label names are present in
// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero
// labels. Note that partitioning of Histograms is expensive and should be used
// judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerRequestSize(obs prometheus.ObserverVec, next http.Handler) http.HandlerFunc {
code, method := checkLabels(obs)
if code {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
size := computeApproximateRequestSize(r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(size))
})
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
size := computeApproximateRequestSize(r)
obs.With(labels(code, method, r.Method, 0)).Observe(float64(size))
})
}
// InstrumentHandlerResponseSize is a middleware that wraps the provided
// http.Handler to observe the response size with the provided ObserverVec. The
// ObserverVec must have zero, one, or two non-const non-curried labels. For
// those, the only allowed label names are "code" and "method". The function
// panics otherwise. The Observe method of the Observer in the ObserverVec is
// called with the response size in bytes. Partitioning happens by HTTP status
// code and/or HTTP method if the respective instance label names are present in
// the ObserverVec. For unpartitioned observations, use an ObserverVec with zero
// labels. Note that partitioning of Histograms is expensive and should be used
// judiciously.
//
// If the wrapped Handler does not set a status code, a status code of 200 is assumed.
//
// If the wrapped Handler panics, no values are reported.
//
// See the example for InstrumentHandlerDuration for example usage.
func InstrumentHandlerResponseSize(obs prometheus.ObserverVec, next http.Handler) http.Handler {
code, method := checkLabels(obs)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := newDelegator(w, nil)
next.ServeHTTP(d, r)
obs.With(labels(code, method, r.Method, d.Status())).Observe(float64(d.Written()))
})
}
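// exampleSizeObservers is a hypothetical sketch added for illustration (not
// part of the upstream library) adding the request- and response-size
// observers around a handler. Names and buckets are assumptions; both vectors
// are unpartitioned (zero labels), which checkLabels also allows.
func exampleSizeObservers(reg prometheus.Registerer, app http.Handler) http.Handler {
	reqSize := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "http_request_size_bytes",
			Help:    "Approximate HTTP request sizes in bytes.",
			Buckets: prometheus.ExponentialBuckets(64, 4, 8),
		},
		[]string{},
	)
	respSize := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "http_response_size_bytes",
			Help:    "HTTP response sizes in bytes.",
			Buckets: prometheus.ExponentialBuckets(64, 4, 8),
		},
		[]string{},
	)
	reg.MustRegister(reqSize, respSize)
	return InstrumentHandlerRequestSize(reqSize,
		InstrumentHandlerResponseSize(respSize, app),
	)
}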
func checkLabels(c prometheus.Collector) (code bool, method bool) {
// TODO(beorn7): Remove this hacky way to check for instance labels
// once Descriptors can have their dimensionality queried.
var (
desc *prometheus.Desc
m prometheus.Metric
pm dto.Metric
lvs []string
)
// Get the Desc from the Collector.
descc := make(chan *prometheus.Desc, 1)
c.Describe(descc)
select {
case desc = <-descc:
default:
panic("no description provided by collector")
}
select {
case <-descc:
panic("more than one description provided by collector")
default:
}
close(descc)
// Create a ConstMetric with the Desc. Since we don't know how many
// variable labels there are, try for as long as it needs.
for err := errors.New("dummy"); err != nil; lvs = append(lvs, magicString) {
m, err = prometheus.NewConstMetric(desc, prometheus.UntypedValue, 0, lvs...)
}
// Write out the metric into a proto message and look at the labels.
// If the value is not the magicString, it is a constLabel, which doesn't interest us.
// If the label is curried, it doesn't interest us.
// In all other cases, only "code" or "method" is allowed.
if err := m.Write(&pm); err != nil {
panic("error checking metric for labels")
}
for _, label := range pm.Label {
name, value := label.GetName(), label.GetValue()
if value != magicString || isLabelCurried(c, name) {
continue
}
switch name {
case "code":
code = true
case "method":
method = true
default:
panic("metric partitioned with non-supported labels")
}
}
return
}
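// exampleAllowedLabels is a hypothetical sketch added for illustration (not
// part of the upstream library) of what the label check above accepts: any
// subset of "code" and "method" as variable labels is allowed; any other
// variable label causes the Instrument* wrappers to panic at construction.
func exampleAllowedLabels(app http.Handler) http.Handler {
	ok := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "example_requests_total", Help: "Allowed label set."},
		[]string{"code", "method"}, // "code" and/or "method" only
	)
	// A vector with, say, a "handler" variable label would panic here instead.
	return InstrumentHandlerCounter(ok, app)
}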
func isLabelCurried(c prometheus.Collector, label string) bool {
// This is even hackier than the label test above.
// We essentially try to curry again and see if it works.
// But for that, we need to type-convert to the two
// types we use here, ObserverVec or *CounterVec.
switch v := c.(type) {
case *prometheus.CounterVec:
if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
return false
}
case prometheus.ObserverVec:
if _, err := v.CurryWith(prometheus.Labels{label: "dummy"}); err == nil {
return false
}
default:
panic("unsupported metric vec type")
}
return true
}
// emptyLabels is a one-time allocation for non-partitioned metrics to avoid
// unnecessary allocations on each request.
var emptyLabels = prometheus.Labels{}
func labels(code, method bool, reqMethod string, status int) prometheus.Labels {
if !(code || method) {
return emptyLabels
}
labels := prometheus.Labels{}
if code {
labels["code"] = sanitizeCode(status)
}
if method {
labels["method"] = sanitizeMethod(reqMethod)
}
return labels
}
func computeApproximateRequestSize(r *http.Request) int {
s := 0
if r.URL != nil {
s += len(r.URL.String())
}
s += len(r.Method)
s += len(r.Proto)
for name, values := range r.Header {
s += len(name)
for _, value := range values {
s += len(value)
}
}
s += len(r.Host)
// N.B. r.Form and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 {
s += int(r.ContentLength)
}
return s
}
func sanitizeMethod(m string) string {
switch m {
case "GET", "get":
return "get"
case "PUT", "put":
return "put"
case "HEAD", "head":
return "head"
case "POST", "post":
return "post"
case "DELETE", "delete":
return "delete"
case "CONNECT", "connect":
return "connect"
case "OPTIONS", "options":
return "options"
case "NOTIFY", "notify":
return "notify"
default:
return strings.ToLower(m)
}
}
// If the wrapped http.Handler has not set a status code, i.e. the value is
// currently 0, sanitizeCode will return 200, for consistency with behavior in
// the stdlib.
func sanitizeCode(s int) string {
switch s {
case 100:
return "100"
case 101:
return "101"
case 200, 0:
return "200"
case 201:
return "201"
case 202:
return "202"
case 203:
return "203"
case 204:
return "204"
case 205:
return "205"
case 206:
return "206"
case 300:
return "300"
case 301:
return "301"
case 302:
return "302"
case 304:
return "304"
case 305:
return "305"
case 307:
return "307"
case 400:
return "400"
case 401:
return "401"
case 402:
return "402"
case 403:
return "403"
case 404:
return "404"
case 405:
return "405"
case 406:
return "406"
case 407:
return "407"
case 408:
return "408"
case 409:
return "409"
case 410:
return "410"
case 411:
return "411"
case 412:
return "412"
case 413:
return "413"
case 414:
return "414"
case 415:
return "415"
case 416:
return "416"
case 417:
return "417"
case 418:
return "418"
case 500:
return "500"
case 501:
return "501"
case 502:
return "502"
case 503:
return "503"
case 504:
return "504"
case 505:
return "505"
case 428:
return "428"
case 429:
return "429"
case 431:
return "431"
case 511:
return "511"
default:
return strconv.Itoa(s)
}
}