diff --git a/pkg/logentry/metric/counters.go b/pkg/logentry/metric/counters.go index c500f083bb..eb89e39535 100644 --- a/pkg/logentry/metric/counters.go +++ b/pkg/logentry/metric/counters.go @@ -1,17 +1,60 @@ package metric import ( + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) -// Counters is a vector of counters for a each log stream. +const ( + CounterInc = "inc" + CounterAdd = "add" + + ErrCounterActionRequired = "counter action must be defined as either `inc` or `add`" + ErrCounterInvalidAction = "action %s is not valid, action must be either `inc` or `add`" +) + +type CounterConfig struct { + Value *string `mapstructure:"value"` + Action string `mapstructure:"action"` +} + +func validateCounterConfig(config *CounterConfig) error { + if config.Action == "" { + return errors.New(ErrCounterActionRequired) + } + if config.Action != CounterInc && config.Action != CounterAdd { + return errors.Errorf(ErrCounterInvalidAction, config.Action) + } + return nil +} + +func parseCounterConfig(config interface{}) (*CounterConfig, error) { + cfg := &CounterConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + +// Counters is a vec tor of counters for a each log stream. type Counters struct { *metricVec + Cfg *CounterConfig } // NewCounters creates a new counter vec. -func NewCounters(name, help string) *Counters { +func NewCounters(name, help string, config interface{}) (*Counters, error) { + cfg, err := parseCounterConfig(config) + if err != nil { + return nil, err + } + err = validateCounterConfig(cfg) + if err != nil { + return nil, err + } return &Counters{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { return prometheus.NewCounter(prometheus.CounterOpts{ @@ -19,7 +62,9 @@ func NewCounters(name, help string) *Counters { Name: name, ConstLabels: labels, }) - })} + }), + Cfg: cfg, + }, nil } // With returns the counter associated with a stream labelset. diff --git a/pkg/logentry/metric/entries.go b/pkg/logentry/metric/entries.go index 7f4a3f8699..e6777aab17 100644 --- a/pkg/logentry/metric/entries.go +++ b/pkg/logentry/metric/entries.go @@ -3,14 +3,21 @@ package metric import ( "time" - "github.com/grafana/loki/pkg/promtail/api" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/promtail/api" ) // LogCount counts log line for each stream. func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler { - c := NewCounters("log_entries_total", "the total count of log entries") + cfg := CounterConfig{ + Action: CounterInc, + } + c, err := NewCounters("log_entries_total", "the total count of log entries", cfg) + if err != nil { + //TODO what do we want to do with this?? + } reg.MustRegister(c) return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { if err := next.Handle(labels, time, entry); err != nil { @@ -23,7 +30,13 @@ func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler // LogSize observes log line size for each stream. func LogSize(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler { - c := NewHistograms("log_entries_bytes", "the total count of bytes", prometheus.ExponentialBuckets(16, 2, 8)) + cfg := HistogramConfig{ + Buckets: prometheus.ExponentialBuckets(16, 2, 8), + } + c, err := NewHistograms("log_entries_bytes", "the total count of bytes", cfg) + if err != nil { + //TODO what do we want to do with this?? + } reg.MustRegister(c) return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { if err := next.Handle(labels, time, entry); err != nil { diff --git a/pkg/logentry/metric/gauges.go b/pkg/logentry/metric/gauges.go index 279d488347..378e1381e4 100644 --- a/pkg/logentry/metric/gauges.go +++ b/pkg/logentry/metric/gauges.go @@ -1,17 +1,67 @@ package metric import ( + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) +const ( + GaugeSet = "set" + GaugeInc = "inc" + GaugeDec = "dec" + GaugeAdd = "add" + GaugeSub = "sub" + + ErrGaugeActionRequired = "gauge action must be defined as `set`, `inc`, `dec`, `add`, or `sub`" + ErrGaugeInvalidAction = "action %s is not valid, action must be `set`, `inc`, `dec`, `add`, or `sub`" +) + +type GaugeConfig struct { + Value *string `mapstructure:"value"` + Action string `mapstructure:"action"` +} + +func validateGaugeConfig(config *GaugeConfig) error { + if config.Action == "" { + return errors.New(ErrGaugeActionRequired) + } + if config.Action != GaugeSet && + config.Action != GaugeInc && + config.Action != GaugeDec && + config.Action != GaugeAdd && + config.Action != GaugeSub { + return errors.Errorf(ErrGaugeInvalidAction, config.Action) + } + return nil +} + +func parseGaugeConfig(config interface{}) (*GaugeConfig, error) { + cfg := &GaugeConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + // Gauges is a vector of gauges for a each log stream. type Gauges struct { *metricVec + Cfg *GaugeConfig } // NewGauges creates a new gauge vec. -func NewGauges(name, help string) *Gauges { +func NewGauges(name, help string, config interface{}) (*Gauges, error) { + cfg, err := parseGaugeConfig(config) + if err != nil { + return nil, err + } + err = validateGaugeConfig(cfg) + if err != nil { + return nil, err + } return &Gauges{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { return prometheus.NewGauge(prometheus.GaugeOpts{ @@ -19,7 +69,9 @@ func NewGauges(name, help string) *Gauges { Name: name, ConstLabels: labels, }) - })} + }), + Cfg: cfg, + }, nil } // With returns the gauge associated with a stream labelset. diff --git a/pkg/logentry/metric/histograms.go b/pkg/logentry/metric/histograms.go index a9db0d6ff8..ebf8a69e0f 100644 --- a/pkg/logentry/metric/histograms.go +++ b/pkg/logentry/metric/histograms.go @@ -1,26 +1,57 @@ package metric import ( + "github.com/mitchellh/mapstructure" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) +type HistogramConfig struct { + Value *string `mapstructure:"value"` + Buckets []float64 `mapstructure:"buckets"` +} + +func validateHistogramConfig(config *HistogramConfig) error { + //TODO is there any validation required? + return nil +} + +func parseHistogramConfig(config interface{}) (*HistogramConfig, error) { + cfg := &HistogramConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + // Histograms is a vector of histograms for a each log stream. type Histograms struct { *metricVec + Cfg *HistogramConfig } // NewHistograms creates a new histogram vec. -func NewHistograms(name, help string, buckets []float64) *Histograms { +func NewHistograms(name, help string, config interface{}) (*Histograms, error) { + cfg, err := parseHistogramConfig(config) + if err != nil { + return nil, err + } + err = validateHistogramConfig(cfg) + if err != nil { + return nil, err + } return &Histograms{ metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric { return prometheus.NewHistogram(prometheus.HistogramOpts{ Help: help, Name: name, ConstLabels: labels, - Buckets: buckets, + Buckets: cfg.Buckets, }) - })} + }), + Cfg: cfg, + }, nil } // With returns the histogram associated with a stream labelset. diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index 10566cfda4..5efca97adf 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -2,12 +2,13 @@ package stages import ( "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" ) const RFC3339Nano = "RFC3339Nano" // NewDocker creates a Docker json log format specific pipeline stage. -func NewDocker(logger log.Logger, jobName string) (Stage, error) { +func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) { t := "timestamp" f := RFC3339Nano o := "output" @@ -35,11 +36,11 @@ func NewDocker(logger log.Logger, jobName string) (Stage, error) { }, }} - return NewPipeline(logger, stages, jobName+"_docker") + return NewPipeline(logger, stages, jobName+"_docker", registerer) } // NewCRI creates a CRI format specific pipeline stage -func NewCRI(logger log.Logger, jobName string) (Stage, error) { +func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) { t := "time" f := RFC3339Nano o := "content" @@ -66,5 +67,5 @@ func NewCRI(logger log.Logger, jobName string) (Stage, error) { }, }, } - return NewPipeline(logger, stages, jobName+"_cri") + return NewPipeline(logger, stages, jobName+"_cri", registerer) } diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go index a5530480a5..9f44d0a8f6 100644 --- a/pkg/logentry/stages/extensions_test.go +++ b/pkg/logentry/stages/extensions_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -64,7 +65,7 @@ func TestNewDocker(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewDocker(util.Logger, "test") + p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer) if err != nil { t.Fatalf("failed to create Docker parser: %s", err) } @@ -140,7 +141,7 @@ func TestNewCri(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - p, err := NewCRI(util.Logger, "test") + p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer) if err != nil { t.Fatalf("failed to create CRI parser: %s", err) } diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index 58427ad162..1fbece443c 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/kit/log" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -30,9 +31,11 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { if cfg == nil { return nil, errors.New(ErrEmptyMatchStageConfig) } + //FIXME PipelineName does not need to be a pointer if cfg.PipelineName == nil || *cfg.PipelineName == "" { return nil, errors.New(ErrPipelineNameRequired) } + //FIXME selector does not need to be a pointer if cfg.Selector == nil || *cfg.Selector == "" { return nil, errors.New(ErrSelectorRequired) } @@ -46,7 +49,7 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) { return matchers, nil } -func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) { +func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { cfg := &MatcherConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -57,7 +60,7 @@ func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) { return nil, err } - pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName) + pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName, registerer) if err != nil { return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", *cfg.PipelineName) } diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index 1605b4a55f..875cf07bd6 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" ) var pipelineName = "pl_name" @@ -50,7 +51,7 @@ func TestMatcher(t *testing.T) { }, }, } - s, err := newMatcherStage(util.Logger, matchConfig) + s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer) if (err != nil) != tt.wantErr { t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index 8d3a7856fc..31be3ee6c0 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -23,10 +23,10 @@ const ( // MetricConfig is a single metrics configuration. type MetricConfig struct { - MetricType string `mapstructure:"type"` - Description string `mapstructure:"description"` - Source *string `mapstructure:"source"` - Buckets []float64 `mapstructure:"buckets"` + MetricType string `mapstructure:"type"` + Description string `mapstructure:"description"` + Source *string `mapstructure:"source"` + Config interface{} `mapstructure:"config"` } // MetricsConfig is a set of configured metrics. @@ -65,11 +65,20 @@ func newMetric(config interface{}, registry prometheus.Registerer) (*metricStage switch strings.ToLower(cfg.MetricType) { case MetricTypeCounter: - collector = metric.NewCounters(customPrefix+name, cfg.Description) + collector, err = metric.NewCounters(customPrefix+name, cfg.Description, cfg.Config) + if err != nil { + return nil, err + } case MetricTypeGauge: - collector = metric.NewGauges(customPrefix+name, cfg.Description) + collector, err = metric.NewGauges(customPrefix+name, cfg.Description, cfg.Config) + if err != nil { + return nil, err + } case MetricTypeHistogram: - collector = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Buckets) + collector, err = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Config) + if err != nil { + return nil, err + } } if collector != nil { registry.MustRegister(collector) @@ -92,39 +101,102 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf if v, ok := extracted[*m.cfg[name].Source]; ok { switch vec := collector.(type) { case *metric.Counters: - recordCounter(vec.With(labels), v) + recordCounter(vec, labels, v) case *metric.Gauges: - recordGauge(vec.With(labels), v) + recordGauge(vec, labels, v) case *metric.Histograms: - recordHistogram(vec.With(labels), v) + recordHistogram(vec, labels, v) } } } } -func recordCounter(counter prometheus.Counter, v interface{}) { - f, err := getFloat(v) - if err != nil || f < 0 { - return +func recordCounter(counter *metric.Counters, labels model.LabelSet, v interface{}) { + // If value matching is defined, make sure value matches. + if counter.Cfg.Value != nil { + stringVal, err := getString(v) + if err != nil { + //TODO need logger + return + } + if *counter.Cfg.Value != stringVal { + return + } + } + + switch counter.Cfg.Action { + case metric.CounterInc: + counter.With(labels).Inc() + case metric.CounterAdd: + f, err := getFloat(v) + if err != nil || f < 0 { + //TODO need logger + return + } + counter.With(labels).Add(f) } - counter.Add(f) } -func recordGauge(gauge prometheus.Gauge, v interface{}) { - f, err := getFloat(v) - if err != nil { - return +func recordGauge(gauge *metric.Gauges, labels model.LabelSet, v interface{}) { + // If value matching is defined, make sure value matches. + if gauge.Cfg.Value != nil { + stringVal, err := getString(v) + if err != nil { + //TODO need logger + return + } + if *gauge.Cfg.Value != stringVal { + return + } + } + + switch gauge.Cfg.Action { + case metric.GaugeSet: + f, err := getFloat(v) + if err != nil || f < 0 { + //TODO need logger + return + } + gauge.With(labels).Set(f) + case metric.GaugeInc: + gauge.With(labels).Inc() + case metric.GaugeDec: + gauge.With(labels).Dec() + case metric.GaugeAdd: + f, err := getFloat(v) + if err != nil || f < 0 { + //TODO need logger + return + } + gauge.With(labels).Add(f) + case metric.GaugeSub: + f, err := getFloat(v) + if err != nil || f < 0 { + //TODO need logger + return + } + gauge.With(labels).Sub(f) } - //todo Gauge we be able to add,inc,dec,set - gauge.Add(f) } -func recordHistogram(histogram prometheus.Histogram, v interface{}) { +func recordHistogram(histogram *metric.Histograms, labels model.LabelSet, v interface{}) { + // If value matching is defined, make sure value matches. + if histogram.Cfg.Value != nil { + stringVal, err := getString(v) + if err != nil { + //TODO need logger + return + } + if *histogram.Cfg.Value != stringVal { + return + } + } f, err := getFloat(v) if err != nil { + //TODO need logger return } - histogram.Observe(f) + histogram.With(labels).Observe(f) } func getFloat(unk interface{}) (float64, error) { diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go index 455140074a..16fbcabb6a 100644 --- a/pkg/logentry/stages/metrics_test.go +++ b/pkg/logentry/stages/metrics_test.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logentry/metric" ) var labelFoo = model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar", "bar": "foo"}) @@ -31,51 +33,82 @@ func Test_withMetric(t *testing.T) { "expression": "(?P\"GET).*HTTP/1.1\" (?P\\d*) (?P