Making the pipeline itself a Stage so that we can use it to better implement the Match stage (and it cleans up the Docker and CRI extensions some too)

Still needs to fix the metrics test
pull/640/head
Edward Welch 7 years ago committed by Ed
parent 07da8e1d11
commit eaecf0ff90
  1. 142
      pkg/logentry/stages/extensions.go
  2. 4
      pkg/logentry/stages/extensions_test.go
  3. 2
      pkg/logentry/stages/json_test.go
  4. 84
      pkg/logentry/stages/match.go
  5. 42
      pkg/logentry/stages/match_test.go
  6. 6
      pkg/logentry/stages/metrics_test.go
  7. 26
      pkg/logentry/stages/pipeline.go
  8. 72
      pkg/logentry/stages/pipeline_test.go
  9. 6
      pkg/logentry/stages/regex_test.go
  10. 13
      pkg/logentry/stages/stage.go
  11. 1
      pkg/logentry/stages/timestamp.go
  12. 4
      pkg/promtail/scrape/scrape.go
  13. 7
      pkg/promtail/targets/filetargetmanager.go

@ -1,114 +1,70 @@
package stages
import (
"time"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
)
const RFC3339Nano = "RFC3339Nano"
// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger) (Stage, error) {
// JSON Stage to extract output, stream, and timestamp
jsonCfg := &JSONConfig{
Expressions: map[string]string{
"output": "log",
"stream": "stream",
"timestamp": "time",
},
}
jsonStage, err := New(logger, StageTypeJSON, jsonCfg, nil)
if err != nil {
return nil, err
}
// Set the stream as a label
lblCfg := &LabelsConfig{
"stream": nil,
}
labelStage, err := New(logger, StageTypeLabel, lblCfg, nil)
if err != nil {
return nil, err
}
// Parse the time into the current timestamp
func NewDocker(logger log.Logger, jobName string) (Stage, error) {
t := "timestamp"
f := RFC3339Nano
tsCfg := &TimestampConfig{
&t,
&f,
}
tsStage, err := New(logger, StageTypeTimestamp, tsCfg, nil)
if err != nil {
return nil, err
}
// Set the output to the log message
o := "output"
outputCfg := &OutputConfig{
&o,
}
outputStage, err := New(logger, StageTypeOutput, outputCfg, nil)
if err != nil {
return nil, err
}
stages := PipelineStages{
PipelineStage{
StageTypeJSON: JSONConfig{
Expressions: map[string]string{
"output": "log",
"stream": "stream",
"timestamp": "time",
},
}},
PipelineStage{
StageTypeLabel: LabelsConfig{
"stream": nil,
}},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
&t,
&f,
}},
PipelineStage{
StageTypeOutput: OutputConfig{
&o,
},
}}
return StageFunc(func(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
jsonStage.Process(labels, extracted, t, entry)
labelStage.Process(labels, extracted, t, entry)
tsStage.Process(labels, extracted, t, entry)
outputStage.Process(labels, extracted, t, entry)
}), nil
return NewPipeline(logger, stages, jobName+"_docker")
}
// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger) (Stage, error) {
regexCfg := &RegexConfig{
"^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
}
regexStage, err := New(logger, StageTypeRegex, regexCfg, nil)
if err != nil {
return nil, err
}
// Set the stream as a label
lblCfg := &LabelsConfig{
"stream": nil,
}
labelStage, err := New(logger, StageTypeLabel, lblCfg, nil)
if err != nil {
return nil, err
}
// Parse the time into the current timestamp
func NewCRI(logger log.Logger, jobName string) (Stage, error) {
t := "time"
f := RFC3339Nano
tsCfg := &TimestampConfig{
&t,
&f,
}
tsStage, err := New(logger, StageTypeTimestamp, tsCfg, nil)
if err != nil {
return nil, err
}
// Set the output to the log message
o := "content"
outputCfg := &OutputConfig{
&o,
}
outputStage, err := New(logger, StageTypeOutput, outputCfg, nil)
if err != nil {
return nil, err
stages := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
"^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
},
},
PipelineStage{
StageTypeLabel: LabelsConfig{
"stream": nil,
},
},
PipelineStage{
StageTypeTimestamp: TimestampConfig{
&t,
&f,
},
},
PipelineStage{
StageTypeOutput: OutputConfig{
&o,
},
},
}
return StageFunc(func(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
regexStage.Process(labels, extracted, t, entry)
labelStage.Process(labels, extracted, t, entry)
tsStage.Process(labels, extracted, t, entry)
outputStage.Process(labels, extracted, t, entry)
}), nil
return NewPipeline(logger, stages, jobName+"_cri")
}

@ -64,7 +64,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util.Logger)
p, err := NewDocker(util.Logger, "test")
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
@ -140,7 +140,7 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util.Logger)
p, err := NewCRI(util.Logger, "test")
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}

@ -174,7 +174,7 @@ func TestJSONParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(util.Logger, StageTypeJSON, tt.config, nil)
p, err := New(util.Logger, "test", StageTypeJSON, tt.config, nil)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
}

@ -3,27 +3,85 @@ package stages
import (
"time"
"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/logql"
)
"github.com/prometheus/common/model"
const (
ErrEmptyMatchStageConfig = "match stage config cannot be empty"
ErrPipelineNameRequired = "match stage must specify a pipeline name which is used in the exported metrics"
ErrSelectorRequired = "selector statement required for match stage"
ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'"
ErrSelectorSyntax = "invalid selector syntax for match stage"
)
// withMatcher runs a stage if matcher matches an entry labelset.
func withMatcher(s Stage, matcher string) (Stage, error) {
if matcher == "" {
return s, nil
type MatcherConfig struct {
PipelineName *string `mapstructure:"pipeline_name"`
Selector *string `mapstructure:"selector"`
Stages PipelineStages `mapstructure:"stages"`
}
func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
if cfg.PipelineName == nil || *cfg.PipelineName == "" {
return nil, errors.New(ErrPipelineNameRequired)
}
if cfg.Selector == nil || *cfg.Selector == "" {
return nil, errors.New(ErrSelectorRequired)
}
if cfg.Stages == nil || len(cfg.Stages) == 0 {
return nil, errors.New(ErrMatchRequiresStages)
}
matchers, err := logql.ParseMatchers(*cfg.Selector)
if err != nil {
return nil, errors.Wrap(err, ErrSelectorSyntax)
}
return matchers, nil
}
func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &MatcherConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
matchers, err := logql.ParseMatchers(matcher)
matchers, err := validateMatcherConfig(cfg)
if err != nil {
return nil, err
}
return StageFunc(func(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for _, filter := range matchers {
if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
return
}
pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName)
if err != nil {
return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", *cfg.PipelineName)
}
return &matcherStage{
cfgs: cfg,
logger: logger,
matchers: matchers,
pipeline: pl,
}, nil
}
type matcherStage struct {
cfgs *MatcherConfig
matchers []*labels.Matcher
logger log.Logger
pipeline Stage
}
func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for _, filter := range m.matchers {
if !filter.Matches(string(labels[model.LabelName(filter.Name)])) {
return
}
s.Process(labels, extracted, t, entry)
}), nil
}
m.pipeline.Process(labels, extracted, t, entry)
}

@ -5,16 +5,12 @@ import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/cortexproject/cortex/pkg/util"
)
type hasRunStage bool
var pipelineName = "pl_name"
func (h *hasRunStage) Process(labels model.LabelSet, exctracted map[string]interface{}, t *time.Time, entry *string) {
*h = true
}
func Test_withMatcher(t *testing.T) {
func TestMatcher(t *testing.T) {
t.Parallel()
tests := []struct {
matcher string
@ -28,7 +24,7 @@ func Test_withMatcher(t *testing.T) {
{"foo", map[string]string{"foo": "bar"}, false, true},
{"{}", map[string]string{"foo": "bar"}, false, true},
{"{", map[string]string{"foo": "bar"}, false, true},
{"", map[string]string{"foo": "bar"}, true, false},
{"", map[string]string{"foo": "bar"}, true, true},
{"{foo=\"bar\"}", map[string]string{"foo": "bar"}, true, false},
{"{foo=\"\"}", map[string]string{"foo": "bar"}, false, false},
{"{foo=\"\"}", map[string]string{}, true, false},
@ -38,20 +34,40 @@ func Test_withMatcher(t *testing.T) {
{"{foo=\"bar\",bar=~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, true, false},
{"{foo=\"bar\",bar!~\"te.*\"}", map[string]string{"foo": "bar", "bar": "test"}, false, false},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s/%s", tt.matcher, tt.labels), func(t *testing.T) {
hasRun := hasRunStage(false)
s, err := withMatcher(&hasRun, tt.matcher)
// Build a match config which has a simple label stage that when matched will add the test_label to
// the labels in the pipeline.
matchConfig := MatcherConfig{
&pipelineName,
&tt.matcher,
PipelineStages{
PipelineStage{
StageTypeLabel: LabelsConfig{
"test_label": nil,
},
},
},
}
s, err := newMatcherStage(util.Logger, matchConfig)
if (err != nil) != tt.wantErr {
t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
return
}
if s != nil {
ts, entry := time.Now(), ""
s.Process(toLabelSet(tt.labels), map[string]interface{}{}, &ts, &entry)
extracted := map[string]interface{}{
"test_label": "unimportant value",
}
labels := toLabelSet(tt.labels)
s.Process(labels, extracted, &ts, &entry)
if bool(hasRun) != tt.shouldRun {
t.Error("stage ran but should have not")
// test_label should only be in the label set if the stage ran
if _, ok := labels["test_label"]; ok {
if !tt.shouldRun {
t.Error("stage ran but should have not")
}
}
}
})

@ -80,15 +80,15 @@ func Test_withMetric(t *testing.T) {
}
registry := prometheus.NewRegistry()
jsonStage, err := New(util.Logger, StageTypeJSON, jsonConfig, registry)
jsonStage, err := New(util.Logger, "test", StageTypeJSON, jsonConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
regexStage, err := New(util.Logger, StageTypeRegex, regexConfig, registry)
regexStage, err := New(util.Logger, "test", StageTypeRegex, regexConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
metricStage, err := New(util.Logger, StageTypeMetric, metricsConfig, registry)
metricStage, err := New(util.Logger, "test", StageTypeMetric, metricsConfig, registry)
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}

@ -1,4 +1,4 @@
package logentry
package stages
import (
"time"
@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
)
@ -24,20 +23,23 @@ var (
)
// PipelineStages contains configuration for each stage within a pipeline
type PipelineStages []interface{}
type PipelineStages = []interface{}
// PipelineStage contains configuration for a single pipeline stage
type PipelineStage = map[interface{}]interface{}
// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
logger log.Logger
stages []stages.Stage
stages []Stage
jobName string
}
// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipeline, error) {
st := []stages.Stage{}
st := []Stage{}
for _, s := range stgs {
stage, ok := s.(map[interface{}]interface{})
stage, ok := s.(PipelineStage)
if !ok {
return nil, errors.Errorf("invalid YAML config, "+
"make sure each stage of your pipeline is a YAML object (must end with a `:`), check stage `- %s`", s)
@ -50,7 +52,7 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipel
if !ok {
return nil, errors.New("pipeline stage key must be a string")
}
newStage, err := stages.New(logger, name, config, prometheus.DefaultRegisterer)
newStage, err := New(logger, jobName, name, config, prometheus.DefaultRegisterer)
if err != nil {
return nil, errors.Wrapf(err, "invalid %s stage config", name)
}
@ -64,10 +66,9 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName string) (*Pipel
}, nil
}
// Process mutates an entry and its metadata by using multiple configure stage.
func (p *Pipeline) Process(labels model.LabelSet, ts *time.Time, entry *string) {
// Process implements Stage allowing a pipeline stage to also be an entire pipeline
func (p *Pipeline) Process(labels model.LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string) {
start := time.Now()
extracted := map[string]interface{}{}
for i, stage := range p.stages {
level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "labels", labels, "time", ts, "entry", entry)
stage.Process(labels, extracted, ts, entry)
@ -80,13 +81,14 @@ func (p *Pipeline) Process(labels model.LabelSet, ts *time.Time, entry *string)
// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error {
p.Process(labels, &timestamp, &line)
extracted := map[string]interface{}{}
p.Process(labels, extracted, &timestamp, &line)
return next.Handle(labels, timestamp, line)
})
}
// AddStage adds a stage to the pipeline
func (p *Pipeline) AddStage(stage stages.Stage) {
func (p *Pipeline) AddStage(stage Stage) {
p.stages = append(p.stages, stage)
}

@ -1,4 +1,4 @@
package logentry
package stages
import (
"testing"
@ -9,34 +9,32 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)
var rawTestLine = `{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
var processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
var (
ct = time.Now()
rawTestLine = `{"log":"11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] \"GET /1986.js HTTP/1.1\" 200 932 \"-\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6\"","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}`
processedTestLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
)
var testYaml = `
pipeline_stages:
- match:
selector:
pipeline_name: "test_pipeline"
selector: "{match=\"true\"}"
stages:
- match:
matchers:
stages:
- json:
- docker:
- regex:
- docker:
- regex:
expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
- timestamp:
source: timestamp
format: "02/Jan/2006:15:04:05 -0700"
- labels:
action:
status_code: "status"
- metrics:
expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
- timestamp:
source: timestamp
format: "02/Jan/2006:15:04:05 -0700"
- labels:
action:
status_code: "status"
`
func loadConfig(yml string) PipelineStages {
@ -54,9 +52,7 @@ func TestNewPipeline(t *testing.T) {
if err != nil {
panic(err)
}
if len(p.stages) != 2 {
t.Fatal("missing stages")
}
require.Equal(t, 1, len(p.stages))
}
func TestPipeline_MultiStage(t *testing.T) {
@ -89,23 +85,35 @@ func TestPipeline_MultiStage(t *testing.T) {
time.Now(),
time.Date(2000, 01, 25, 14, 00, 01, 0, est),
map[model.LabelName]model.LabelValue{
"test": "test",
"match": "true",
},
map[model.LabelName]model.LabelValue{
"test": "test",
"match": "true",
"stream": "stderr",
"action": "GET",
"status_code": "200",
},
},
"no match": {
rawTestLine,
rawTestLine,
ct,
ct,
map[model.LabelName]model.LabelValue{
"nomatch": "true",
},
map[model.LabelName]model.LabelValue{
"nomatch": "true",
},
},
}
for tName, tt := range tests {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p.Process(tt.labels, &tt.t, &tt.entry)
extracted := map[string]interface{}{}
p.Process(tt.labels, extracted, &tt.t, &tt.entry)
assert.Equal(t, tt.expectedLabels, tt.labels, "did not get expected labels")
assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry")
@ -144,7 +152,8 @@ func TestPipeline_Output(t *testing.T) {
lbls := model.LabelSet{}
ts := time.Now()
entry := testOutputLogLine
pl.Process(lbls, &ts, &entry)
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, "this is a log line", entry)
}
@ -181,7 +190,8 @@ func TestPipeline_Labels(t *testing.T) {
}
ts := time.Now()
entry := testLabelsLogLine
pl.Process(lbls, &ts, &entry)
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, expectedLbls, lbls)
}
@ -212,7 +222,8 @@ func TestPipeline_Timestamp(t *testing.T) {
lbls := model.LabelSet{}
ts := time.Now()
entry := testTimestampLogLine
pl.Process(lbls, &ts, &entry)
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", 0)), ts)
}
@ -224,7 +235,7 @@ var (
debugLogger = level.NewFilter(l, level.AllowDebug())
)
func Benchmark(b *testing.B) {
func BenchmarkPipeline(b *testing.B) {
benchmarks := []struct {
name string
stgs PipelineStages
@ -254,7 +265,8 @@ func Benchmark(b *testing.B) {
ts := time.Now()
for i := 0; i < b.N; i++ {
entry := bm.entry
pl.Process(lb, &ts, &entry)
extracted := map[string]interface{}{}
pl.Process(lb, extracted, &ts, &entry)
}
})
}

@ -129,7 +129,7 @@ func TestRegexParser_Parse(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(util.Logger, StageTypeRegex, tt.config, nil)
p, err := New(util.Logger, "test", StageTypeRegex, tt.config, nil)
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
}
@ -143,7 +143,7 @@ func TestRegexParser_Parse(t *testing.T) {
}
}
func Benchmark(b *testing.B) {
func BenchmarkRegexStage(b *testing.B) {
benchmarks := []struct {
name string
config map[string]interface{}
@ -171,7 +171,7 @@ func Benchmark(b *testing.B) {
}
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
stage, err := New(util.Logger, StageTypeRegex, bm.config, nil)
stage, err := New(util.Logger, "test", StageTypeRegex, bm.config, nil)
if err != nil {
panic(err)
}

@ -18,6 +18,7 @@ const (
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
)
// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
@ -35,17 +36,18 @@ func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface
}
// New creates a new stage for the given type and configuration.
func New(logger log.Logger, stageType string, cfg interface{}, registerer prometheus.Registerer) (Stage, error) {
func New(logger log.Logger, jobName string, stageType string,
cfg interface{}, registerer prometheus.Registerer) (Stage, error) {
var s Stage
var err error
switch stageType {
case StageTypeDocker:
s, err = NewDocker(logger)
s, err = NewDocker(logger, jobName)
if err != nil {
return nil, err
}
case StageTypeCRI:
s, err = NewCRI(logger)
s, err = NewCRI(logger, jobName)
if err != nil {
return nil, err
}
@ -79,6 +81,11 @@ func New(logger log.Logger, stageType string, cfg interface{}, registerer promet
if err != nil {
return nil, err
}
case StageTypeMatch:
s, err = newMatcherStage(logger, cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}

@ -15,7 +15,6 @@ const (
ErrEmptyTimestampStageConfig = "timestamp stage config cannot be empty"
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampFormatRequired = "timestamp format is required"
ErrInvalidTimestampFormat = "failed to parse timestamp format: %s"
)
// TimestampConfig configures timestamp extraction

@ -6,7 +6,7 @@ import (
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
)
@ -14,7 +14,7 @@ import (
type Config struct {
JobName string `yaml:"job_name,omitempty"`
EntryParser api.EntryParser `yaml:"entry_parser"`
PipelineStages logentry.PipelineStages `yaml:"pipeline_stages,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
}

@ -21,7 +21,6 @@ import (
"github.com/prometheus/prometheus/relabel"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logentry"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
@ -77,7 +76,7 @@ func NewFileTargetManager(
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfigs {
pipeline, err := logentry.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName)
pipeline, err := stages.NewPipeline(log.With(logger, "component", "pipeline"), cfg.PipelineStages, cfg.JobName)
if err != nil {
return nil, err
}
@ -87,14 +86,14 @@ func NewFileTargetManager(
switch cfg.EntryParser {
case api.CRI:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
cri, err := stages.NewCRI(logger)
cri, err := stages.NewCRI(logger, cfg.JobName)
if err != nil {
return nil, err
}
pipeline.AddStage(cri)
case api.Docker:
level.Warn(logger).Log("msg", "WARNING!!! entry_parser config is deprecated, please change to pipeline_stages")
docker, err := stages.NewDocker(logger)
docker, err := stages.NewDocker(logger, cfg.JobName)
if err != nil {
return nil, err
}

Loading…
Cancel
Save