adding regex pipeline stage

pull/565/head
Edward Welch 7 years ago committed by Ed
parent 7b085e1ead
commit 0fac61801c
  1. 7
      pkg/logentry/pipeline.go
  2. 6
      pkg/logentry/pipeline_test.go
  3. 28
      pkg/logentry/stages/json.go
  4. 32
      pkg/logentry/stages/json_test.go
  5. 192
      pkg/logentry/stages/regex.go
  6. 371
      pkg/logentry/stages/regex_test.go
  7. 31
      pkg/logentry/stages/util.go
  8. 39
      pkg/logentry/stages/util_test.go
  9. 2
      pkg/promtail/scrape/scrape.go

@ -13,7 +13,7 @@ import (
// PipelineStages contains configuration for each stage within a pipeline
type PipelineStages []interface{}
// Pipeline pass down a log entry to each stage for mutation.
// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
stages []stages.Stage
}
@ -42,6 +42,11 @@ func NewPipeline(log log.Logger, stgs PipelineStages) (*Pipeline, error) {
}
st = append(st, json)
case "regex":
regex, err := stages.NewRegex(log, config)
if err != nil {
return nil, errors.Wrap(err, "invalid regex stage config")
}
st = append(st, regex)
}
}
}

@ -11,7 +11,9 @@ import (
var testYaml = `
pipeline_stages:
- regex:
expr: "./*"
expression: "./*"
labels:
stream:
- json:
timestamp:
source: time
@ -33,7 +35,7 @@ func TestNewPipeline(t *testing.T) {
if err != nil {
panic(err)
}
if len(p.stages) != 1 {
if len(p.stages) != 2 {
t.Fatal("missing stages")
}
}

@ -207,31 +207,3 @@ func (j *jsonStage) Process(labels model.LabelSet, t *time.Time, entry *string)
}
}
}
// convertDateLayout converts pre-defined date format layout into date format
func convertDateLayout(predef string) string {
switch predef {
case "ANSIC":
return time.ANSIC
case "UnixDate":
return time.UnixDate
case "RubyDate":
return time.RubyDate
case "RFC822":
return time.RFC822
case "RFC822Z":
return time.RFC822Z
case "RFC850":
return time.RFC850
case "RFC1123":
return time.RFC1123
case "RFC1123Z":
return time.RFC1123Z
case "RFC3339":
return time.RFC3339
case "RFC3339Nano":
return time.RFC3339Nano
default:
return predef
}
}

@ -6,7 +6,6 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
)
@ -331,34 +330,3 @@ func TestJSONParser_Parse(t *testing.T) {
})
}
}
func mustParseTime(layout, value string) time.Time {
t, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return t
}
func toLabelSet(lbs map[string]string) model.LabelSet {
res := model.LabelSet{}
for k, v := range lbs {
res[model.LabelName(k)] = model.LabelValue(v)
}
return res
}
func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
if len(expect) != len(got) {
t.Fatalf("labels are not equal in size want: %s got: %s", expect, got)
}
for k, v := range expect {
gotV, ok := got[model.LabelName(k)]
if !ok {
t.Fatalf("missing expected label key: %s", k)
}
if gotV != model.LabelValue(v) {
t.Fatalf("mismatch label value got: %s/%s want %s/%s", k, gotV, k, model.LabelValue(v))
}
}
}

@ -1,26 +1,190 @@
package stages
import (
"fmt"
"regexp"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// RegexTimestamp configures timestamp extraction
type RegexTimestamp struct {
Source *string `mapstructure:"source"`
Format string `mapstructure:"format"`
}
// RegexLabel configures a labels value extraction
type RegexLabel struct {
Source *string `mapstructure:"source"`
}
// RegexOutput configures output value extraction
type RegexOutput struct {
Source *string `mapstructure:"source"`
}
// RegexConfig configures the log entry parser to extract value from regex
type RegexConfig struct {
Timestamp *RegexTimestamp `mapstructure:"timestamp"`
Expression string `mapstructure:"expression"`
Output *RegexOutput `mapstructure:"output"`
Labels map[string]*RegexLabel `mapstructure:"labels"`
}
func newRegexConfig(config interface{}) (*RegexConfig, error) {
cfg := &RegexConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}
const (
ErrExpressionRequired = "expression is required"
ErrCouldNotCompileRegex = "could not compile regular expression"
ErrEmptyRegexStageConfig = "empty regex parser configuration"
ErrOutputSourceRequired = "output source value is required if output is specified"
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampGroupRequired = "regex must contain a named group to match the timestamp with name: %s"
ErrTimestampFormatRequired = "timestamp format is required"
ErrInvalidLabelName = "invalid label name: %s"
)
// type Config struct {
// Expr string
// Labels []parser.Label
// }
// validate the config and return a
func (c *RegexConfig) validate() (*regexp.Regexp, error) {
type Regex struct {
expr *regexp.Regexp
if c.Output == nil && len(c.Labels) == 0 && c.Timestamp == nil {
return nil, errors.New(ErrEmptyRegexStageConfig)
}
if c.Expression == "" {
return nil, errors.New(ErrExpressionRequired)
}
expr, err := regexp.Compile(c.Expression)
if err != nil {
return nil, errors.Wrap(err, ErrCouldNotCompileRegex)
}
if c.Output != nil && (c.Output.Source == nil || (c.Output.Source != nil && *c.Output.Source == "")) {
return nil, errors.New(ErrOutputSourceRequired)
}
if c.Timestamp != nil {
if c.Timestamp.Source == nil || *c.Timestamp.Source == "" {
return nil, errors.New(ErrTimestampSourceRequired)
}
if c.Timestamp.Format == "" {
return nil, errors.New(ErrTimestampFormatRequired)
}
foundName := false
for _, n := range expr.SubexpNames() {
if n == *c.Timestamp.Source {
foundName = true
}
}
if !foundName {
return nil, errors.Errorf(ErrTimestampGroupRequired, *c.Timestamp.Source)
}
c.Timestamp.Format = convertDateLayout(c.Timestamp.Format)
}
for labelName, labelSrc := range c.Labels {
if !model.LabelName(labelName).IsValid() {
return nil, fmt.Errorf(ErrInvalidLabelName, labelName)
}
if labelSrc == nil || *labelSrc.Source == "" {
lName := labelName
c.Labels[labelName] = &RegexLabel{
&lName,
}
}
}
return expr, nil
}
// func NewRegex(config map[interface{}]interface{}) Regex {
type regexStage struct {
cfg *RegexConfig
expression *regexp.Regexp
logger log.Logger
}
// err := mapstructure.Decode(rg, &cfg2)
// return Regex{
// expr: regexp.MustCompile(config.Expr),
// }
// }
func NewRegex(logger log.Logger, config interface{}) (Stage, error) {
cfg, err := newRegexConfig(config)
if err != nil {
return nil, err
}
expression, err := cfg.validate()
if err != nil {
return nil, err
}
return &regexStage{
cfg: cfg,
expression: expression,
logger: log.With(logger, "component", "parser", "type", "regex"),
}, nil
}
func (r *regexStage) Process(labels model.LabelSet, t *time.Time, entry *string) {
if entry == nil {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
return
}
match := r.expression.FindStringSubmatch(*entry)
if match == nil {
level.Debug(r.logger).Log("msg", "regex failed to match")
return
}
groups := make(map[string]string)
for i, name := range r.expression.SubexpNames() {
if i != 0 && name != "" {
groups[name] = match[i]
}
}
// func (r *Regex) Parse(labels model.LabelSet, time time.Time, entry string) (time.Time, string, error) {
// Parsing timestamp.
if r.cfg.Timestamp != nil {
if ts, ok := groups[*r.cfg.Timestamp.Source]; ok {
parsedTs, err := time.Parse(r.cfg.Timestamp.Format, ts)
if err != nil {
level.Debug(r.logger).Log("msg", "failed to parse time", "err", err, "format", r.cfg.Timestamp.Format, "value", ts)
} else {
*t = parsedTs
}
} else {
level.Debug(r.logger).Log("msg", "regex didn't match timestamp source")
}
}
// }
// Parsing labels.
for lName, lSrc := range r.cfg.Labels {
lValue, ok := groups[*lSrc.Source]
if !ok {
continue
}
labelValue := model.LabelValue(lValue)
if !labelValue.IsValid() {
level.Debug(r.logger).Log("msg", "invalid label value parsed", "value", labelValue)
continue
}
labels[model.LabelName(lName)] = labelValue
}
// Parsing output.
if r.cfg.Output != nil {
if o, ok := groups[*r.cfg.Output.Source]; ok {
*entry = o
} else {
level.Debug(r.logger).Log("msg", "regex didn't match output source")
}
}
}

@ -0,0 +1,371 @@
package stages
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
var regexCfg = `regex:
timestamp:
source: time
format: RFC3339
labels:
stream:
source: stream
output:
source: log`
func TestRegexMapStructure(t *testing.T) {
t.Parallel()
// testing that we can use yaml data into mapstructure.
var mapstruct map[interface{}]interface{}
if err := yaml.Unmarshal([]byte(regexCfg), &mapstruct); err != nil {
t.Fatalf("error while un-marshalling config: %s", err)
}
p, ok := mapstruct["regex"].(map[interface{}]interface{})
if !ok {
t.Fatalf("could not read parser %+v", mapstruct["regex"])
}
got, err := newRegexConfig(p)
if err != nil {
t.Fatalf("could not create parser from yaml: %s", err)
}
want := &RegexConfig{
Labels: map[string]*RegexLabel{
"stream": &RegexLabel{
Source: String("stream"),
},
},
Output: &RegexOutput{Source: String("log")},
Timestamp: &RegexTimestamp{
Format: "RFC3339",
Source: String("time"),
},
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("want: %+v got: %+v", want, got)
}
}
func TestRegexConfig_validate(t *testing.T) {
t.Parallel()
tests := map[string]struct {
config interface{}
err error
}{
"empty": {
map[string]interface{}{},
errors.New(ErrEmptyRegexStageConfig),
},
"missing output info": {
map[string]interface{}{
"expression": ".*",
"output": map[string]interface{}{},
},
errors.New(ErrOutputSourceRequired),
},
"missing regex_expression": {
map[string]interface{}{
"output": map[string]interface{}{},
},
errors.New(ErrExpressionRequired),
},
"invalid regex_expression": {
map[string]interface{}{
"expression": "(?P<ts[0-9]+).*",
"output": map[string]interface{}{},
},
errors.New(ErrCouldNotCompileRegex + ": error parsing regexp: invalid named capture: `(?P<ts[0-9]+).*`"),
},
"missing output source": {
map[string]interface{}{
"expression": ".*",
"output": map[string]interface{}{
"source": "",
},
},
errors.New(ErrOutputSourceRequired),
},
"invalid output source": {
map[string]interface{}{
"expression": ".*",
"output": map[string]interface{}{
"source": "[",
},
},
nil,
},
"missing timestamp source": {
map[string]interface{}{
"expression": ".*",
"timestamp": map[string]interface{}{
"source": "",
"format": "ANSIC",
},
},
errors.New(ErrTimestampSourceRequired),
},
"missing timestamp format": {
map[string]interface{}{
"expression": ".*",
"timestamp": map[string]interface{}{
"source": "test",
"format": "",
},
},
errors.New(ErrTimestampFormatRequired),
},
"invalid label name": {
map[string]interface{}{
"expression": ".*",
"labels": map[string]interface{}{
"": map[string]interface{}{},
},
},
fmt.Errorf(ErrInvalidLabelName, ""),
},
"invalid label source": {
map[string]interface{}{
"expression": ".*",
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "]",
},
},
},
nil,
},
"missing_timestamp_group": {
map[string]interface{}{
"expression": ".*",
"timestamp": map[string]interface{}{
"source": "ts",
"format": "RFC3339",
},
},
errors.Errorf(ErrTimestampGroupRequired, "ts"),
},
"valid": {
map[string]interface{}{
"expression": "(?P<ts>[0-9]+).*",
"output": map[string]interface{}{
"source": "log",
},
"timestamp": map[string]interface{}{
"source": "ts",
"format": "RFC3339",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "test",
},
"app": map[string]interface{}{
"source": "app",
},
"level": nil,
},
},
nil,
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
c, err := newRegexConfig(tt.config)
if err != nil {
t.Fatalf("failed to create config: %s", err)
}
_, err = c.validate()
if (err != nil) != (tt.err != nil) {
t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err)
return
}
if (err != nil) && (err.Error() != tt.err.Error()) {
t.Errorf("RegexConfig.validate() expected error = %v, actual error = %v", tt.err, err)
return
}
})
}
}
var regexLogFixture = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
var regexLogFixture_missingLabel = `2016-10-06T00:17:09.669794202Z stdout k `
var regexLogFixture_invalidTimestamp = `2016-10-06sfsT00:17:09.669794202Z stdout k `
func TestRegexParser_Parse(t *testing.T) {
t.Parallel()
est, err := time.LoadLocation("America/New_York")
if err != nil {
t.Fatal("could not parse timestamp", err)
}
utc, err := time.LoadLocation("UTC")
if err != nil {
t.Fatal("could not parse timestamp", err)
}
tests := map[string]struct {
config interface{}
entry string
expectedEntry string
t time.Time
expectedT time.Time
labels map[string]string
expectedLabels map[string]string
}{
"happy path": {
map[string]interface{}{
"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
"timestamp": map[string]interface{}{
"source": "timestamp",
"format": "02/Jan/2006:15:04:05 -0700",
},
"labels": map[string]interface{}{
"action": map[string]interface{}{
"source": "action",
},
"status_code": map[string]interface{}{
"source": "status",
},
},
},
regexLogFixture,
regexLogFixture,
time.Now(),
time.Date(2000, 01, 25, 14, 00, 01, 0, est),
nil,
map[string]string{
"action": "GET",
"status_code": "200",
},
},
"modify output": {
map[string]interface{}{
"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
"timestamp": map[string]interface{}{
"source": "timestamp",
"format": "02/Jan/2006:15:04:05 -0700",
},
"labels": map[string]interface{}{
"action": map[string]interface{}{
"source": "action",
},
"status_code": map[string]interface{}{
"source": "status",
},
},
"output": map[string]interface{}{
"source": "path",
},
},
regexLogFixture,
"/1986.js",
time.Now(),
time.Date(2000, 01, 25, 14, 00, 01, 0, est),
nil,
map[string]string{
"action": "GET",
"status_code": "200",
},
},
"missing label": {
map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<missing_label>.*)$",
"timestamp": map[string]interface{}{
"source": "time",
"format": "RFC3339",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "stream",
},
"missing_label": map[string]interface{}{
"source": "missing_label",
},
},
},
regexLogFixture_missingLabel,
regexLogFixture_missingLabel,
time.Now(),
time.Date(2016, 10, 06, 00, 17, 9, 669794202, utc),
nil,
map[string]string{
"stream": "stdout",
"missing_label": "",
},
},
"invalid timestamp skipped": {
map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<message>.*)$",
"timestamp": map[string]interface{}{
"source": "time",
"format": "UnixDate",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "stream",
},
},
},
regexLogFixture_missingLabel,
regexLogFixture_missingLabel,
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
nil,
map[string]string{
"stream": "stdout",
},
},
"match failed": {
map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<message>.*)$",
"timestamp": map[string]interface{}{
"source": "time",
"format": "UnixDate",
},
"labels": map[string]interface{}{
"stream": map[string]interface{}{
"source": "stream",
},
},
},
"blahblahblah",
"blahblahblah",
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
time.Date(2019, 10, 06, 00, 17, 9, 0, utc),
map[string]string{
"stream": "stdout",
},
map[string]string{
"stream": "stdout",
},
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
p, err := NewRegex(util.Logger, tt.config)
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
}
lbs := toLabelSet(tt.labels)
p.Process(lbs, &tt.t, &tt.entry)
assertLabels(t, tt.expectedLabels, lbs)
if tt.entry != tt.expectedEntry {
t.Fatalf("mismatch entry want: %s got:%s", tt.expectedEntry, tt.entry)
}
if tt.t.Unix() != tt.expectedT.Unix() {
t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t)
}
})
}
}

@ -0,0 +1,31 @@
package stages
import "time"
// convertDateLayout converts pre-defined date format layout into date format
func convertDateLayout(predef string) string {
switch predef {
case "ANSIC":
return time.ANSIC
case "UnixDate":
return time.UnixDate
case "RubyDate":
return time.RubyDate
case "RFC822":
return time.RFC822
case "RFC822Z":
return time.RFC822Z
case "RFC850":
return time.RFC850
case "RFC1123":
return time.RFC1123
case "RFC1123Z":
return time.RFC1123Z
case "RFC3339":
return time.RFC3339
case "RFC3339Nano":
return time.RFC3339Nano
default:
return predef
}
}

@ -0,0 +1,39 @@
package stages
import (
"testing"
"time"
"github.com/prometheus/common/model"
)
func mustParseTime(layout, value string) time.Time {
t, err := time.Parse(layout, value)
if err != nil {
panic(err)
}
return t
}
func toLabelSet(lbs map[string]string) model.LabelSet {
res := model.LabelSet{}
for k, v := range lbs {
res[model.LabelName(k)] = model.LabelValue(v)
}
return res
}
func assertLabels(t *testing.T, expect map[string]string, got model.LabelSet) {
if len(expect) != len(got) {
t.Fatalf("labels are not equal in size want: %s got: %s", expect, got)
}
for k, v := range expect {
gotV, ok := got[model.LabelName(k)]
if !ok {
t.Fatalf("missing expected label key: %s", k)
}
if gotV != model.LabelValue(v) {
t.Fatalf("mismatch label value got: %s/%s want %s/%s", k, gotV, k, model.LabelValue(v))
}
}
}

@ -12,7 +12,7 @@ import (
// Config describes a job to scrape.
type Config struct {
JobName string `yaml:"job_name,omitempty"`
PipelineStages logentry.PipelineStages `yaml:"pipeline_stages,omitempty"`
PipelineStages logentry.PipelineStages `yaml:"pipeline_stages,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
}

Loading…
Cancel
Save