implementing all the functions for the counter and gauge metric types

metric test is still failing because of missing metrics when the counter is 0
added a lot of TODO's I need to go back and cleanup
pull/640/head
Edward Welch 7 years ago committed by Ed
parent eaecf0ff90
commit 29eaec1a04
  1. 51
      pkg/logentry/metric/counters.go
  2. 19
      pkg/logentry/metric/entries.go
  3. 56
      pkg/logentry/metric/gauges.go
  4. 37
      pkg/logentry/metric/histograms.go
  5. 9
      pkg/logentry/stages/extensions.go
  6. 5
      pkg/logentry/stages/extensions_test.go
  7. 7
      pkg/logentry/stages/match.go
  8. 3
      pkg/logentry/stages/match_test.go
  9. 118
      pkg/logentry/stages/metrics.go
  10. 47
      pkg/logentry/stages/metrics_test.go
  11. 1
      pkg/logentry/stages/output.go
  12. 4
      pkg/logentry/stages/pipeline.go
  13. 61
      pkg/logentry/stages/pipeline_test.go
  14. 6
      pkg/logentry/stages/stage.go
  15. 4
      pkg/logentry/stages/timestamp.go
  16. 2
      pkg/logentry/stages/util.go
  17. 7
      pkg/promtail/targets/filetargetmanager.go

@ -1,17 +1,60 @@
package metric
import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
// Counters is a vector of counters for a each log stream.
const (
CounterInc = "inc"
CounterAdd = "add"
ErrCounterActionRequired = "counter action must be defined as either `inc` or `add`"
ErrCounterInvalidAction = "action %s is not valid, action must be either `inc` or `add`"
)
type CounterConfig struct {
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}
func validateCounterConfig(config *CounterConfig) error {
if config.Action == "" {
return errors.New(ErrCounterActionRequired)
}
if config.Action != CounterInc && config.Action != CounterAdd {
return errors.Errorf(ErrCounterInvalidAction, config.Action)
}
return nil
}
func parseCounterConfig(config interface{}) (*CounterConfig, error) {
cfg := &CounterConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Counters is a vec tor of counters for a each log stream.
type Counters struct {
*metricVec
Cfg *CounterConfig
}
// NewCounters creates a new counter vec.
func NewCounters(name, help string) *Counters {
func NewCounters(name, help string, config interface{}) (*Counters, error) {
cfg, err := parseCounterConfig(config)
if err != nil {
return nil, err
}
err = validateCounterConfig(cfg)
if err != nil {
return nil, err
}
return &Counters{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewCounter(prometheus.CounterOpts{
@ -19,7 +62,9 @@ func NewCounters(name, help string) *Counters {
Name: name,
ConstLabels: labels,
})
})}
}),
Cfg: cfg,
}, nil
}
// With returns the counter associated with a stream labelset.

@ -3,14 +3,21 @@ package metric
import (
"time"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/promtail/api"
)
// LogCount counts log line for each stream.
func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewCounters("log_entries_total", "the total count of log entries")
cfg := CounterConfig{
Action: CounterInc,
}
c, err := NewCounters("log_entries_total", "the total count of log entries", cfg)
if err != nil {
//TODO what do we want to do with this??
}
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if err := next.Handle(labels, time, entry); err != nil {
@ -23,7 +30,13 @@ func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler
// LogSize observes log line size for each stream.
func LogSize(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewHistograms("log_entries_bytes", "the total count of bytes", prometheus.ExponentialBuckets(16, 2, 8))
cfg := HistogramConfig{
Buckets: prometheus.ExponentialBuckets(16, 2, 8),
}
c, err := NewHistograms("log_entries_bytes", "the total count of bytes", cfg)
if err != nil {
//TODO what do we want to do with this??
}
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if err := next.Handle(labels, time, entry); err != nil {

@ -1,17 +1,67 @@
package metric
import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
GaugeSet = "set"
GaugeInc = "inc"
GaugeDec = "dec"
GaugeAdd = "add"
GaugeSub = "sub"
ErrGaugeActionRequired = "gauge action must be defined as `set`, `inc`, `dec`, `add`, or `sub`"
ErrGaugeInvalidAction = "action %s is not valid, action must be `set`, `inc`, `dec`, `add`, or `sub`"
)
type GaugeConfig struct {
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}
func validateGaugeConfig(config *GaugeConfig) error {
if config.Action == "" {
return errors.New(ErrGaugeActionRequired)
}
if config.Action != GaugeSet &&
config.Action != GaugeInc &&
config.Action != GaugeDec &&
config.Action != GaugeAdd &&
config.Action != GaugeSub {
return errors.Errorf(ErrGaugeInvalidAction, config.Action)
}
return nil
}
func parseGaugeConfig(config interface{}) (*GaugeConfig, error) {
cfg := &GaugeConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Gauges is a vector of gauges for a each log stream.
type Gauges struct {
*metricVec
Cfg *GaugeConfig
}
// NewGauges creates a new gauge vec.
func NewGauges(name, help string) *Gauges {
func NewGauges(name, help string, config interface{}) (*Gauges, error) {
cfg, err := parseGaugeConfig(config)
if err != nil {
return nil, err
}
err = validateGaugeConfig(cfg)
if err != nil {
return nil, err
}
return &Gauges{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewGauge(prometheus.GaugeOpts{
@ -19,7 +69,9 @@ func NewGauges(name, help string) *Gauges {
Name: name,
ConstLabels: labels,
})
})}
}),
Cfg: cfg,
}, nil
}
// With returns the gauge associated with a stream labelset.

@ -1,26 +1,57 @@
package metric
import (
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
type HistogramConfig struct {
Value *string `mapstructure:"value"`
Buckets []float64 `mapstructure:"buckets"`
}
func validateHistogramConfig(config *HistogramConfig) error {
//TODO is there any validation required?
return nil
}
func parseHistogramConfig(config interface{}) (*HistogramConfig, error) {
cfg := &HistogramConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
// Histograms is a vector of histograms for a each log stream.
type Histograms struct {
*metricVec
Cfg *HistogramConfig
}
// NewHistograms creates a new histogram vec.
func NewHistograms(name, help string, buckets []float64) *Histograms {
func NewHistograms(name, help string, config interface{}) (*Histograms, error) {
cfg, err := parseHistogramConfig(config)
if err != nil {
return nil, err
}
err = validateHistogramConfig(cfg)
if err != nil {
return nil, err
}
return &Histograms{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewHistogram(prometheus.HistogramOpts{
Help: help,
Name: name,
ConstLabels: labels,
Buckets: buckets,
Buckets: cfg.Buckets,
})
})}
}),
Cfg: cfg,
}, nil
}
// With returns the histogram associated with a stream labelset.

@ -2,12 +2,13 @@ package stages
import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
)
const RFC3339Nano = "RFC3339Nano"
// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger, jobName string) (Stage, error) {
func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
t := "timestamp"
f := RFC3339Nano
o := "output"
@ -35,11 +36,11 @@ func NewDocker(logger log.Logger, jobName string) (Stage, error) {
},
}}
return NewPipeline(logger, stages, jobName+"_docker")
return NewPipeline(logger, stages, jobName+"_docker", registerer)
}
// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, jobName string) (Stage, error) {
func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
t := "time"
f := RFC3339Nano
o := "content"
@ -66,5 +67,5 @@ func NewCRI(logger log.Logger, jobName string) (Stage, error) {
},
},
}
return NewPipeline(logger, stages, jobName+"_cri")
return NewPipeline(logger, stages, jobName+"_cri", registerer)
}

@ -5,6 +5,7 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
@ -64,7 +65,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util.Logger, "test")
p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
@ -140,7 +141,7 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util.Logger, "test")
p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}

@ -6,6 +6,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -30,9 +31,11 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
//FIXME PipelineName does not need to be a pointer
if cfg.PipelineName == nil || *cfg.PipelineName == "" {
return nil, errors.New(ErrPipelineNameRequired)
}
//FIXME selector does not need to be a pointer
if cfg.Selector == nil || *cfg.Selector == "" {
return nil, errors.New(ErrSelectorRequired)
}
@ -46,7 +49,7 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
return matchers, nil
}
func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) {
func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &MatcherConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
@ -57,7 +60,7 @@ func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) {
return nil, err
}
pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName)
pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName, registerer)
if err != nil {
return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", *cfg.PipelineName)
}

@ -6,6 +6,7 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)
var pipelineName = "pl_name"
@ -50,7 +51,7 @@ func TestMatcher(t *testing.T) {
},
},
}
s, err := newMatcherStage(util.Logger, matchConfig)
s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer)
if (err != nil) != tt.wantErr {
t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
return

@ -23,10 +23,10 @@ const (
// MetricConfig is a single metrics configuration.
type MetricConfig struct {
MetricType string `mapstructure:"type"`
Description string `mapstructure:"description"`
Source *string `mapstructure:"source"`
Buckets []float64 `mapstructure:"buckets"`
MetricType string `mapstructure:"type"`
Description string `mapstructure:"description"`
Source *string `mapstructure:"source"`
Config interface{} `mapstructure:"config"`
}
// MetricsConfig is a set of configured metrics.
@ -65,11 +65,20 @@ func newMetric(config interface{}, registry prometheus.Registerer) (*metricStage
switch strings.ToLower(cfg.MetricType) {
case MetricTypeCounter:
collector = metric.NewCounters(customPrefix+name, cfg.Description)
collector, err = metric.NewCounters(customPrefix+name, cfg.Description, cfg.Config)
if err != nil {
return nil, err
}
case MetricTypeGauge:
collector = metric.NewGauges(customPrefix+name, cfg.Description)
collector, err = metric.NewGauges(customPrefix+name, cfg.Description, cfg.Config)
if err != nil {
return nil, err
}
case MetricTypeHistogram:
collector = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Buckets)
collector, err = metric.NewHistograms(customPrefix+name, cfg.Description, cfg.Config)
if err != nil {
return nil, err
}
}
if collector != nil {
registry.MustRegister(collector)
@ -92,39 +101,102 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf
if v, ok := extracted[*m.cfg[name].Source]; ok {
switch vec := collector.(type) {
case *metric.Counters:
recordCounter(vec.With(labels), v)
recordCounter(vec, labels, v)
case *metric.Gauges:
recordGauge(vec.With(labels), v)
recordGauge(vec, labels, v)
case *metric.Histograms:
recordHistogram(vec.With(labels), v)
recordHistogram(vec, labels, v)
}
}
}
}
func recordCounter(counter prometheus.Counter, v interface{}) {
f, err := getFloat(v)
if err != nil || f < 0 {
return
func recordCounter(counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if counter.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
//TODO need logger
return
}
if *counter.Cfg.Value != stringVal {
return
}
}
switch counter.Cfg.Action {
case metric.CounterInc:
counter.With(labels).Inc()
case metric.CounterAdd:
f, err := getFloat(v)
if err != nil || f < 0 {
//TODO need logger
return
}
counter.With(labels).Add(f)
}
counter.Add(f)
}
func recordGauge(gauge prometheus.Gauge, v interface{}) {
f, err := getFloat(v)
if err != nil {
return
func recordGauge(gauge *metric.Gauges, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if gauge.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
//TODO need logger
return
}
if *gauge.Cfg.Value != stringVal {
return
}
}
switch gauge.Cfg.Action {
case metric.GaugeSet:
f, err := getFloat(v)
if err != nil || f < 0 {
//TODO need logger
return
}
gauge.With(labels).Set(f)
case metric.GaugeInc:
gauge.With(labels).Inc()
case metric.GaugeDec:
gauge.With(labels).Dec()
case metric.GaugeAdd:
f, err := getFloat(v)
if err != nil || f < 0 {
//TODO need logger
return
}
gauge.With(labels).Add(f)
case metric.GaugeSub:
f, err := getFloat(v)
if err != nil || f < 0 {
//TODO need logger
return
}
gauge.With(labels).Sub(f)
}
//todo Gauge we be able to add,inc,dec,set
gauge.Add(f)
}
func recordHistogram(histogram prometheus.Histogram, v interface{}) {
func recordHistogram(histogram *metric.Histograms, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if histogram.Cfg.Value != nil {
stringVal, err := getString(v)
if err != nil {
//TODO need logger
return
}
if *histogram.Cfg.Value != stringVal {
return
}
}
f, err := getFloat(v)
if err != nil {
//TODO need logger
return
}
histogram.Observe(f)
histogram.With(labels).Observe(f)
}
func getFloat(unk interface{}) (float64, error) {

@ -9,6 +9,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logentry/metric"
)
var labelFoo = model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar", "bar": "foo"})
@ -31,51 +33,82 @@ func Test_withMetric(t *testing.T) {
"expression": "(?P<get>\"GET).*HTTP/1.1\" (?P<status>\\d*) (?P<time>\\d*) ",
}
timeSource := "time"
true := "true"
metricsConfig := MetricsConfig{
"total_keys": MetricConfig{
MetricType: "Counter",
Description: "the total keys per doc",
Config: metric.CounterConfig{
Action: metric.CounterAdd,
},
},
"keys_per_line": MetricConfig{
MetricType: "Histogram",
Description: "keys per doc",
Buckets: []float64{1, 3, 5, 10},
Config: metric.HistogramConfig{
Buckets: []float64{1, 3, 5, 10},
},
},
"numeric_float": MetricConfig{
MetricType: "Gauge",
Description: "numeric_float",
Config: metric.GaugeConfig{
Action: metric.GaugeAdd,
},
},
"numeric_integer": MetricConfig{
MetricType: "Gauge",
Description: "numeric.integer",
Config: metric.GaugeConfig{
Action: metric.GaugeAdd,
},
},
"numeric_string": MetricConfig{
MetricType: "Gauge",
Description: "numeric.string",
Config: metric.GaugeConfig{
Action: metric.GaugeAdd,
},
},
"contains_warn": MetricConfig{
MetricType: "Counter",
Description: "contains_warn",
Config: metric.CounterConfig{
Value: &true,
Action: metric.CounterInc,
},
},
"contains_false": MetricConfig{
MetricType: "Counter",
Description: "contains_false",
Config: metric.CounterConfig{
Value: &true,
Action: metric.CounterInc,
},
},
// FIXME Not showing results currently if there are no counts, this doesn't make it into the output
"unconvertible": MetricConfig{
MetricType: "Counter",
Description: "unconvertible",
Config: metric.CounterConfig{
Action: metric.CounterInc,
},
},
// FIXME Not entirely sure how we want to implement this one?
"matches": MetricConfig{
MetricType: "Counter",
Source: &timeSource,
Description: "all matches",
Config: metric.CounterConfig{
Action: metric.CounterInc,
},
},
"response_time_ms": MetricConfig{
MetricType: "Histogram",
Source: &timeSource,
Description: "response time in ms",
Buckets: []float64{1, 2, 3},
Config: metric.HistogramConfig{
Buckets: []float64{1, 2, 3},
},
},
}
@ -98,9 +131,7 @@ func Test_withMetric(t *testing.T) {
jsonStage.Process(labelFoo, extr, &ts, &entry)
regexStage.Process(labelFoo, extr, &ts, &regexLogFixture)
metricStage.Process(labelFoo, extr, &ts, &entry)
//jsonStage.Process(labelFu, extr, &ts, &entry)
//regexStage.Process(labelFu, extr, &ts, &regexLogFixture)
// Process the same extracted values again with different labels so we can verify proper metric/label assignments
metricStage.Process(labelFu, extr, &ts, &entry)
names := metricNames(metricsConfig)
@ -144,8 +175,8 @@ promtail_custom_keys_per_line_sum{baz="fu",fu="baz"} 8.0
promtail_custom_keys_per_line_count{baz="fu",fu="baz"} 1.0
# HELP promtail_custom_matches all matches
# TYPE promtail_custom_matches counter
promtail_custom_matches{bar="foo",foo="bar"} 3.0
promtail_custom_matches{baz="fu",fu="baz"} 3.0
promtail_custom_matches{bar="foo",foo="bar"} 1.0
promtail_custom_matches{baz="fu",fu="baz"} 1.0
# HELP promtail_custom_numeric_float numeric_float
# TYPE promtail_custom_numeric_float gauge
promtail_custom_numeric_float{bar="foo",foo="bar"} 12.34

@ -26,6 +26,7 @@ func validateOutputConfig(cfg *OutputConfig) error {
if cfg == nil {
return errors.New(ErrEmptyOutputStageConfig)
}
//FIXME Source does not need to be a pointer
if cfg.Source == nil || *cfg.Source == "" {
return errors.New(ErrOutputSourceRequired)
}

@ -36,7 +36,7 @@ type Pipeline struct {
}
// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipeline, error) {
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
for _, s := range stgs {
stage, ok := s.(PipelineStage)
@ -52,7 +52,7 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipel
if !ok {
return nil, errors.New("pipeline stage key must be a string")
}
newStage, err := New(logger, jobName, name, config, prometheus.DefaultRegisterer)
newStage, err := New(logger, jobName, name, config, registerer)
if err != nil {
return nil, errors.Wrapf(err, "invalid %s stage config", name)
}

@ -1,12 +1,15 @@
package stages
import (
"strings"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -48,7 +51,7 @@ func loadConfig(yml string) PipelineStages {
func TestNewPipeline(t *testing.T) {
p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test")
p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}
@ -66,7 +69,7 @@ func TestPipeline_MultiStage(t *testing.T) {
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test")
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}
@ -145,7 +148,7 @@ pipeline_stages:
`
func TestPipeline_Output(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), "test")
pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -179,7 +182,7 @@ pipeline_stages:
`
func TestPipeline_Labels(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), "test")
pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -215,7 +218,7 @@ pipeline_stages:
`
func TestPipeline_Timestamp(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), "test")
pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -227,6 +230,52 @@ func TestPipeline_Timestamp(t *testing.T) {
assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", 0)), ts)
}
var testMetricLogLine = `
{
"time":"2012-11-01T22:08:41+00:00",
"app":"loki",
"component": ["parser","type"],
"level" : "WARN"
}
`
var testMetricYaml = `
pipeline_stages:
- json:
expressions:
app: app
- metric:
loki_count:
type: Counter
description: uhhhhhhh
source: app
config:
value: loki
action: inc
`
const expectedMetrics = `# HELP promtail_custom_loki_count uhhhhhhh
# TYPE promtail_custom_loki_count counter
promtail_custom_loki_count 1.0
`
func TestPipeline_Metrics(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), "test", registry)
if err != nil {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()
entry := testMetricLogLine
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
if err := testutil.GatherAndCompare(registry,
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("missmatch metrics: %v", err)
}
}
var (
l = log.NewNopLogger()
//w = log.NewSyncWriter(os.Stdout)
@ -257,7 +306,7 @@ func BenchmarkPipeline(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
pl, err := NewPipeline(bm.logger, bm.stgs, "test")
pl, err := NewPipeline(bm.logger, bm.stgs, "test", prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}

@ -42,12 +42,12 @@ func New(logger log.Logger, jobName string, stageType string,
var err error
switch stageType {
case StageTypeDocker:
s, err = NewDocker(logger, jobName)
s, err = NewDocker(logger, jobName, registerer)
if err != nil {
return nil, err
}
case StageTypeCRI:
s, err = NewCRI(logger, jobName)
s, err = NewCRI(logger, jobName, registerer)
if err != nil {
return nil, err
}
@ -82,7 +82,7 @@ func New(logger log.Logger, jobName string, stageType string,
return nil, err
}
case StageTypeMatch:
s, err = newMatcherStage(logger, cfg)
s, err = newMatcherStage(logger, cfg, registerer)
if err != nil {
return nil, err
}

@ -27,9 +27,11 @@ func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
if cfg == nil {
return "", errors.New(ErrEmptyTimestampStageConfig)
}
//FIXME Source does not need to be a pointer
if cfg.Source == nil || *cfg.Source == "" {
return "", errors.New(ErrTimestampSourceRequired)
}
//FIXME Format does not need to be a pointer
if cfg.Format == nil || *cfg.Format == "" {
return "", errors.New(ErrTimestampFormatRequired)
}
@ -37,7 +39,7 @@ func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
}
// newLabel creates a new set of metrics to process for each log entry
// newTimestamp creates a new timestamp extraction pipeline stage.
func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error) {
cfg := &TimestampConfig{}
err := mapstructure.Decode(config, cfg)

@ -61,6 +61,6 @@ func getString(unk interface{}) (string, error) {
}
return "false", nil
default:
return "", fmt.Errorf("Can't convert %v to float64", unk)
return "", fmt.Errorf("Can't convert %v to string", unk)
}
}

@ -76,7 +76,8 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName)
registerer := prometheus.DefaultRegisterer
pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName, registerer)
if err != nil {
return nil, err
}
@ -86,14 +87,14 @@ func NewFileTargetManager(
switch cfg.EntryParser {
case api.CRI:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
cri, err := stages.NewCRI(logger, cfg.JobName)
cri, err := stages.NewCRI(logger, cfg.JobName, registerer)
if err != nil {
return nil, err
}
pipeline.AddStage(cri)
case api.Docker:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
docker, err := stages.NewDocker(logger, cfg.JobName)
docker, err := stages.NewDocker(logger, cfg.JobName, registerer)
if err != nil {
return nil, err
}

Loading…
Cancel
Save