Promtail: Drop stage (#2496)

* Adding a drop stage, it's very broken but want feedback on the design in the docs.

* documentation fixes

* documentation fixes

* implemented and added tests

* add a metric to track dropped logs with a configurable reason label.
Ed Welch 5 years ago committed by GitHub
parent 8ff8b42899
commit e69b64f6bf
  1. 1
      docs/sources/clients/promtail/pipelines.md
  2. 1
      docs/sources/clients/promtail/stages/_index.md
  3. 193
      docs/sources/clients/promtail/stages/drop.md
  4. 10
      docs/sources/clients/promtail/stages/match.md
  5. 233
      pkg/logentry/stages/drop.go
  6. 377
      pkg/logentry/stages/drop_test.go
  7. 26
      pkg/logentry/stages/match.go
  8. 1
      pkg/logentry/stages/match_test.go
  9. 22
      pkg/logentry/stages/pipeline.go
  10. 6
      pkg/logentry/stages/stage.go

@ -219,3 +219,4 @@ Action stages:
Filtering stages:
* [match](../stages/match/): Conditionally run stages based on the label set.
* [drop](../stages/drop/): Conditionally drop log lines based on several options.

@ -29,4 +29,5 @@ Action stages:
Filtering stages:
* [match](match/): Conditionally run stages based on the label set.
* [drop](drop/): Conditionally drop log lines based on several options.

@ -0,0 +1,193 @@
---
title: drop
---
# `drop` stage
The `drop` stage is a filtering stage that lets you drop logs based on several options.
It's important to note that if you provide multiple options they will be treated like an AND clause,
where each option has to be true to drop the log.
If you wish to drop with an OR clause, then specify multiple drop stages.
There are examples below to help explain.
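The AND/OR behavior described above can be sketched in a few lines of Go. This is only an illustration, not the stage's implementation: the `condition` and `stage` types and the helper functions are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"strings"
)

// condition is a hypothetical stand-in for one drop option
// (expression, longer_than, and so on).
type condition func(line string) bool

// stage models a single drop stage: it drops a line only when
// ALL of its conditions hold (AND).
type stage []condition

func (s stage) drops(line string) bool {
	for _, c := range s {
		if !c(line) {
			return false
		}
	}
	return len(s) > 0
}

// anyDrops models several drop stages in a row: the line is dropped
// as soon as ANY stage drops it (OR).
func anyDrops(stages []stage, line string) bool {
	for _, s := range stages {
		if s.drops(line) {
			return true
		}
	}
	return false
}

func containsDebug(line string) bool { return strings.Contains(line, "debug") }
func longerThan10(line string) bool  { return len(line) > 10 }

func main() {
	and := []stage{{containsDebug, longerThan10}}  // one stage, two options
	or := []stage{{containsDebug}, {longerThan10}} // two stages, one option each

	fmt.Println(anyDrops(and, "debug")) // false: matches the regex but is not long enough
	fmt.Println(anyDrops(or, "debug"))  // true: the first stage alone is enough
}
```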
## Drop stage schema
```yaml
drop:
  # Name from extracted data to parse. If empty, uses the log message.
  [source: <string>]
  # RE2 regular expression. If source is provided, the regex will attempt to match the source.
  # If no source is provided, the regex attempts to match the log line.
  # If the provided regex matches the log line or the provided source, the line will be dropped.
  [expression: <string>]
  # value can only be specified when source is specified. It is an error to specify both value and expression.
  # If the value provided is an exact match for the `source` the line will be dropped.
  [value: <string>]
  # older_than will be parsed as a Go duration: https://golang.org/pkg/time/#ParseDuration
  # If the log line timestamp is older than the current time minus the provided duration it will be dropped.
  [older_than: <duration>]
  # longer_than is a value in bytes, any log line longer than this value will be dropped.
  # Can be specified as an exact number of bytes in integer format: 8192
  # Or can be expressed with a suffix such as 8kb
  [longer_than: <string>|<int>]
  # Every time a log line is dropped the metric `logentry_dropped_lines_total`
  # will be incremented. By default the reason label will be `drop_stage`
  # however you can optionally specify a custom value to be used in the `reason`
  # label of that metric here.
  [drop_counter_reason: <string> | default = "drop_stage"]
```
## Examples
The following are examples showing the use of the `drop` stage.
### Simple drops
Simple `drop` stage configurations only specify one of the options, or two options when using the `source` option.
#### Regex match a line
Given the pipeline:
```yaml
- drop:
    expression: ".*debug.*"
```
Would drop any log line with the word `debug` in it.
#### Regex match a source
Given the pipeline:
```yaml
- json:
    expressions:
      level:
      msg:
- drop:
    source: "level"
    expression: "(error|ERROR)"
```
Would drop both of these log lines:
```
{"time":"2019-01-01T01:00:00.000000001Z", "level": "error", "msg":"11.11.11.11 - "POST /loki/api/push/ HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"}
{"time":"2019-01-01T01:00:00.000000001Z", "level": "ERROR", "msg":"11.11.11.11 - "POST /loki/api/push/ HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"}
```
#### Value match a source
Given the pipeline:
```yaml
- json:
    expressions:
      level:
      msg:
- drop:
    source: "level"
    value: "error"
```
Would drop this log line:
```
{"time":"2019-01-01T01:00:00.000000001Z", "level": "error", "msg":"11.11.11.11 - "POST /loki/api/push/ HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"}
```
#### Drop old log lines
**NOTE** For `older_than` to work, you must be using the [timestamp](timestamp.md) stage to set the timestamp from the ingested log line _before_ applying the `drop` stage.
Given the pipeline:
```yaml
- json:
    expressions:
      time:
      msg:
- timestamp:
    source: time
    format: RFC3339
- drop:
    older_than: 24h
    drop_counter_reason: "line_too_old"
```
With a current ingestion time of 2020-08-12T12:00:00Z would drop this log line when read from a file:
```
{"time":"2020-08-11T11:00:00Z", "level": "error", "msg":"11.11.11.11 - "POST /loki/api/push/ HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"}
```
However it would _not_ drop this log line:
```
{"time":"2020-08-11T13:00:00Z", "level": "error", "msg":"11.11.11.11 - "POST /loki/api/push/ HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"}
```
In this example the current time is 2020-08-12T12:00:00Z and `older_than` is 24h. All log lines which have a timestamp older than 2020-08-11T12:00:00Z will be dropped.
All lines dropped by this drop stage would also increment the `logentry_dropped_lines_total` metric with a label `reason="line_too_old"`.
#### Dropping long log lines
Given the pipeline:
```yaml
- drop:
    longer_than: 8kb
    drop_counter_reason: "line_too_long"
```
Would drop any log line longer than 8kb. This is useful when Loki would reject a line for being too long.
All lines dropped by this drop stage would also increment the `logentry_dropped_lines_total` metric with a label `reason="line_too_long"`.
### Complex drops
Complex `drop` stage configurations specify multiple options in one stage or specify multiple drop stages.
#### Drop logs by regex AND length
Given the pipeline:
```yaml
- drop:
    expression: ".*debug.*"
    longer_than: 1kb
```
Would drop all logs that contain the word _debug_ *AND* are longer than 1kb.
#### Drop logs by time OR length OR regex
Given the pipeline:
```yaml
- json:
    expressions:
      time:
      msg:
- timestamp:
    source: time
    format: RFC3339
- drop:
    older_than: 24h
- drop:
    longer_than: 8kb
- drop:
    source: msg
    expression: ".*trace.*"
```
Would drop all logs that are older than 24h OR longer than 8kb OR have a JSON `msg` field containing the word _trace_.

@ -24,6 +24,12 @@ match:
# and no later metrics will be recorded.
# Stages must be not defined when dropping entries.
[action: <string> | default = "keep"]
# If you specify `action: drop` the metric `logentry_dropped_lines_total`
# will be incremented for every line dropped. By default the reason
# label will be `match_stage` however you can optionally specify a custom value
# to be used in the `reason` label of that metric here.
[drop_counter_reason: <string> | default = "match_stage"]
# Nested set of pipeline stages only if the selector
# matches the labels of the log entries:
@ -72,6 +78,7 @@ pipeline_stages:
- match:
    selector: '{app="promtail"} |~ ".*noisy error.*"'
    action: drop
    drop_counter_reason: promtail_noisy_error
- output:
    source: msg
```
@ -97,7 +104,8 @@ label of `app` whose value is `pokey`. This does **not** match in our case, so
the nested `json` stage is not ran.
The fifth stage will drop any entries from the application `promtail` that matches
the regex `.*noisy error`.
the regex `.*noisy error` and will also increment the `logentry_dropped_lines_total`
metric with a label `reason="promtail_noisy_error"`.
The final `output` stage changes the contents of the log line to be the value of
`msg` from the extracted map. In this case, the log line is changed to `app1 log

@ -0,0 +1,233 @@
package stages
import (
"fmt"
"reflect"
"regexp"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util/flagext"
)
const (
ErrDropStageEmptyConfig = "drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`"
ErrDropStageInvalidDuration = "drop stage invalid duration, %v cannot be converted to a duration: %v"
ErrDropStageInvalidConfig = "drop stage config error, `value` and `expression` cannot both be defined at the same time."
ErrDropStageInvalidRegex = "drop stage regex compilation error: %v"
ErrDropStageInvalidByteSize = "drop stage failed to parse longer_than to bytes: %v"
)
var (
defaultDropReason = "drop_stage"
)
// DropConfig contains the configuration for a dropStage
type DropConfig struct {
DropReason *string `mapstructure:"drop_counter_reason"`
Source *string `mapstructure:"source"`
Value *string `mapstructure:"value"`
Expression *string `mapstructure:"expression"`
regex *regexp.Regexp
OlderThan *string `mapstructure:"older_than"`
olderThan time.Duration
LongerThan *string `mapstructure:"longer_than"`
longerThan flagext.ByteSize
}
// validateDropConfig validates the DropConfig for the dropStage
func validateDropConfig(cfg *DropConfig) error {
if cfg == nil ||
(cfg.Source == nil && cfg.Expression == nil && cfg.OlderThan == nil && cfg.LongerThan == nil) {
return errors.New(ErrDropStageEmptyConfig)
}
if cfg.DropReason == nil || *cfg.DropReason == "" {
cfg.DropReason = &defaultDropReason
}
if cfg.OlderThan != nil {
dur, err := time.ParseDuration(*cfg.OlderThan)
if err != nil {
return errors.Errorf(ErrDropStageInvalidDuration, *cfg.OlderThan, err)
}
cfg.olderThan = dur
}
if cfg.Value != nil && cfg.Expression != nil {
return errors.New(ErrDropStageInvalidConfig)
}
if cfg.Expression != nil {
expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrDropStageInvalidRegex, err)
}
cfg.regex = expr
}
if cfg.LongerThan != nil {
err := cfg.longerThan.Set(*cfg.LongerThan)
if err != nil {
return errors.Errorf(ErrDropStageInvalidByteSize, err)
}
}
return nil
}
// newDropStage creates a DropStage from config
func newDropStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &DropConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateDropConfig(cfg)
if err != nil {
return nil, err
}
return &dropStage{
logger: log.With(logger, "component", "stage", "type", "drop"),
cfg: cfg,
}, nil
}
// dropStage drops log entries that meet all of the configured drop criteria
type dropStage struct {
logger log.Logger
cfg *DropConfig
}
// Process implements Stage
func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
// There are many options for dropping a log and if multiple are defined it's treated like an AND condition
// where all drop conditions must be met to drop the log.
// Therefore if at any point there is a condition which does not match we can return.
// The order is what I roughly think would be fastest check to slowest check to try to quit early whenever possible
if m.cfg.LongerThan != nil {
if len([]byte(*entry)) > m.cfg.longerThan.Val() {
// Too long, drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for length %v > %v", len([]byte(*entry)), m.cfg.longerThan.Val()))
}
} else {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet criteria for drop length %v is not greater than %v", len([]byte(*entry)), m.cfg.longerThan.Val()))
}
return
}
}
if m.cfg.OlderThan != nil {
ct := time.Now()
if t.Before(ct.Add(-m.cfg.olderThan)) {
// Too old, drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), t))
}
} else {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), t))
}
return
}
}
if m.cfg.Source != nil && m.cfg.Expression == nil {
if v, ok := extracted[*m.cfg.Source]; ok {
if m.cfg.Value == nil {
// Found in map, no value set meaning drop if found in map
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria for finding source key in extracted map")
}
} else {
if *m.cfg.Value == v {
// Found in map with value set for drop
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria for finding source key in extracted map with value matching desired drop value")
}
} else {
// Value doesn't match, don't drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, source key was found in extracted map but value '%v' did not match desired value '%v'", v, *m.cfg.Value))
}
return
}
}
} else {
// Not found in extracted map, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map")
}
return
}
}
if m.cfg.Expression != nil {
if m.cfg.Source != nil {
if v, ok := extracted[*m.cfg.Source]; ok {
s, err := getString(v)
if err != nil {
if Debug {
level.Debug(m.logger).Log("msg", "Failed to convert extracted map value to string, cannot test regex line will not be dropped.", "err", err, "type", reflect.TypeOf(v))
}
return
}
match := m.cfg.regex.FindStringSubmatch(s)
if match == nil {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, the provided regular expression did not match the value found in the extracted map for source key: %v", *m.cfg.Source))
}
return
} else {
// regex match, will be dropped
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria, regex matched the value in the extracted map source key")
}
}
} else {
// Not found in extracted map, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map")
}
return
}
} else {
if entry != nil {
match := m.cfg.regex.FindStringSubmatch(*entry)
if match == nil {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
}
return
} else {
if Debug {
level.Debug(m.logger).Log("msg", "line met drop criteria, the provided regular expression matched the log line")
}
}
} else {
// entry was nil so there is nothing to match, do not drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, because it was nil and we can't regex match to nil")
}
return
}
}
}
// Everything matched, drop the line
if Debug {
level.Debug(m.logger).Log("msg", "all criteria met, line will be dropped")
}
// Adds the drop label to not be sent by the api.EntryHandler
labels[dropLabel] = model.LabelValue(*m.cfg.DropReason)
}
// Name implements Stage
func (m *dropStage) Name() string {
return StageTypeDrop
}

@ -0,0 +1,377 @@
package stages
import (
"errors"
"fmt"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)
// Not all these are tested but are here to make sure the different types marshal without error
var testDropYaml = `
pipeline_stages:
- json:
    expressions:
      app:
      msg:
- drop:
    source: src
    expression: ".*test.*"
    older_than: 24h
    longer_than: 8kb
- drop:
    expression: ".*app1.*"
- drop:
    source: app
    value: loki
- drop:
    longer_than: 10000
`
func Test_dropStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
cfg.LogLevel.Set("debug")
util.InitLogger(cfg)
Debug = true
tests := []struct {
name string
config *DropConfig
labels model.LabelSet
extracted map[string]interface{}
t *time.Time
entry *string
shouldDrop bool
}{
{
name: "Longer Than Should Drop",
config: &DropConfig{
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{},
t: nil,
entry: ptrFromString("12345678901"),
shouldDrop: true,
},
{
name: "Longer Than Should Not Drop When Equal",
config: &DropConfig{
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{},
t: nil,
entry: ptrFromString("1234567890"),
shouldDrop: false,
},
{
name: "Longer Than Should Not Drop When Less",
config: &DropConfig{
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{},
t: nil,
entry: ptrFromString("123456789"),
shouldDrop: false,
},
{
name: "Older than Should Drop",
config: &DropConfig{
OlderThan: ptrFromString("1h"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{},
t: ptrFromTime(time.Now().Add(-2 * time.Hour)),
entry: nil,
shouldDrop: true,
},
{
name: "Older than Should Not Drop",
config: &DropConfig{
OlderThan: ptrFromString("1h"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{},
t: ptrFromTime(time.Now().Add(-5 * time.Minute)),
entry: nil,
shouldDrop: false,
},
{
name: "Matched Source",
config: &DropConfig{
Source: ptrFromString("key"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "",
},
shouldDrop: true,
},
{
name: "Did not match Source",
config: &DropConfig{
Source: ptrFromString("key1"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "val1",
},
shouldDrop: false,
},
{
name: "Matched Source and Value",
config: &DropConfig{
Source: ptrFromString("key"),
Value: ptrFromString("val1"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "val1",
},
shouldDrop: true,
},
{
name: "Did not match Source and Value",
config: &DropConfig{
Source: ptrFromString("key"),
Value: ptrFromString("val1"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "VALRUE1",
},
shouldDrop: false,
},
{
name: "Regex Matched Source and Value",
config: &DropConfig{
Source: ptrFromString("key"),
Expression: ptrFromString(".*val.*"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "val1",
},
shouldDrop: true,
},
{
name: "Regex Did not match Source and Value",
config: &DropConfig{
Source: ptrFromString("key"),
Expression: ptrFromString(".*val.*"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "pal1",
},
shouldDrop: false,
},
{
name: "Regex No Matching Source",
config: &DropConfig{
Source: ptrFromString("key"),
Expression: ptrFromString(".*val.*"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"pokey": "pal1",
},
shouldDrop: false,
},
{
name: "Regex Did Not Match Line",
config: &DropConfig{
Expression: ptrFromString(".*val.*"),
},
labels: model.LabelSet{},
entry: ptrFromString("this is a line which does not match the regex"),
extracted: map[string]interface{}{},
shouldDrop: false,
},
{
name: "Regex Matched Line",
config: &DropConfig{
Expression: ptrFromString(".*val.*"),
},
labels: model.LabelSet{},
entry: ptrFromString("this is a line with the word value in it"),
extracted: map[string]interface{}{},
shouldDrop: true,
},
{
name: "Match Source and Length Both Match",
config: &DropConfig{
Source: ptrFromString("key"),
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "pal1",
},
t: nil,
entry: ptrFromString("12345678901"),
shouldDrop: true,
},
{
name: "Match Source and Length Only First Matches",
config: &DropConfig{
Source: ptrFromString("key"),
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "pal1",
},
t: nil,
entry: ptrFromString("123456789"),
shouldDrop: false,
},
{
name: "Match Source and Length Only Second Matches",
config: &DropConfig{
Source: ptrFromString("key"),
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"WOOOOOOOOOOOOOO": "pal1",
},
t: nil,
entry: ptrFromString("123456789012"),
shouldDrop: false,
},
{
name: "Everything Must Match",
config: &DropConfig{
Source: ptrFromString("key"),
Expression: ptrFromString(".*val.*"),
OlderThan: ptrFromString("1h"),
LongerThan: ptrFromString("10b"),
},
labels: model.LabelSet{},
extracted: map[string]interface{}{
"key": "must contain value to match",
},
t: ptrFromTime(time.Now().Add(-2 * time.Hour)),
entry: ptrFromString("12345678901"),
shouldDrop: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateDropConfig(tt.config)
if err != nil {
t.Error(err)
}
m := &dropStage{
cfg: tt.config,
logger: util.Logger,
}
m.Process(tt.labels, tt.extracted, tt.t, tt.entry)
if tt.shouldDrop {
assert.Contains(t, tt.labels.String(), dropLabel)
} else {
assert.NotContains(t, tt.labels.String(), dropLabel)
}
})
}
}
func ptrFromString(str string) *string {
return &str
}
func ptrFromTime(t time.Time) *time.Time {
return &t
}
// TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline
func TestDropPipeline(t *testing.T) {
registry := prometheus.NewRegistry()
plName := "test_pipeline"
pl, err := NewPipeline(util.Logger, loadConfig(testDropYaml), &plName, registry)
require.NoError(t, err)
lbls := model.LabelSet{}
ts := time.Now()
// Process the first log line which should be dropped
entry := testMatchLogLineApp1
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Contains(t, lbls.String(), dropLabel)
// Process the second line which should not be dropped.
entry = testMatchLogLineApp2
extracted = map[string]interface{}{}
lbls = model.LabelSet{}
pl.Process(lbls, extracted, &ts, &entry)
assert.NotContains(t, lbls.String(), dropLabel)
}
var (
dropInvalidDur = "10y"
dropVal = "msg"
dropRegex = ".*blah"
dropInvalidRegex = "(?P<ts[0-9]+).*"
dropInvalidByteSize = "23QB"
)
func Test_validateDropConfig(t *testing.T) {
tests := []struct {
name string
config *DropConfig
wantErr error
}{
{
name: "ErrEmpty",
config: &DropConfig{},
wantErr: errors.New(ErrDropStageEmptyConfig),
},
{
name: "Invalid Duration",
config: &DropConfig{
OlderThan: &dropInvalidDur,
},
wantErr: fmt.Errorf(ErrDropStageInvalidDuration, dropInvalidDur, "time: unknown unit y in duration 10y"),
},
{
name: "Invalid Config",
config: &DropConfig{
Value: &dropVal,
Expression: &dropRegex,
},
wantErr: errors.New(ErrDropStageInvalidConfig),
},
{
name: "Invalid Regex",
config: &DropConfig{
Expression: &dropInvalidRegex,
},
wantErr: fmt.Errorf(ErrDropStageInvalidRegex, "error parsing regexp: invalid named capture: `(?P<ts[0-9]+).*`"),
},
{
name: "Invalid Bytesize",
config: &DropConfig{
LongerThan: &dropInvalidByteSize,
},
wantErr: fmt.Errorf(ErrDropStageInvalidByteSize, "strconv.UnmarshalText: parsing \"23QB\": invalid syntax"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
}
}

@ -32,6 +32,7 @@ type MatcherConfig struct {
Selector string `mapstructure:"selector"`
Stages PipelineStages `mapstructure:"stages"`
Action string `mapstructure:"action"`
DropReason *string `mapstructure:"drop_counter_reason"`
}
// validateMatcherConfig validates the MatcherConfig for the matcherStage
@ -99,20 +100,27 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg
return nil, errors.Wrap(err, "error parsing filter")
}
dropReason := "match_stage"
if cfg.DropReason != nil && *cfg.DropReason != "" {
dropReason = *cfg.DropReason
}
return &matcherStage{
matchers: selector.Matchers(),
pipeline: pl,
action: cfg.Action,
filter: filter,
dropReason: dropReason,
matchers: selector.Matchers(),
pipeline: pl,
action: cfg.Action,
filter: filter,
}, nil
}
// matcherStage applies Label matchers to determine if the include stages should be run
type matcherStage struct {
matchers []*labels.Matcher
filter logql.LineFilter
pipeline Stage
action string
dropReason string
matchers []*labels.Matcher
filter logql.LineFilter
pipeline Stage
action string
}
// Process implements Stage
@ -126,7 +134,7 @@ func (m *matcherStage) Process(labels model.LabelSet, extracted map[string]inter
switch m.action {
case MatchActionDrop:
// Adds the drop label to not be sent by the api.EntryHandler
labels[dropLabel] = ""
labels[dropLabel] = model.LabelValue(m.dropReason)
case MatchActionKeep:
m.pipeline.Process(labels, extracted, t, entry)
}

@ -163,6 +163,7 @@ func TestMatcher(t *testing.T) {
tt.selector,
stages,
tt.action,
nil,
}
s, err := newMatcherStage(util.Logger, nil, matchConfig, prometheus.DefaultRegisterer)
if (err != nil) != tt.wantErr {

@ -26,6 +26,7 @@ type Pipeline struct {
stages []Stage
jobName *string
plDuration *prometheus.HistogramVec
dropCount *prometheus.CounterVec
}
// NewPipeline creates a new log entry pipeline from a configuration
@ -45,6 +46,20 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist
panic(err)
}
}
dropCount := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "logentry",
Name: "dropped_lines_total",
Help: "A count of all log lines dropped as a result of a pipeline stage",
}, []string{"reason"})
err = registerer.Register(dropCount)
if err != nil {
if existing, ok := err.(prometheus.AlreadyRegisteredError); ok {
dropCount = existing.ExistingCollector.(*prometheus.CounterVec)
} else {
// Same behavior as MustRegister if the error is not for AlreadyRegistered
panic(err)
}
}
st := []Stage{}
for _, s := range stgs {
@ -73,6 +88,7 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist
stages: st,
jobName: jobName,
plDuration: hist,
dropCount: dropCount,
}, nil
}
@ -112,7 +128,11 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
extracted := map[string]interface{}{}
p.Process(labels, extracted, &timestamp, &line)
// if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler
if _, ok := labels[dropLabel]; ok {
if reason, ok := labels[dropLabel]; ok {
if reason == "" {
reason = "undefined"
}
p.dropCount.WithLabelValues(string(reason)).Inc()
return nil
}
return next.Handle(labels, timestamp, line)
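The hunk above counts dropped entries by the value of the drop label and falls back to `undefined` when a stage set the label without a reason. A minimal, dependency-free sketch of that bookkeeping follows. It is not the pipeline code: `dropCounter` is a hypothetical map-based stand-in for the Prometheus `CounterVec`, plain string maps stand in for `model.LabelSet`, and the label name `__drop__` is taken from the comment in the hunk.

```go
package main

import "fmt"

// dropLabel is the marker label named in the comment above.
const dropLabel = "__drop__"

// dropCounter is a hypothetical stand-in for a counter vector
// keyed by the "reason" label.
type dropCounter map[string]int

// handle reports whether the entry should be forwarded to the next handler.
// Entries carrying the drop label are counted by reason and not forwarded.
func (c dropCounter) handle(labels map[string]string) bool {
	reason, dropped := labels[dropLabel]
	if !dropped {
		return true
	}
	if reason == "" {
		// A stage set the drop label without giving a reason.
		reason = "undefined"
	}
	c[reason]++
	return false
}

func main() {
	c := dropCounter{}
	fmt.Println(c.handle(map[string]string{"app": "loki"}))            // true
	fmt.Println(c.handle(map[string]string{dropLabel: "line_too_old"})) // false
	fmt.Println(c.handle(map[string]string{dropLabel: ""}))            // false
	fmt.Println(c["line_too_old"], c["undefined"])                     // 1 1
}
```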

@ -23,6 +23,7 @@ const (
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
)
// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
@ -106,6 +107,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeDrop:
s, err = newDropStage(logger, cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}
