From 0fac61801c27cb91ee84d10ddee165688a5cc39d Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 29 Apr 2019 17:12:10 -0400 Subject: [PATCH] adding regex pipeline stage --- pkg/logentry/pipeline.go | 7 +- pkg/logentry/pipeline_test.go | 6 +- pkg/logentry/stages/json.go | 28 --- pkg/logentry/stages/json_test.go | 32 --- pkg/logentry/stages/regex.go | 192 ++++++++++++++-- pkg/logentry/stages/regex_test.go | 371 ++++++++++++++++++++++++++++++ pkg/logentry/stages/util.go | 31 +++ pkg/logentry/stages/util_test.go | 39 ++++ pkg/promtail/scrape/scrape.go | 2 +- 9 files changed, 630 insertions(+), 78 deletions(-) create mode 100644 pkg/logentry/stages/regex_test.go create mode 100644 pkg/logentry/stages/util.go create mode 100644 pkg/logentry/stages/util_test.go diff --git a/pkg/logentry/pipeline.go b/pkg/logentry/pipeline.go index cec7cf997e..afac87bdff 100644 --- a/pkg/logentry/pipeline.go +++ b/pkg/logentry/pipeline.go @@ -13,7 +13,7 @@ import ( // PipelineStages contains configuration for each stage within a pipeline type PipelineStages []interface{} -// Pipeline pass down a log entry to each stage for mutation. +// Pipeline pass down a log entry to each stage for mutation and/or label extraction. type Pipeline struct { stages []stages.Stage } @@ -42,6 +42,11 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) { } st = append(st, json) case "regex": + regex, err := stages.NewRegex(log, config) + if err != nil { + return nil, errors.Wrap(err, "invalid regex stage config") + } + st = append(st, regex) } } } diff --git a/pkg/logentry/pipeline_test.go b/pkg/logentry/pipeline_test.go index cbd1e12327..75c529f0da 100644 --- a/pkg/logentry/pipeline_test.go +++ b/pkg/logentry/pipeline_test.go @@ -11,7 +11,9 @@ import ( var testYaml = ` pipeline_stages: - regex: - expr: "./*" + expression: "./*" + labels: + stream: - json: timestamp: source: time @@ -33,7 +35,7 @@ func TestNewPipeline(t *testing.T) { if err != nil { panic(err) } - if len(p.stages) != 1 { + if len(p.stages) != 2 { t.Fatal("missing stages") } } diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index 49d28af525..2ccb5a1337 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -207,31 +207,3 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string) } } } - -// convertDateLayout converts pre-defined date format layout into date format -func convertDateLayout(predef string) string { - switch predef { - case "ANSIC": - return time.ANSIC - case "UnixDate": - return time.UnixDate - case "RubyDate": - return time.RubyDate - case "RFC822": - return time.RFC822 - case "RFC822Z": - return time.RFC822Z - case "RFC850": - return time.RFC850 - case "RFC1123": - return time.RFC1123 - case "RFC1123Z": - return time.RFC1123Z - case "RFC3339": - return time.RFC3339 - case "RFC3339Nano": - return time.RFC3339Nano - default: - return predef - } -} diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index 40d3aa5dcf..3756dbd901 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util" - "github.com/prometheus/common/model" "gopkg.in/yaml.v2" ) @@ -331,34 +330,3 @@ func TestJSONParser_Parse(t *testing.T) { }) } } - -func mustParseTime(layout, value string) time.Time { - t, err := time.Parse(layout, value) - if err != nil { - panic(err) - } - return t -} - -func toLabelSet(lbs map[string]string) model.LabelSet { - res := model.LabelSet{} - for k, v := range lbs { - res[model.LabelName(k)] = model.LabelValue(v) - } - return res -} - -func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) { - if len(expect) != len(got) { - t.Fatalf("labels are not equal in size want: %s got: %s", expect, got) - } - for k, v := range expect { - gotV, ok := got[model.LabelName(k)] - if !ok { - t.Fatalf("missing expected label key: %s", k) - } - if gotV != model.LabelValue(v) { - t.Fatalf("mismatch label value got: %s/%s want %s/%s", k, gotV, k, model.LabelValue(v)) - } - } -} diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go index 55f2b8fbb4..49e8f8a505 100644 --- a/pkg/logentry/stages/regex.go +++ b/pkg/logentry/stages/regex.go @@ -1,26 +1,190 @@ package stages import ( + "fmt" "regexp" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +// RegexTimestamp configures timestamp extraction +type RegexTimestamp struct { + Source *string `mapstructure:"source"` + Format string `mapstructure:"format"` +} + +// RegexLabel configures a labels value extraction +type RegexLabel struct { + Source *string `mapstructure:"source"` +} + +// RegexOutput configures output value extraction +type RegexOutput struct { + Source *string `mapstructure:"source"` +} + +// RegexConfig configures the log entry parser to extract value from regex +type RegexConfig struct { + Timestamp *RegexTimestamp `mapstructure:"timestamp"` + Expression string `mapstructure:"expression"` + Output *RegexOutput `mapstructure:"output"` + Labels map[string]*RegexLabel `mapstructure:"labels"` +} + +func newRegexConfig(config interface{}) (*RegexConfig, error) { + cfg := &RegexConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + +const ( + ErrExpressionRequired = "expression is required" + ErrCouldNotCompileRegex = "could not compile regular expression" + ErrEmptyRegexStageConfig = "empty regex parser configuration" + ErrOutputSourceRequired = "output source value is required if output is specified" + ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified" + ErrTimestampGroupRequired = "regex must contain a named group to match the timestamp with name: %s" + ErrTimestampFormatRequired = "timestamp format is required" + ErrInvalidLabelName = "invalid label name: %s" ) -// type Config struct { -// Expr string -// Labels []parser.Label -// } +// validate the config and return a +func (c *RegexConfig) validate() (*regexp.Regexp, error) { -type Regex struct { - expr *regexp.Regexp + if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil { + return nil, errors.New(ErrEmptyRegexStageConfig) + } + + if c.Expression == "" { + return nil, errors.New(ErrExpressionRequired) + } + + expr, err := regexp.Compile(c.Expression) + if err != nil { + return nil, errors.Wrap(err, ErrCouldNotCompileRegex) + } + + if c.Output != nil && (c.Output.Source == nil || (c.Output.Source != nil && *c.Output.Source == "")) { + return nil, errors.New(ErrOutputSourceRequired) + } + + if c.Timestamp != nil { + if c.Timestamp.Source == nil || *c.Timestamp.Source == "" { + return nil, errors.New(ErrTimestampSourceRequired) + } + if c.Timestamp.Format == "" { + return nil, errors.New(ErrTimestampFormatRequired) + } + foundName := false + for _, n := range expr.SubexpNames() { + if n == *c.Timestamp.Source { + foundName = true + } + } + if !foundName { + return nil, errors.Errorf(ErrTimestampGroupRequired, *c.Timestamp.Source) + } + c.Timestamp.Format = convertDateLayout(c.Timestamp.Format) + } + + for labelName, labelSrc := range c.Labels { + if !model.LabelName(labelName).IsValid() { + return nil, fmt.Errorf(ErrInvalidLabelName, labelName) + } + if labelSrc == nil || *labelSrc.Source == "" { + lName := labelName + c.Labels[labelName] = &RegexLabel{ + &lName, + } + } + } + + return expr, nil } -// func NewRegex(config map[interface{}]interface{}) Regex { +type regexStage struct { + cfg *RegexConfig + expression *regexp.Regexp + logger log.Logger +} -// err := mapstructure.Decode(rg, &cfg2) -// return Regex{ -// expr: regexp.MustCompile(config.Expr), -// } -// } +func NewRegex(logger log.Logger, config interface{}) (Stage, error) { + cfg, err := newRegexConfig(config) + if err != nil { + return nil, err + } + expression, err := cfg.validate() + if err != nil { + return nil, err + } + return ®exStage{ + cfg: cfg, + expression: expression, + logger: log.With(logger, "component", "parser", "type", "regex"), + }, nil +} + +func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string) { + if entry == nil { + level.Debug(r.logger).Log("msg", "cannot parse a nil entry") + return + } + + match := r.expression.FindStringSubmatch(*entry) + if match == nil { + level.Debug(r.logger).Log("msg", "regex failed to match") + return + } + groups := make(map[string]string) + for i, name := range r.expression.SubexpNames() { + if i != 0 && name != "" { + groups[name] = match[i] + } + } -// func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) { + // Parsing timestamp. + if r.cfg.Timestamp != nil { + if ts, ok := groups[*r.cfg.Timestamp.Source]; ok { + parsedTs, err := time.Parse(r.cfg.Timestamp.Format, ts) + if err != nil { + level.Debug(r.logger).Log("msg", "failed to parse time", "err", err, "format", r.cfg.Timestamp.Format, "value", ts) + } else { + *t = parsedTs + } + } else { + level.Debug(r.logger).Log("msg", "regex didn't match timestamp source") + } + } -// } + // Parsing labels. + for lName, lSrc := range r.cfg.Labels { + lValue, ok := groups[*lSrc.Source] + if !ok { + continue + } + labelValue := model.LabelValue(lValue) + if !labelValue.IsValid() { + level.Debug(r.logger).Log("msg", "invalid label value parsed", "value", labelValue) + continue + } + labels[model.LabelName(lName)] = labelValue + } + + // Parsing output. + if r.cfg.Output != nil { + if o, ok := groups[*r.cfg.Output.Source]; ok { + *entry = o + } else { + level.Debug(r.logger).Log("msg", "regex didn't match output source") + } + } + +} diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go new file mode 100644 index 0000000000..37b039805f --- /dev/null +++ b/pkg/logentry/stages/regex_test.go @@ -0,0 +1,371 @@ +package stages + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +var regexCfg = `regex: + timestamp: + source: time + format: RFC3339 + labels: + stream: + source: stream + output: + source: log` + +func TestRegexMapStructure(t *testing.T) { + t.Parallel() + + // testing that we can use yaml data into mapstructure. + var mapstruct map[interface{}]interface{} + if err := yaml.Unmarshal([]byte(regexCfg), &mapstruct); err != nil { + t.Fatalf("error while un-marshalling config: %s", err) + } + p, ok := mapstruct["regex"].(map[interface{}]interface{}) + if !ok { + t.Fatalf("could not read parser %+v", mapstruct["regex"]) + } + got, err := newRegexConfig(p) + if err != nil { + t.Fatalf("could not create parser from yaml: %s", err) + } + want := &RegexConfig{ + Labels: map[string]*RegexLabel{ + "stream": &RegexLabel{ + Source: String("stream"), + }, + }, + Output: &RegexOutput{Source: String("log")}, + Timestamp: &RegexTimestamp{ + Format: "RFC3339", + Source: String("time"), + }, + } + if !reflect.DeepEqual(got, want) { + t.Fatalf("want: %+v got: %+v", want, got) + } +} + +func TestRegexConfig_validate(t *testing.T) { + t.Parallel() + tests := map[string]struct { + config interface{} + err error + }{ + "empty": { + map[string]interface{}{}, + errors.New(ErrEmptyRegexStageConfig), + }, + "missing output info": { + map[string]interface{}{ + "expression": ".*", + "output": map[string]interface{}{}, + }, + errors.New(ErrOutputSourceRequired), + }, + "missing regex_expression": { + map[string]interface{}{ + "output": map[string]interface{}{}, + }, + errors.New(ErrExpressionRequired), + }, + "invalid regex_expression": { + map[string]interface{}{ + "expression": "(?P[0-9]+).*", + "output": map[string]interface{}{ + "source": "log", + }, + "timestamp": map[string]interface{}{ + "source": "ts", + "format": "RFC3339", + }, + "labels": map[string]interface{}{ + "stream": map[string]interface{}{ + "source": "test", + }, + "app": map[string]interface{}{ + "source": "app", + }, + "level": nil, + }, + }, + nil, + }, + } + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + c, err := newRegexConfig(tt.config) + if err != nil { + t.Fatalf("failed to create config: %s", err) + } + _, err = c.validate() + if (err != nil) != (tt.err != nil) { + t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err) + return + } + if (err != nil) && (err.Error() != tt.err.Error()) { + t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err) + return + } + }) + } +} + +var regexLogFixture = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"` +var regexLogFixture_missingLabel = `2016-10-06T00:17:09.669794202Z stdout k ` +var regexLogFixture_invalidTimestamp = `2016-10-06sfsT00:17:09.669794202Z stdout k ` + +func TestRegexParser_Parse(t *testing.T) { + t.Parallel() + + est, err := time.LoadLocation("America/New_York") + if err != nil { + t.Fatal("could not parse timestamp", err) + } + + utc, err := time.LoadLocation("UTC") + if err != nil { + t.Fatal("could not parse timestamp", err) + } + + tests := map[string]struct { + config interface{} + entry string + expectedEntry string + t time.Time + expectedT time.Time + labels map[string]string + expectedLabels map[string]string + }{ + "happy path": { + map[string]interface{}{ + "expression": "^(?P\\S+) (?P\\S+) (?P\\S+) \\[(?P[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P\\S+)\\s?(?P\\S+)?\\s?(?P\\S+)?\" (?P\\d{3}|-) (?P\\d+|-)\\s?\"?(?P[^\"]*)\"?\\s?\"?(?P[^\"]*)?\"?$", + "timestamp": map[string]interface{}{ + "source": "timestamp", + "format": "02/Jan/2006:15:04:05 -0700", + }, + "labels": map[string]interface{}{ + "action": map[string]interface{}{ + "source": "action", + }, + "status_code": map[string]interface{}{ + "source": "status", + }, + }, + }, + regexLogFixture, + regexLogFixture, + time.Now(), + time.Date(2000, 01, 25, 14, 00, 01, 0, est), + nil, + map[string]string{ + "action": "GET", + "status_code": "200", + }, + }, + "modify output": { + map[string]interface{}{ + "expression": "^(?P\\S+) (?P\\S+) (?P\\S+) \\[(?P[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P\\S+)\\s?(?P\\S+)?\\s?(?P\\S+)?\" (?P\\d{3}|-) (?P\\d+|-)\\s?\"?(?P[^\"]*)\"?\\s?\"?(?P[^\"]*)?\"?$", + "timestamp": map[string]interface{}{ + "source": "timestamp", + "format": "02/Jan/2006:15:04:05 -0700", + }, + "labels": map[string]interface{}{ + "action": map[string]interface{}{ + "source": "action", + }, + "status_code": map[string]interface{}{ + "source": "status", + }, + }, + "output": map[string]interface{}{ + "source": "path", + }, + }, + regexLogFixture, + "/1986.js", + time.Now(), + time.Date(2000, 01, 25, 14, 00, 01, 0, est), + nil, + map[string]string{ + "action": "GET", + "status_code": "200", + }, + }, + "missing label": { + map[string]interface{}{ + "expression": "^(?s)(?P