From a5e47a50e84a86860e91ce6d575d195df309cced Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Mon, 13 May 2019 14:39:07 -0400
Subject: [PATCH] adds matchers and metrics to regex and json

---
 Gopkg.lock                                    |   4 +-
 pkg/logentry/metric/entries.go                |  14 +-
 pkg/logentry/metric/entries_test.go           |  68 ++-
 pkg/logentry/pipeline.go                      |  31 +-
 pkg/logentry/pipeline_test.go                 |   6 +-
 pkg/logentry/stages/configs.go                |  58 +++
 pkg/logentry/stages/extensions.go             |   8 +-
 pkg/logentry/stages/json.go                   | 102 ++--
 pkg/logentry/stages/json_test.go              |  16 +-
 pkg/logentry/stages/match.go                  |  28 ++
 pkg/logentry/stages/match_test.go             |  59 +++
 pkg/logentry/stages/metrics.go                |  51 +-
 pkg/logentry/stages/metrics_test.go           |   2 +-
 pkg/logentry/stages/mutator.go                |  29 ++
 pkg/logentry/stages/regex.go                  | 110 ++---
 pkg/logentry/stages/regex_test.go             |  18 +-
 pkg/logentry/stages/stage.go                  |  20 +
 pkg/logql/parser.go                           |  17 +
 pkg/promtail/promtail.go                      |   8 +-
 pkg/promtail/server/server.go                 |   6 +-
 pkg/promtail/targets/filetargetmanager.go     |   7 +-
 pkg/promtail/targets/manager.go               |   3 +
 .../prometheus/promhttp/delegator.go          | 198 ++++++++
 .../prometheus/promhttp/delegator_1_8.go      | 181 +++++++
 .../prometheus/promhttp/delegator_pre_1_8.go  |  44 ++
 .../client_golang/prometheus/promhttp/http.go | 310 ++++++++++++
 .../prometheus/promhttp/instrument_client.go  |  97 ++++
 .../promhttp/instrument_client_1_8.go         | 144 ++++++
 .../prometheus/promhttp/instrument_server.go  | 447 ++++++++++++++++++
 29 files changed, 1849 insertions(+), 237 deletions(-)
 create mode 100644 pkg/logentry/stages/configs.go
 create mode 100644 pkg/logentry/stages/match.go
 create mode 100644 pkg/logentry/stages/match_test.go
 create mode 100644 pkg/logentry/stages/mutator.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator_1_8.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/delegator_pre_1_8.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/http.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_client_1_8.go
 create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promhttp/instrument_server.go

diff --git a/Gopkg.lock b/Gopkg.lock
index e5d96bd510..503ec69a79 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -703,7 +703,7 @@
 
 [[projects]]
   branch = "master"
-  digest = "1:ad2a544bc4bc4fcaa6f0b1622c4f8652da6e74b101925c263b912232e1b68cf2"
+  digest = "1:dff66fce6bb8fa6111998bf3575e22aa3d0fd2560712a83de3390236d3cd1f6b"
   name = "github.com/prometheus/client_golang"
   packages = [
     "api",
@@ -711,6 +711,7 @@
     "prometheus",
     "prometheus/internal",
     "prometheus/promauto",
+    "prometheus/promhttp",
     "prometheus/testutil",
   ]
   pruneopts = "UT"
@@ -1407,6 +1408,7 @@
     "github.com/pkg/errors",
     "github.com/prometheus/client_golang/prometheus",
     "github.com/prometheus/client_golang/prometheus/promauto",
+    "github.com/prometheus/client_golang/prometheus/promhttp",
     "github.com/prometheus/client_golang/prometheus/testutil",
     "github.com/prometheus/common/config",
     "github.com/prometheus/common/model",
diff --git a/pkg/logentry/metric/entries.go b/pkg/logentry/metric/entries.go
index 9d656ff2ac..34f8cade94 100644
--- a/pkg/logentry/metric/entries.go
+++ b/pkg/logentry/metric/entries.go
@@ -8,20 +8,26 @@ import (
 	"github.com/prometheus/common/model"
 )
 
-func LogCount(reg prometheus.Registerer) api.EntryHandler {
+func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
 	c := NewCounters("log_entries_total", "the total count of log entries")
 	reg.MustRegister(c)
 	return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
+		if err := next.Handle(labels, time, entry); err != nil {
+			return err
+		}
 		c.With(labels).Inc()
 		return nil
 	})
 }
 
-func LogSize(reg prometheus.Registerer) api.EntryHandler {
-	c := NewCounters("log_entries_bytes", "the total count of bytes")
+func LogSize(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
+	c := NewHistograms("log_entries_bytes", "the total count of bytes", prometheus.ExponentialBuckets(16, 2, 8))
 	reg.MustRegister(c)
 	return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
-		c.With(labels).Add(float64(len(entry)))
+		if err := next.Handle(labels, time, entry); err != nil {
+			return err
+		}
+		c.With(labels).Observe(float64(len(entry)))
 		return nil
 	})
 }
diff --git a/pkg/logentry/metric/entries_test.go b/pkg/logentry/metric/entries_test.go
index 41dde2989f..fa88c4245a 100644
--- a/pkg/logentry/metric/entries_test.go
+++ b/pkg/logentry/metric/entries_test.go
@@ -1,11 +1,14 @@
 package metric
 
 import (
+	"errors"
 	"strings"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/grafana/loki/pkg/promtail/api"
+
 	"github.com/prometheus/client_golang/prometheus"
 	testutil "github.com/prometheus/client_golang/prometheus/testutil"
 
@@ -20,10 +23,17 @@ log_entries_total{bar="foo"} 5.0
 log_entries_total{bar="foo",foo="bar"} 5.0
 `
 
+var errorHandler = api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
+	if entry == "error" {
+		return errors.New("")
+	}
+	return nil
+})
+
 func Test_LogCount(t *testing.T) {
 	t.Parallel()
 	reg := prometheus.NewRegistry()
-	handler := LogCount(reg)
+	handler := LogCount(reg, errorHandler)
 
 	workerCount := 5
 	var wg sync.WaitGroup
@@ -36,6 +46,8 @@ func Test_LogCount(t *testing.T) {
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "")
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "")
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "")
+			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar"}), time.Now(), "error")
+			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "error")
 		}()
 	}
 
@@ -48,17 +60,57 @@ func Test_LogCount(t *testing.T) {
 }
 
 const expectedSize = `# HELP log_entries_bytes the total count of bytes
-# TYPE log_entries_bytes counter
-log_entries_bytes 35.0
-log_entries_bytes{foo="bar"} 15.0
-log_entries_bytes{bar="foo"} 10.0
-log_entries_bytes{bar="foo",foo="bar"} 15.0
+# TYPE log_entries_bytes histogram
+log_entries_bytes_bucket{le="16.0"} 10.0
+log_entries_bytes_bucket{le="32.0"} 10.0
+log_entries_bytes_bucket{le="64.0"} 10.0
+log_entries_bytes_bucket{le="128.0"} 10.0
+log_entries_bytes_bucket{le="256.0"} 10.0
+log_entries_bytes_bucket{le="512.0"} 10.0
+log_entries_bytes_bucket{le="1024.0"} 10.0
+log_entries_bytes_bucket{le="2048.0"} 10.0
+log_entries_bytes_bucket{le="+Inf"} 10.0
+log_entries_bytes_sum 35.0
+log_entries_bytes_count 10.0
+log_entries_bytes_bucket{foo="bar",le="16.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="32.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="64.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="128.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="256.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="512.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="1024.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="2048.0"} 5.0
+log_entries_bytes_bucket{foo="bar",le="+Inf"} 5.0
+log_entries_bytes_sum{foo="bar"} 15.0
+log_entries_bytes_count{foo="bar"} 5.0
+log_entries_bytes_bucket{bar="foo",le="16.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="32.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="64.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="128.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="256.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="512.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="1024.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="2048.0"} 5.0
+log_entries_bytes_bucket{bar="foo",le="+Inf"} 5.0
+log_entries_bytes_sum{bar="foo"} 10.0
+log_entries_bytes_count{bar="foo"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="16.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="32.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="64.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="128.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="256.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="512.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="1024.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="2048.0"} 5.0
+log_entries_bytes_bucket{bar="foo",foo="bar",le="+Inf"} 5.0
+log_entries_bytes_sum{bar="foo",foo="bar"} 15.0
+log_entries_bytes_count{bar="foo",foo="bar"} 5.0
 `
 
 func Test_LogSize(t *testing.T) {
 	t.Parallel()
 	reg := prometheus.NewRegistry()
-	handler := LogSize(reg)
+	handler := LogSize(reg, errorHandler)
 
 	workerCount := 5
 	var wg sync.WaitGroup
@@ -71,6 +123,8 @@ func Test_LogSize(t *testing.T) {
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo"}), time.Now(), "fu")
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "baz")
 			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "more")
+			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{}), time.Now(), "error")
+			_ = handler.Handle(model.LabelSet(map[model.LabelName]model.LabelValue{"bar": "foo", "foo": "bar"}), time.Now(), "error")
 		}()
 	}
 
diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go
index 19e1ec63cb..52bd36810d 100644
--- a/pkg/logentry/pipeline.go
+++ b/pkg/logentry/pipeline.go
@@ -34,7 +34,7 @@ type Pipeline struct {
 }
 
 // NewPipeline creates a new log entry pipeline from a configuration
-func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipeline, error) {
+func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, logRegistry prometheus.Registerer) (*Pipeline, error) {
 	st := []stages.Stage{}
 	for _, s := range stgs {
 		stage, ok := s.(map[interface{}]interface{})
@@ -50,32 +50,11 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipel
 			if !ok {
 				return nil, errors.New("pipeline stage key must be a string")
 			}
-			switch name {
-			case "json":
-				json, err := stages.NewJSON(logger, config)
-				if err != nil {
-					return nil, errors.Wrap(err, "invalid json stage config")
-				}
-				st = append(st, json)
-			case "regex":
-				regex, err := stages.NewRegex(logger, config)
-				if err != nil {
-					return nil, errors.Wrap(err, "invalid regex stage config")
-				}
-				st = append(st, regex)
-			case "docker":
-				docker, err := stages.NewDocker(logger)
-				if err != nil {
-					return nil, errors.Wrap(err, "invalid docker stage config")
errors.Wrap(err, "invalid docker stage config") - } - st = append(st, docker) - case "cri": - cri, err := stages.NewCRI(logger) - if err != nil { - return nil, errors.Wrap(err, "invalid cri stage config") - } - st = append(st, cri) + newStage, err := stages.New(logger, name, config, logRegistry) + if err != nil { + return nil, errors.Wrapf(err, "invalid %s stage config", name) } + st = append(st, newStage) } } return &Pipeline{ diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go index 8d26b97e3a..860057e7d0 100644 --- a/pkg/logentry/pipeline_test.go +++ b/pkg/logentry/pipeline_test.go @@ -42,7 +42,7 @@ func loadConfig(yml string) PipelineStages { func TestNewPipeline(t *testing.T) { - p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test") + p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", nil) if err != nil { panic(err) } @@ -62,7 +62,7 @@ func TestPipeline_MultiStage(t *testing.T) { if err != nil { panic(err) } - p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test") + p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", nil) if err != nil { panic(err) } @@ -138,7 +138,7 @@ func Benchmark(b *testing.B) { } for _, bm := range benchmarks { b.Run(bm.name, func(b *testing.B) { - pl, err := NewPipeline(bm.logger, bm.stgs, "test") + pl, err := NewPipeline(bm.logger, bm.stgs, "test", nil) if err != nil { panic(err) } diff --git a/pkg/logentry/stages/configs.go b/pkg/logentry/stages/configs.go new file mode 100644 index 0000000000..2be92402b6 --- /dev/null +++ b/pkg/logentry/stages/configs.go @@ -0,0 +1,58 @@ +package stages + +import ( + "github.com/mitchellh/mapstructure" +) + +const ( + MetricTypeCounter = "counter" + MetricTypeGauge = "gauge" + MetricTypeHistogram = "histogram" + + StageTypeJSON = "json" + StageTypeRegex = "regex" +) + +type MetricConfig struct { + MetricType string `mapstructure:"type"` + Description string `mapstructure:"description"` + Source *string `mapstructure:"source"` + Buckets []float64 `mapstructure:"buckets"` +} + +type MetricsConfig map[string]MetricConfig + +// JSONTimestamp configures timestamp extraction +type TimestampConfig struct { + Source *string `mapstructure:"source"` + Format string `mapstructure:"format"` +} + +// JSONLabel configures a labels value extraction +type LabelConfig struct { + Source *string `mapstructure:"source"` +} + +// JSONOutput configures output value extraction +type OutputConfig struct { + Source *string `mapstructure:"source"` +} + +// JSONConfig configures the log entry parser to extract value from json +type StageConfig struct { + Timestamp *TimestampConfig `mapstructure:"timestamp"` + Output *OutputConfig `mapstructure:"output"` + Labels map[string]*LabelConfig `mapstructure:"labels"` + Metrics MetricsConfig `mapstructure:"metrics"` + Match string `mapstructure:"match"` + Expression string `mapstructure:"expression"` +} + +func NewConfig(config interface{}) (*StageConfig, error) { + cfg := &StageConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index 338eb4ab1c..17a49975e0 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -6,7 +6,7 @@ import ( // NewDocker creates a Docker json log format specific pipeline stage. 
 func NewDocker(logger log.Logger) (Stage, error) {
-	config := map[string]interface{}{
+	cfg := map[string]interface{}{
 		"timestamp": map[string]interface{}{
 			"source": "time",
 			"format": "RFC3339",
@@ -20,12 +20,12 @@ func NewDocker(logger log.Logger) (Stage, error) {
 			"source": "log",
 		},
 	}
-	return NewJSON(logger, config, nil)
+	return New(logger, StageTypeJSON, cfg, nil)
 }
 
 // NewCRI creates a CRI format specific pipeline stage
 func NewCRI(logger log.Logger) (Stage, error) {
-	config := map[string]interface{}{
+	cfg := map[string]interface{}{
 		"expression": "^(?s)(?P