Promtail: improve behavior of partial lines (#9508)

**What this PR does / why we need it**:

I started to see OOM after upgrading to a promtail version with #8497.
So played around to make partial line behavior more stable. You can now
truncate partial lines and configure max partial lines buffer.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`

Signed-off-by: Jan Jansen <jan.jansen@gdata.de>
Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
pull/9707/head
Jan Jansen 2 years ago committed by GitHub
parent 14f96c0c7b
commit 8abe080fd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 68
      clients/pkg/logentry/stages/extensions.go
  3. 46
      clients/pkg/logentry/stages/extensions_test.go
  4. 2
      clients/pkg/logentry/stages/stage.go
  5. 10
      docs/sources/clients/promtail/stages/cri.md

@ -62,6 +62,7 @@
* [8474](https://github.com/grafana/loki/pull/8787) **andriikushch**: Promtail: Add a new target for the Azure Event Hubs
* [8874](https://github.com/grafana/loki/pull/8874) **rfratto**: Promtail: Support expoential backoff when polling unchanged files for logs.
* [9508](https://github.com/grafana/loki/pull/9508) **farodin91**: Promtail: improve behavior of partial lines.
##### Fixes

@ -2,11 +2,15 @@ package stages
import (
"strings"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util/flagext"
)
const (
@ -44,9 +48,10 @@ func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, erro
type cri struct {
// bounded buffer for CRI-O Partial logs lines (identified with tag `P` till we reach first `F`)
partialLines map[model.Fingerprint]Entry
maxPartialLines int
base *Pipeline
partialLines map[model.Fingerprint]Entry
cfg *CriConfig
base *Pipeline
lock sync.Mutex
}
// implement Stage interface
@ -63,16 +68,17 @@ func (c *cri) Run(entry chan Entry) chan Entry {
// We received partial-line (tag: "P")
if e.Extracted["flags"] == "P" {
if len(c.partialLines) > c.maxPartialLines {
if len(c.partialLines) >= c.cfg.MaxPartialLines {
// Merge existing partialLines
entries := make([]Entry, 0, len(c.partialLines))
for _, v := range c.partialLines {
entries = append(entries, v)
}
level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", MaxPartialLinesSize)
level.Warn(c.base.logger).Log("msg", "cri stage: partial lines upperbound exceeded. merging it to single line", "threshold", c.cfg.MaxPartialLines)
c.partialLines = make(map[model.Fingerprint]Entry)
c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines)
c.ensureTruncateIfRequired(&e)
c.partialLines[fingerprint] = e
return entries, false
@ -80,8 +86,12 @@ func (c *cri) Run(entry chan Entry) chan Entry {
prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
var builder strings.Builder
builder.WriteString(prev.Line)
builder.WriteString(e.Line)
e.Line = builder.String()
}
c.ensureTruncateIfRequired(&e)
c.partialLines[fingerprint] = e
return []Entry{e}, true // it's a partial-line so skip it.
@ -92,7 +102,11 @@ func (c *cri) Run(entry chan Entry) chan Entry {
// 2. Else just return the full line.
prev, ok := c.partialLines[fingerprint]
if ok {
e.Line = strings.Join([]string{prev.Line, e.Line}, "")
var builder strings.Builder
builder.WriteString(prev.Line)
builder.WriteString(e.Line)
e.Line = builder.String()
c.ensureTruncateIfRequired(&e)
delete(c.partialLines, fingerprint)
}
return []Entry{e}, false
@ -101,8 +115,29 @@ func (c *cri) Run(entry chan Entry) chan Entry {
return in
}
func (c *cri) ensureTruncateIfRequired(e *Entry) {
if c.cfg.MaxPartialLineSizeTruncate && len(e.Line) > c.cfg.MaxPartialLineSize.Val() {
e.Line = e.Line[:c.cfg.MaxPartialLineSize.Val()]
}
}
// CriConfig contains the configuration for the cri stage
type CriConfig struct {
MaxPartialLines int `mapstructure:"max_partial_lines"`
MaxPartialLineSize flagext.ByteSize `mapstructure:"max_partial_line_size"`
MaxPartialLineSizeTruncate bool `mapstructure:"max_partial_line_size_truncate"`
}
// validateDropConfig validates the DropConfig for the dropStage
func validateCriConfig(cfg *CriConfig) error {
if cfg.MaxPartialLines == 0 {
cfg.MaxPartialLines = MaxPartialLinesSize
}
return nil
}
// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error) {
func NewCRI(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
base := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
@ -137,10 +172,19 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
return nil, err
}
cfg := &CriConfig{}
if err := mapstructure.WeakDecode(config, cfg); err != nil {
return nil, err
}
if err = validateCriConfig(cfg); err != nil {
return nil, err
}
c := cri{
maxPartialLines: MaxPartialLinesSize,
base: p,
cfg: cfg,
base: p,
}
c.partialLines = make(map[model.Fingerprint]Entry)
c.partialLines = make(map[model.Fingerprint]Entry, c.cfg.MaxPartialLines)
return &c, nil
}

@ -96,12 +96,14 @@ type testEntry struct {
func TestCRI_tags(t *testing.T) {
cases := []struct {
name string
lines []string
expected []string
maxPartialLines int
entries []testEntry
err error
name string
lines []string
expected []string
maxPartialLines int
maxPartialLineSize int
maxPartialLineSizeTruncate bool
entries []testEntry
err error
}{
{
name: "tag F",
@ -137,7 +139,7 @@ func TestCRI_tags(t *testing.T) {
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F another full log", labels: model.LabelSet{"label1": "val3"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F yet an another full log", labels: model.LabelSet{"label1": "val4"}},
},
maxPartialLines: 2,
maxPartialLines: 3,
expected: []string{
"partial line 1 partial line 3 ",
"partial line 2 ",
@ -163,20 +165,35 @@ func TestCRI_tags(t *testing.T) {
"another full log",
},
},
{
name: "tag P multi-stream with truncation",
entries: []testEntry{
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial line 1 ", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:50.904275087+00:00 stdout P partial", labels: model.LabelSet{"foo": "bar2"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F log finished", labels: model.LabelSet{"foo": "bar"}},
{line: "2019-05-07T18:57:55.904275087+00:00 stdout F full", labels: model.LabelSet{"foo": "bar2"}},
},
maxPartialLineSizeTruncate: true,
maxPartialLineSize: 11,
expected: []string{
"partial lin",
"partialfull",
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer)
cfg := map[string]interface{}{
"max_partial_lines": tt.maxPartialLines,
"max_partial_line_size": tt.maxPartialLineSize,
"max_partial_line_size_truncate": tt.maxPartialLineSizeTruncate,
}
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
require.NoError(t, err)
got := make([]string, 0)
// tweak `maxPartialLines`
if tt.maxPartialLines != 0 {
p.(*cri).maxPartialLines = tt.maxPartialLines
}
for _, entry := range tt.entries {
out := processEntries(p, newEntry(nil, entry.labels, entry.line, time.Now()))
if len(out) > 0 {
@ -254,7 +271,8 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util_log.Logger, prometheus.DefaultRegisterer)
cfg := map[string]interface{}{}
p, err := NewCRI(util_log.Logger, cfg, prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}

@ -119,7 +119,7 @@ func New(logger log.Logger, jobName *string, stageType string,
return nil, err
}
case StageTypeCRI:
s, err = NewCRI(logger, registerer)
s, err = NewCRI(logger, cfg, registerer)
if err != nil {
return nil, err
}

@ -9,7 +9,15 @@ The `cri` stage is a parsing stage that reads the log line using the standard CR
## Schema
```yaml
cri: {}
cri:
# Max buffer size to hold partial lines.
[max_partial_lines: <int> | default = 100]
# Max line size to hold a single partial line, if max_partial_line_size_truncate is true. Example: 262144.
[max_partial_line_size: <int> | default = 0]
# Allows to pretruncate partial lines before storing in partial buffer.
[max_partial_line_size_truncate: <bool> | default = false]
```
Unlike most stages, the `cri` stage provides no configuration options and only

Loading…
Cancel
Save