cleaning up remaining TODOs, adding tests

cleaning up GoDoc
pipeline_name is now optional for the match stage
pull/640/head
Edward Welch 6 years ago committed by Ed
parent 9a91d58d3a
commit 3a2ac64b7c
  1. pkg/logentry/stages/configs.go (7)
  2. pkg/logentry/stages/extensions.go (9)
  3. pkg/logentry/stages/extensions_test.go (4)
  4. pkg/logentry/stages/json.go (7)
  5. pkg/logentry/stages/json_test.go (2)
  6. pkg/logentry/stages/labels.go (8)
  7. pkg/logentry/stages/labels_test.go (49)
  8. pkg/logentry/stages/match.go (23)
  9. pkg/logentry/stages/match_test.go (31)
  10. pkg/logentry/stages/metrics.go (14)
  11. pkg/logentry/stages/metrics_test.go (8)
  12. pkg/logentry/stages/output.go (7)
  13. pkg/logentry/stages/output_test.go (35)
  14. pkg/logentry/stages/pipeline.go (46)
  15. pkg/logentry/stages/pipeline_test.go (7)
  16. pkg/logentry/stages/regex.go (6)
  17. pkg/logentry/stages/regex_test.go (4)
  18. pkg/logentry/stages/stage.go (16)
  19. pkg/logentry/stages/timestamp.go (7)
  20. pkg/logentry/stages/timestamp_test.go (41)
  21. pkg/logentry/stages/util.go (1)
  22. pkg/promtail/targets/filetargetmanager.go (6)

@ -1,7 +0,0 @@
package stages
const (
MetricTypeCounter = "counter"
MetricTypeGauge = "gauge"
MetricTypeHistogram = "histogram"
)

@ -8,7 +8,7 @@ import (
const RFC3339Nano = "RFC3339Nano"
// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
stages := PipelineStages{
PipelineStage{
StageTypeJSON: JSONConfig{
@ -32,12 +32,11 @@ func NewDocker(logger log.Logger, jobName string, registerer prometheus.Register
"output",
},
}}
return NewPipeline(logger, stages, jobName+"_docker", registerer)
return NewPipeline(logger, stages, nil, registerer)
}
// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
stages := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
@ -61,5 +60,5 @@ func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer)
},
},
}
return NewPipeline(logger, stages, jobName+"_cri", registerer)
return NewPipeline(logger, stages, nil, registerer)
}
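For reference, a minimal sketch (not part of this commit) of calling the updated constructors, which no longer take a job name; the import paths are the ones used elsewhere in this diff.

package main

import (
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/logentry/stages"
)

func main() {
	// The built-in Docker and CRI pipelines are now created without a job
	// name; internally they call NewPipeline with a nil name, so they do
	// not report a per-pipeline duration metric of their own.
	if _, err := stages.NewDocker(util.Logger, prometheus.DefaultRegisterer); err != nil {
		panic(err)
	}
	if _, err := stages.NewCRI(util.Logger, prometheus.DefaultRegisterer); err != nil {
		panic(err)
	}
}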

@ -65,7 +65,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer)
p, err := NewDocker(util.Logger, prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
@ -141,7 +141,7 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer)
p, err := NewCRI(util.Logger, prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}

@ -21,6 +21,7 @@ const (
ErrEmptyJSONStageConfig = "empty json stage configuration"
)
// JSONConfig represents a JSON Stage configuration
type JSONConfig struct {
Expressions map[string]string `mapstructure:"expressions"`
}
@ -52,14 +53,14 @@ func validateJSONConfig(c *JSONConfig) (map[string]*jmespath.JMESPath, error) {
return expressions, nil
}
// jsonStage extracts log data via json parsing.
// jsonStage sets extracted data using JMESPath expressions
type jsonStage struct {
cfg *JSONConfig
expressions map[string]*jmespath.JMESPath
logger log.Logger
}
// newJSONStage creates a new json mutator from a config.
// newJSONStage creates a new json pipeline stage from a config.
func newJSONStage(logger log.Logger, config interface{}) (*jsonStage, error) {
cfg, err := parseJSONConfig(config)
if err != nil {
@ -85,7 +86,7 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) {
return cfg, nil
}
// Process implements Mutator
// Process implements Stage
func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if entry == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
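A hedged usage sketch of the json stage through the exported New constructor (the stage type itself is unexported); the config shape follows the JSONConfig above, where each expression is a JMESPath query evaluated against the parsed log line and stored under its key in the extracted map.

package main

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/logentry/stages"
)

func main() {
	cfg := map[string]interface{}{
		// extracted key -> JMESPath expression
		"expressions": map[string]string{
			"level":  "level",
			"output": "message",
		},
	}
	// nil job name: pipeline metric naming is only relevant for match sub-pipelines.
	s, err := stages.New(util.Logger, nil, stages.StageTypeJSON, cfg, nil)
	if err != nil {
		panic(err)
	}
	entry := `{"level":"info","message":"app1 log line"}`
	extracted := map[string]interface{}{}
	ts := time.Now()
	s.Process(model.LabelSet{}, extracted, &ts, &entry)
	fmt.Println(extracted["level"], extracted["output"]) // expected: info app1 log line
}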

@ -174,7 +174,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(util.Logger, "test", StageTypeJSON, tt.config, nil)
p, err := New(util.Logger, nil, StageTypeJSON, tt.config, nil)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
}

@ -20,6 +20,7 @@ const (
// LabelsConfig is a set of labels to be extracted
type LabelsConfig map[string]*string
// validateLabelsConfig validates the Label stage configuration
func validateLabelsConfig(c LabelsConfig) error {
if c == nil {
return errors.New(ErrEmptyLabelStageConfig)
@ -37,8 +38,8 @@ func validateLabelsConfig(c LabelsConfig) error {
return nil
}
// newLabel creates a new label stage to set labels from extracted data
func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) {
// newLabelStage creates a new label stage to set labels from extracted data
func newLabelStage(logger log.Logger, configs interface{}) (*labelStage, error) {
cfgs := &LabelsConfig{}
err := mapstructure.Decode(configs, cfgs)
if err != nil {
@ -54,11 +55,13 @@ func newLabel(logger log.Logger, configs interface{}) (*labelStage, error) {
}, nil
}
// labelStage sets labels from extracted data
type labelStage struct {
cfgs LabelsConfig
logger log.Logger
}
// Process implements Stage
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for lName, lSrc := range l.cfgs {
if _, ok := extracted[*lSrc]; ok {
@ -66,6 +69,7 @@ func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interfa
s, err := getString(lValue)
if err != nil {
level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue).String())
continue
}
labelValue := model.LabelValue(s)
if !labelValue.IsValid() {
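A hedged sketch of the renamed label stage, built via the exported New constructor; in LabelsConfig a nil source means "read the extracted key with the same name as the label", while a non-nil pointer (as in the different_source_name test below) reads a differently named key.

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/logentry/stages"
)

func main() {
	src := "lvl"
	cfg := stages.LabelsConfig{
		"app":   nil,  // source defaults to the label name, i.e. extracted["app"]
		"level": &src, // read extracted["lvl"] and set it as the "level" label
	}
	s, err := stages.New(util.Logger, nil, stages.StageTypeLabel, cfg, nil)
	if err != nil {
		panic(err)
	}
	labels := model.LabelSet{}
	extracted := map[string]interface{}{"app": "loki", "lvl": "warn"}
	s.Process(labels, extracted, nil, nil)
	fmt.Println(labels) // {app="loki", level="warn"}
}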

@ -33,7 +33,7 @@ var testLabelsLogLine = `
`
func TestLabelsPipeline_Labels(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), "test", prometheus.DefaultRegisterer)
pl, err := NewPipeline(util.Logger, loadConfig(testLabelsYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -108,4 +108,49 @@ func TestLabels(t *testing.T) {
}
}
//TODO test label processing
func TestLabelStage_Process(t *testing.T) {
sourceName := "diff_source"
tests := map[string]struct {
config LabelsConfig
extractedData map[string]interface{}
inputLabels model.LabelSet
expectedLabels model.LabelSet
}{
"extract_success": {
LabelsConfig{
"testLabel": nil,
},
map[string]interface{}{
"testLabel": "testValue",
},
model.LabelSet{},
model.LabelSet{
"testLabel": "testValue",
},
},
"different_source_name": {
LabelsConfig{
"testLabel": &sourceName,
},
map[string]interface{}{
sourceName: "testValue",
},
model.LabelSet{},
model.LabelSet{
"testLabel": "testValue",
},
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
st, err := newLabelStage(util.Logger, test.config)
if err != nil {
t.Fatal(err)
}
st.Process(test.inputLabels, test.extractedData, nil, nil)
assert.Equal(t, test.expectedLabels, test.inputLabels)
})
}
}

@ -15,23 +15,25 @@ import (
const (
ErrEmptyMatchStageConfig = "match stage config cannot be empty"
ErrPipelineNameRequired = "match stage must specify a pipeline name which is used in the exported metrics"
ErrPipelineNameRequired = "match stage pipeline name can be omitted but cannot be an empty string"
ErrSelectorRequired = "selector statement required for match stage"
ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'"
ErrSelectorSyntax = "invalid selector syntax for match stage"
)
// MatcherConfig contains the configuration for a matcherStage
type MatcherConfig struct {
PipelineName string `mapstructure:"pipeline_name"`
PipelineName *string `mapstructure:"pipeline_name"`
Selector string `mapstructure:"selector"`
Stages PipelineStages `mapstructure:"stages"`
}
// validateMatcherConfig validates the MatcherConfig for the matcherStage
func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
if cfg.PipelineName == "" {
if cfg.PipelineName != nil && *cfg.PipelineName == "" {
return nil, errors.New(ErrPipelineNameRequired)
}
if cfg.Selector == "" {
@ -47,7 +49,8 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
return matchers, nil
}
func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
// newMatcherStage creates a new matcherStage from config
func newMatcherStage(logger log.Logger, jobName *string, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &MatcherConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
@ -58,9 +61,15 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu
return nil, err
}
pl, err := NewPipeline(logger, cfg.Stages, cfg.PipelineName, registerer)
var nPtr *string
if cfg.PipelineName != nil && jobName != nil {
name := *jobName + "_" + *cfg.PipelineName
nPtr = &name
}
pl, err := NewPipeline(logger, cfg.Stages, nPtr, registerer)
if err != nil {
return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", cfg.PipelineName)
return nil, errors.Wrapf(err, "match stage failed to create pipeline from config: %v", config)
}
return &matcherStage{
@ -71,6 +80,7 @@ func newMatcherStage(logger log.Logger, config interface{}, registerer prometheu
}, nil
}
// matcherStage applies Label matchers to determine if the included stages should be run
type matcherStage struct {
cfgs *MatcherConfig
matchers []*labels.Matcher
@ -78,6 +88,7 @@ type matcherStage struct {
pipeline Stage
}
// Process implements Stage
func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for _, filter := range m.matchers {
if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
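A hedged sketch of the new behaviour: pipeline_name may be omitted (nil) entirely, and only an explicit empty string is rejected; when both the parent job name and a pipeline_name are present, the nested pipeline's metrics are labelled "<job>_<pipeline_name>", as the match_test below asserts.

package main

import (
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/pkg/logentry/stages"
)

func main() {
	name := "app1"
	cfg := stages.MatcherConfig{
		PipelineName: &name, // or nil to skip per-match duration metrics
		Selector:     `{app="loki"}`,
		Stages: stages.PipelineStages{
			stages.PipelineStage{
				stages.StageTypeLabel: stages.LabelsConfig{"app": nil},
			},
		},
	}
	job := "promtail"
	// With job "promtail" and pipeline_name "app1", the nested pipeline
	// observes logentry_pipeline_duration_seconds{job_name="promtail_app1"}.
	if _, err := stages.New(util.Logger, &job, stages.StageTypeMatch, cfg, prometheus.NewRegistry()); err != nil {
		panic(err)
	}
}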

@ -1,12 +1,14 @@
package stages
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)
@ -19,7 +21,6 @@ pipeline_stages:
- labels:
app:
- match:
pipeline_name: "app1"
selector: "{app=\"loki\"}"
stages:
- json:
@ -57,20 +58,42 @@ var testMatchLogLineApp2 = `
`
func TestMatchPipeline(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), "test", prometheus.DefaultRegisterer)
registry := prometheus.NewRegistry()
plName := "test_pipeline"
pl, err := NewPipeline(util.Logger, loadConfig(testMatchYaml), &plName, registry)
if err != nil {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()
// Process the first log line which should extract the output from the `message` field
entry := testMatchLogLineApp1
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, "app1 log line", entry)
// Process the second log line which should extract the output from the `msg` field
entry = testMatchLogLineApp2
extracted = map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, "app2 log line", entry)
got, err := registry.Gather()
if err != nil {
t.Fatalf("gathering metrics failed: %s", err)
}
var gotBuf bytes.Buffer
enc := expfmt.NewEncoder(&gotBuf, expfmt.FmtText)
for _, mf := range got {
if err := enc.Encode(mf); err != nil {
t.Fatalf("encoding gathered metrics failed: %s", err)
}
}
gotStr := gotBuf.String()
// We should only get metrics from the main pipeline and the second match which defines the pipeline_name
assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline\"")
assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app2\"")
assert.NotContains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app1\"")
}
func TestMatcher(t *testing.T) {
@ -103,7 +126,7 @@ func TestMatcher(t *testing.T) {
// Build a match config which has a simple label stage that when matched will add the test_label to
// the labels in the pipeline.
matchConfig := MatcherConfig{
"pl_name",
nil,
tt.matcher,
PipelineStages{
PipelineStage{
@ -113,7 +136,7 @@ func TestMatcher(t *testing.T) {
},
},
}
s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer)
s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer)
if (err != nil) != tt.wantErr {
t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
return

@ -21,6 +21,10 @@ import (
const customPrefix = "promtail_custom_"
const (
MetricTypeCounter = "counter"
MetricTypeGauge = "gauge"
MetricTypeHistogram = "histogram"
ErrEmptyMetricsStageConfig = "empty metric stage configuration"
)
@ -51,8 +55,8 @@ func validateMetricsConfig(cfg MetricsConfig) error {
return nil
}
// newMetric creates a new set of metrics to process for each log entry
func newMetric(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) {
// newMetricStage creates a new set of metrics to process for each log entry
func newMetricStage(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) {
cfgs := &MetricsConfig{}
err := mapstructure.Decode(config, cfgs)
if err != nil {
@ -95,12 +99,14 @@ func newMetric(logger log.Logger, config interface{}, registry prometheus.Regist
}, nil
}
// metricStage creates and updates prometheus metrics based on extracted pipeline data
type metricStage struct {
logger log.Logger
cfg MetricsConfig
metrics map[string]prometheus.Collector
}
// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, collector := range m.metrics {
if v, ok := extracted[*m.cfg[name].Source]; ok {
@ -116,6 +122,7 @@ func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interf
}
}
// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if counter.Cfg.Value != nil {
@ -144,6 +151,7 @@ func (m *metricStage) recordCounter(name string, counter *metric.Counters, label
}
}
// recordGauge will update a gauge metric
func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if gauge.Cfg.Value != nil {
@ -188,6 +196,7 @@ func (m *metricStage) recordGauge(name string, gauge *metric.Gauges, labels mode
}
}
// recordHistogram will update a Histogram metric
func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
if histogram.Cfg.Value != nil {
@ -210,6 +219,7 @@ func (m *metricStage) recordHistogram(name string, histogram *metric.Histograms,
histogram.With(labels).Observe(f)
}
// getFloat will take the provided value and return a float64 if possible
func getFloat(unk interface{}) (float64, error) {
switch i := unk.(type) {

@ -44,7 +44,7 @@ promtail_custom_loki_count 1.0
func TestMetricsPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), "test", registry)
pl, err := NewPipeline(util.Logger, loadConfig(testMetricYaml), nil, registry)
if err != nil {
t.Fatal(err)
}
@ -150,15 +150,15 @@ func TestMetricStage_Process(t *testing.T) {
}
registry := prometheus.NewRegistry()
jsonStage, err := New(util.Logger, "test", StageTypeJSON, jsonConfig, registry)
jsonStage, err := New(util.Logger, nil, StageTypeJSON, jsonConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
regexStage, err := New(util.Logger, "test", StageTypeRegex, regexConfig, registry)
regexStage, err := New(util.Logger, nil, StageTypeRegex, regexConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
metricStage, err := New(util.Logger, "test", StageTypeMetric, metricsConfig, registry)
metricStage, err := New(util.Logger, nil, StageTypeMetric, metricsConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}

@ -22,6 +22,7 @@ type OutputConfig struct {
Source string `mapstructure:"source"`
}
// validateOutput validates the outputStage config
func validateOutputConfig(cfg *OutputConfig) error {
if cfg == nil {
return errors.New(ErrEmptyOutputStageConfig)
@ -32,8 +33,8 @@ func validateOutputConfig(cfg *OutputConfig) error {
return nil
}
// newLabel creates a new set of metrics to process for each log entry
func newOutput(logger log.Logger, config interface{}) (*outputStage, error) {
// newOutputStage creates a new outputStage
func newOutputStage(logger log.Logger, config interface{}) (*outputStage, error) {
cfg := &OutputConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
@ -49,11 +50,13 @@ func newOutput(logger log.Logger, config interface{}) (*outputStage, error) {
}, nil
}
// outputStage will mutate the incoming entry and set it from extracted data
type outputStage struct {
cfgs *OutputConfig
logger log.Logger
}
// Process implements Stage
func (o *outputStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if o.cfgs == nil {
return

@ -32,7 +32,7 @@ var testOutputLogLine = `
`
func TestPipeline_Output(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), "test", prometheus.DefaultRegisterer)
pl, err := NewPipeline(util.Logger, loadConfig(testOutputYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -77,4 +77,35 @@ func TestOutputValidation(t *testing.T) {
}
}
//TODO test label processing
func TestOutputStage_Process(t *testing.T) {
tests := map[string]struct {
config OutputConfig
extracted map[string]interface{}
expectedOutput string
}{
"sets output": {
OutputConfig{
Source: "out",
},
map[string]interface{}{
"something": "notimportant",
"out": "outmessage",
},
"outmessage",
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
st, err := newOutputStage(util.Logger, test.config)
if err != nil {
t.Fatal(err)
}
lbls := model.LabelSet{}
entry := "replaceme"
st.Process(lbls, test.extracted, nil, &entry)
assert.Equal(t, test.expectedOutput, entry)
})
}
}

@ -7,21 +7,11 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/promtail/api"
)
var (
pipelineDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "logentry",
Name: "pipeline_duration_seconds",
Help: "Label and metric extraction pipeline processing time, in seconds",
Buckets: []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000},
}, []string{"job_name"})
)
// PipelineStages contains configuration for each stage within a pipeline
type PipelineStages = []interface{}
@ -30,13 +20,30 @@ type PipelineStage = map[interface{}]interface{}
// Pipeline passes a log entry down to each stage for mutation and/or label extraction.
type Pipeline struct {
logger log.Logger
stages []Stage
jobName string
logger log.Logger
stages []Stage
jobName *string
plDuration *prometheus.HistogramVec
}
// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registerer prometheus.Registerer) (*Pipeline, error) {
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "logentry",
Name: "pipeline_duration_seconds",
Help: "Label and metric extraction pipeline processing time, in seconds",
Buckets: []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000},
}, []string{"job_name"})
err := registerer.Register(hist)
if err != nil {
if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
hist = existing.ExistingCollector.(*prometheus.HistogramVec)
} else {
// Same behavior as MustRegister if the error is not for AlreadyRegistered
panic(err)
}
}
st := []Stage{}
for _, s := range stgs {
stage, ok := s.(PipelineStage)
@ -60,9 +67,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string, registe
}
}
return &Pipeline{
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
plDuration: hist,
}, nil
}
@ -75,7 +83,9 @@ func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface
}
dur := time.Since(start).Seconds()
level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur)
pipelineDuration.WithLabelValues(p.jobName).Observe(dur)
if p.jobName != nil {
p.plDuration.WithLabelValues(*p.jobName).Observe(dur)
}
}
// Wrap implements EntryMiddleware
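
The package-level promauto histogram is gone; each NewPipeline call now builds and registers its own HistogramVec and falls back to the existing collector on AlreadyRegisteredError, so every pipeline sharing a registry shares one histogram. A hedged, standalone sketch of that registration pattern:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// registerOrReuse registers hist with reg and, if an identical collector is
// already registered, returns the existing one instead of failing.
func registerOrReuse(reg prometheus.Registerer, hist *prometheus.HistogramVec) *prometheus.HistogramVec {
	if err := reg.Register(hist); err != nil {
		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
			return are.ExistingCollector.(*prometheus.HistogramVec)
		}
		panic(err) // same behaviour as MustRegister for any other error
	}
	return hist
}

func main() {
	reg := prometheus.NewRegistry()
	newHist := func() *prometheus.HistogramVec {
		return prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace: "logentry",
			Name:      "pipeline_duration_seconds",
			Help:      "Label and metric extraction pipeline processing time, in seconds",
		}, []string{"job_name"})
	}
	first := registerOrReuse(reg, newHist())
	second := registerOrReuse(reg, newHist()) // a second pipeline reuses the first collector
	fmt.Println(first == second)              // true
}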

@ -24,7 +24,6 @@ var (
var testYaml = `
pipeline_stages:
- match:
pipeline_name: "test_pipeline"
selector: "{match=\"true\"}"
stages:
- docker:
@ -49,7 +48,7 @@ func loadConfig(yml string) PipelineStages {
func TestNewPipeline(t *testing.T) {
p, err := NewPipeline(util.Logger, loadConfig(testYaml), "test", prometheus.DefaultRegisterer)
p, err := NewPipeline(util.Logger, loadConfig(testYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}
@ -67,7 +66,7 @@ func TestPipeline_MultiStage(t *testing.T) {
if err != nil {
panic(err)
}
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), "test", prometheus.DefaultRegisterer)
p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}
@ -155,7 +154,7 @@ func BenchmarkPipeline(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
pl, err := NewPipeline(bm.logger, bm.stgs, "test", prometheus.DefaultRegisterer)
pl, err := NewPipeline(bm.logger, bm.stgs, nil, prometheus.DefaultRegisterer)
if err != nil {
panic(err)
}

@ -18,6 +18,7 @@ const (
ErrEmptyRegexStageConfig = "empty regex stage configuration"
)
// RegexConfig contains a regexStage configuration
type RegexConfig struct {
Expression string `mapstructure:"expression"`
}
@ -40,14 +41,14 @@ func validateRegexConfig(c *RegexConfig) (*regexp.Regexp, error) {
return expr, nil
}
// regexStage mutates log entries using regex
// regexStage sets extracted data using regular expressions
type regexStage struct {
cfg *RegexConfig
expression *regexp.Regexp
logger log.Logger
}
// newRegexStage creates a new regular expression Mutator.
// newRegexStage creates a newRegexStage
func newRegexStage(logger log.Logger, config interface{}) (Stage, error) {
cfg, err := parseRegexConfig(config)
if err != nil {
@ -64,6 +65,7 @@ func newRegexStage(logger log.Logger, config interface{}) (Stage, error) {
}, nil
}
// parseRegexConfig processes an incoming configuration into a RegexConfig
func parseRegexConfig(config interface{}) (*RegexConfig, error) {
cfg := &RegexConfig{}
err := mapstructure.Decode(config, cfg)

@ -129,7 +129,7 @@ func TestRegexParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(util.Logger, "test", StageTypeRegex, tt.config, nil)
p, err := New(util.Logger, nil, StageTypeRegex, tt.config, nil)
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
}
@ -171,7 +171,7 @@ func BenchmarkRegexStage(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
stage, err := New(util.Logger, "test", StageTypeRegex, bm.config, nil)
stage, err := New(util.Logger, nil, StageTypeRegex, bm.config, nil)
if err != nil {
panic(err)
}

@ -36,18 +36,18 @@ func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface
}
// New creates a new stage for the given type and configuration.
func New(logger log.Logger, jobName string, stageType string,
func New(logger log.Logger, jobName *string, stageType string,
cfg interface{}, registerer prometheus.Registerer) (Stage, error) {
var s Stage
var err error
switch stageType {
case StageTypeDocker:
s, err = NewDocker(logger, jobName, registerer)
s, err = NewDocker(logger, registerer)
if err != nil {
return nil, err
}
case StageTypeCRI:
s, err = NewCRI(logger, jobName, registerer)
s, err = NewCRI(logger, registerer)
if err != nil {
return nil, err
}
@ -62,27 +62,27 @@ func New(logger log.Logger, jobName string, stageType string,
return nil, err
}
case StageTypeMetric:
s, err = newMetric(logger, cfg, registerer)
s, err = newMetricStage(logger, cfg, registerer)
if err != nil {
return nil, err
}
case StageTypeLabel:
s, err = newLabel(logger, cfg)
s, err = newLabelStage(logger, cfg)
if err != nil {
return nil, err
}
case StageTypeTimestamp:
s, err = newTimestamp(logger, cfg)
s, err = newTimestampStage(logger, cfg)
if err != nil {
return nil, err
}
case StageTypeOutput:
s, err = newOutput(logger, cfg)
s, err = newOutputStage(logger, cfg)
if err != nil {
return nil, err
}
case StageTypeMatch:
s, err = newMatcherStage(logger, cfg, registerer)
s, err = newMatcherStage(logger, jobName, cfg, registerer)
if err != nil {
return nil, err
}

@ -23,6 +23,7 @@ type TimestampConfig struct {
Format string `mapstructure:"format"`
}
// validateTimestampConfig validates a timestampStage configuration
func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
if cfg == nil {
return "", errors.New(ErrEmptyTimestampStageConfig)
@ -37,8 +38,8 @@ func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
}
// newTimestamp creates a new timestamp extraction pipeline stage.
func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error) {
// newTimestampStage creates a new timestamp extraction pipeline stage.
func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage, error) {
cfg := &TimestampConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
@ -55,12 +56,14 @@ func newTimestamp(logger log.Logger, config interface{}) (*timestampStage, error
}, nil
}
// timestampStage will set the timestamp using extracted data
type timestampStage struct {
cfgs *TimestampConfig
logger log.Logger
format string
}
// Process implements Stage
func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if ts.cfgs == nil {
return

@ -23,7 +23,7 @@ pipeline_stages:
var testTimestampLogLine = `
{
"time":"2012-11-01T22:08:41+00:00",
"time":"2012-11-01T22:08:41-04:00",
"app":"loki",
"component": ["parser","type"],
"level" : "WARN"
@ -31,7 +31,7 @@ var testTimestampLogLine = `
`
func TestTimestampPipeline(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), "test", prometheus.DefaultRegisterer)
pl, err := NewPipeline(util.Logger, loadConfig(testTimestampYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
@ -40,7 +40,7 @@ func TestTimestampPipeline(t *testing.T) {
entry := testTimestampLogLine
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", 0)), ts)
assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)), ts)
}
func TestTimestampValidation(t *testing.T) {
@ -100,4 +100,37 @@ func TestTimestampValidation(t *testing.T) {
}
}
//TODO process tests
func TestTimestampStage_Process(t *testing.T) {
tests := map[string]struct {
config TimestampConfig
extracted map[string]interface{}
expected time.Time
}{
"set success": {
TimestampConfig{
Source: "ts",
Format: time.RFC3339,
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "2106-01-02T23:04:05-04:00",
},
time.Date(2106, 01, 02, 23, 04, 05, 0, time.FixedZone("", -4*60*60)),
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
st, err := newTimestampStage(util.Logger, test.config)
if err != nil {
t.Fatal(err)
}
ts := time.Now()
lbls := model.LabelSet{}
st.Process(lbls, test.extracted, &ts, nil)
assert.Equal(t, test.expected, ts)
})
}
}

@ -34,6 +34,7 @@ func convertDateLayout(predef string) string {
}
}
// getString will convert the input variable to a string if possible
func getString(unk interface{}) (string, error) {
switch i := unk.(type) {

@ -77,7 +77,7 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
registerer := prometheus.DefaultRegisterer
pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName, registerer)
pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, &cfg.JobName, registerer)
if err != nil {
return nil, err
}
@ -87,14 +87,14 @@ func NewFileTargetManager(
switch cfg.EntryParser {
case api.CRI:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
cri, err := stages.NewCRI(logger, cfg.JobName, registerer)
cri, err := stages.NewCRI(logger, registerer)
if err != nil {
return nil, err
}
pipeline.AddStage(cri)
case api.Docker:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
docker, err := stages.NewDocker(logger, cfg.JobName, registerer)
docker, err := stages.NewDocker(logger, registerer)
if err != nil {
return nil, err
}
