diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go index c3560b0ddc..10566cfda4 100644 --- a/pkg/logentry/stages/extensions.go +++ b/pkg/logentry/stages/extensions.go @@ -1,114 +1,70 @@ package stages import ( - "time" - "github.com/go-kit/kit/log" - "github.com/prometheus/common/model" ) const RFC3339Nano = "RFC3339Nano" // NewDocker creates a Docker json log format specific pipeline stage. -func NewDocker(logger log.Logger) (Stage, error) { - - // JSON Stage to extract output, stream, and timestamp - jsonCfg := &JSONConfig{ - Expressions: map[string]string{ - "output": "log", - "stream": "stream", - "timestamp": "time", - }, - } - jsonStage, err := New(logger, StageTypeJSON, jsonCfg, nil) - if err != nil { - return nil, err - } - - // Set the stream as a label - lblCfg := &LabelsConfig{ - "stream": nil, - } - labelStage, err := New(logger, StageTypeLabel, lblCfg, nil) - if err != nil { - return nil, err - } - - // Parse the time into the current timestamp +func NewDocker(logger log.Logger, jobName string) (Stage, error) { t := "timestamp" f := RFC3339Nano - tsCfg := &TimestampConfig{ - &t, - &f, - } - tsStage, err := New(logger, StageTypeTimestamp, tsCfg, nil) - if err != nil { - return nil, err - } - - // Set the output to the log message o := "output" - outputCfg := &OutputConfig{ - &o, - } - outputStage, err := New(logger, StageTypeOutput, outputCfg, nil) - if err != nil { - return nil, err - } + stages := PipelineStages{ + PipelineStage{ + StageTypeJSON: JSONConfig{ + Expressions: map[string]string{ + "output": "log", + "stream": "stream", + "timestamp": "time", + }, + }}, + PipelineStage{ + StageTypeLabel: LabelsConfig{ + "stream": nil, + }}, + PipelineStage{ + StageTypeTimestamp: TimestampConfig{ + &t, + &f, + }}, + PipelineStage{ + StageTypeOutput: OutputConfig{ + &o, + }, + }} - return StageFunc(func(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { - jsonStage.Process(labels, extracted, t, entry) - labelStage.Process(labels, extracted, t, entry) - tsStage.Process(labels, extracted, t, entry) - outputStage.Process(labels, extracted, t, entry) - }), nil + return NewPipeline(logger, stages, jobName+"_docker") } // NewCRI creates a CRI format specific pipeline stage -func NewCRI(logger log.Logger) (Stage, error) { - regexCfg := &RegexConfig{ - "^(?s)(?P