From eaecf0ff90258b83cac29e8fac8cf1ea72c83b87 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 21 May 2019 17:28:46 -0400 Subject: [PATCH] Making the pipeline itself a Stage so that we can use it to better implement the Match stage (and it cleans up the Docker and CRI extensions some too) Still needs to fix the metrics test --- pkg/logentry/stages/extensions.go | 142 +++++++-------------- pkg/logentry/stages/extensions_test.go | 4 +- pkg/logentry/stages/json_test.go | 2 +- pkg/logentry/stages/match.go | 84 ++++++++++-- pkg/logentry/stages/match_test.go | 42 ++++-- pkg/logentry/stages/metrics_test.go | 6 +- pkg/logentry/{ => stages}/pipeline.go | 26 ++-- pkg/logentry/{ => stages}/pipeline_test.go | 72 ++++++----- pkg/logentry/stages/regex_test.go | 6 +- pkg/logentry/stages/stage.go | 13 +- pkg/logentry/stages/timestamp.go | 1 - pkg/promtail/scrape/scrape.go | 4 +- pkg/promtail/targets/filetargetmanager.go | 7 +- 13 files changed, 229 insertions(+), 180 deletions(-) rename pkg/logentry/{ => stages}/pipeline.go (80%) rename pkg/logentry/{ => stages}/pipeline_test.go (71%) diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index c3560b0ddc..10566cfda4 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -1,114 +1,70 @@ package stages import ( - "time" - "github.com/go-kit/kit/log" - "github.com/prometheus/common/model" ) const RFC3339Nano = "RFC3339Nano" // NewDocker creates a Docker json log format specific pipeline stage. -func NewDocker(logger log.Logger) (Stage, error) { - - // JSON Stage to extract output, stream, and timestamp - jsonCfg := &JSONConfig{ - Expressions: map[string]string{ - "output": "log", - "stream": "stream", - "timestamp": "time", - }, - } - jsonStage, err := New(logger, StageTypeJSON, jsonCfg, nil) - if err != nil { - return nil, err - } - - // Set the stream as a label - lblCfg := &LabelsConfig{ - "stream": nil, - } - labelStage, err := New(logger, StageTypeLabel, lblCfg, nil) - if err != nil { - return nil, err - } - - // Parse the time into the current timestamp +func NewDocker(logger log.Logger, jobName string) (Stage, error) { t := "timestamp" f := RFC3339Nano - tsCfg := &TimestampConfig{ - &t, - &f, - } - tsStage, err := New(logger, StageTypeTimestamp, tsCfg, nil) - if err != nil { - return nil, err - } - - // Set the output to the log message o := "output" - outputCfg := &OutputConfig{ - &o, - } - outputStage, err := New(logger, StageTypeOutput, outputCfg, nil) - if err != nil { - return nil, err - } + stages := PipelineStages{ + PipelineStage{ + StageTypeJSON: JSONConfig{ + Expressions: map[string]string{ + "output": "log", + "stream": "stream", + "timestamp": "time", + }, + }}, + PipelineStage{ + StageTypeLabel: LabelsConfig{ + "stream": nil, + }}, + PipelineStage{ + StageTypeTimestamp: TimestampConfig{ + &t, + &f, + }}, + PipelineStage{ + StageTypeOutput: OutputConfig{ + &o, + }, + }} - return StageFunc(func(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { - jsonStage.Process(labels, extracted, t, entry) - labelStage.Process(labels, extracted, t, entry) - tsStage.Process(labels, extracted, t, entry) - outputStage.Process(labels, extracted, t, entry) - }), nil + return NewPipeline(logger, stages, jobName+"_docker") } // NewCRI creates a CRI format specific pipeline stage -func NewCRI(logger log.Logger) (Stage, error) { - regexCfg := &RegexConfig{ - "^(?s)(?P