diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go index afac87bdff..c58f72c716 100644 --- a/pkg/logentry/pipeline.go +++ b/pkg/logentry/pipeline.go @@ -24,7 +24,8 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { for _, s := range stgs { stage, ok := s.(map[interface{}]interface{}) if !ok { - return nil, errors.New("invalid YAML config") + return nil, errors.Errorf("invalid YAML config, "+ + "make sure each stage of your pipeline is a YAML object (must end with a `:`), check stage `- %s`", s) } if len(stage) > 1 { return nil, errors.New("pipeline stage must contain only one key") @@ -47,6 +48,18 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { return nil, errors.Wrap(err, "invalid regex stage config") } st = append(st, regex) + case "docker": + docker, err := stages.NewDocker(log) + if err != nil { + return nil, errors.Wrap(err, "invalid docker stage config") + } + st = append(st, docker) + case "cri": + cri, err := stages.NewCri(log) + if err != nil { + return nil, errors.Wrap(err, "invalid cri stage config") + } + st = append(st, cri) } } } @@ -71,3 +84,13 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { return next.Handle(labels, timestamp, line) }) } + +// AddStage adds a stage to the pipeline +func (p *Pipeline) AddStage(stage stages.Stage) { + p.stages = append(p.stages, stage) +} + +// Size gets the current number of stages in the pipeline +func (p *Pipeline) Size() int { + return len(p.stages) +} diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go index 75c529f0da..07e047b78d 100644 --- a/pkg/logentry/pipeline_test.go +++ b/pkg/logentry/pipeline_test.go @@ -2,27 +2,31 @@ package logentry import ( "testing" + "time" "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" ) +var rawTestLine = `{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}` +var processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"` + var testYaml = ` pipeline_stages: +- docker: - regex: - expression: "./*" - labels: - stream: -- json: + expression: "^(?P\\S+) (?P\\S+) (?P\\S+) \\[(?P[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P\\S+)\\s?(?P\\S+)?\\s?(?P\\S+)?\" (?P\\d{3}|-) (?P\\d+|-)\\s?\"?(?P[^\"]*)\"?\\s?\"?(?P[^\"]*)?\"?$" timestamp: - source: time - format: RFC3339 + source: timestamp + format: "02/Jan/2006:15:04:05 -0700" labels: - stream: - source: json_key_name.json_sub_key_name - output: - source: log + action: + source: "action" + status_code: + source: "status" ` func TestNewPipeline(t *testing.T) { @@ -39,3 +43,60 @@ func TestNewPipeline(t *testing.T) { t.Fatal("missing stages") } } + +func TestPipeline_MultiStage(t *testing.T) { + est, err := time.LoadLocation("America/New_York") + if err != nil { + t.Fatal("could not parse timestamp", err) + } + + var config map[string]interface{} + err = yaml.Unmarshal([]byte(testYaml), &config) + if err != nil { + panic(err) + } + p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{})) + if err != nil { + panic(err) + } + + tests := map[string]struct { + entry string + expectedEntry string + t time.Time + expectedT time.Time + labels model.LabelSet + expectedLabels model.LabelSet + }{ + "happy path": { + rawTestLine, + processedTestLine, + time.Now(), + time.Date(2000, 01, 25, 14, 00, 01, 0, est), + map[model.LabelName]model.LabelValue{ + "test": "test", + }, + map[model.LabelName]model.LabelValue{ + "test": "test", + "stream": "stderr", + "action": "GET", + "status_code": "200", + }, + }, + } + + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + t.Parallel() + + p.Process(tt.labels, &tt.t, &tt.entry) + + assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels") + assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") + if tt.t.Unix() != tt.expectedT.Unix() { + t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) + } + }) + } +} diff --git a/pkg/logentry/stages/extensions.go b/pkg/logentry/stages/extensions.go new file mode 100644 index 0000000000..184457a144 --- /dev/null +++ b/pkg/logentry/stages/extensions.go @@ -0,0 +1,44 @@ +package stages + +import ( + "github.com/go-kit/kit/log" +) + +// NewDocker creates a Docker json log format specific pipeline stage. +func NewDocker(logger log.Logger) (Stage, error) { + config := map[string]interface{}{ + "timestamp": map[string]interface{}{ + "source": "time", + "format": "RFC3339", + }, + "labels": map[string]interface{}{ + "stream": map[string]interface{}{ + "source": "stream", + }, + }, + "output": map[string]interface{}{ + "source": "log", + }, + } + return NewJSON(logger, config) +} + +// NewCri creates a CRI format specific pipeline stage +func NewCri(logger log.Logger) (Stage, error) { + config := map[string]interface{}{ + "expression": "^(?s)(?P