Distributor: Truncate rather than drop log lines (#4051)

* Truncate rather than drop logs

- When log lines are longer than maxLineSize, truncate them to maxLineSize rather than dropping them

Fixes: #3747

* Review feedback:

- Rename 'max_line_size_should_truncate' to 'max_line_size_truncate'
- Add configurable truncated indicator
- Add Truncated Line metric
- Changed truncated metrics to have the name mutated_{samples,bytes}_total

* Linter

* Review Feedback

- Make metric variable names match metric names
- Add benchmarks
- reduce truncation allocs by 100x

* Linter

* Remove truncation indicator
pull/4105/head
Travis Patterson 4 years ago committed by GitHub
parent c00af4d38a
commit 8a2081a39e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/sources/configuration/_index.md
  2. 24
      pkg/distributor/distributor.go
  3. 48
      pkg/distributor/distributor_test.go
  4. 1
      pkg/distributor/limits.go
  5. 5
      pkg/distributor/validator.go
  6. 7
      pkg/validation/limits.go
  7. 2
      pkg/validation/limits_test.go
  8. 26
      pkg/validation/validate.go

@ -1781,6 +1781,10 @@ logs in Loki.
# CLI flag: -distributor.max-line-size
[max_line_size: <string> | default = none ]
# Truncate log lines when they exceed max_line_size.
# CLI flag: -distributor.max-line-size-truncate
[max_line_size_truncate: <boolean> | default = false ]
# Maximum number of log entries that will be returned for a query.
# CLI flag: -validation.max-entries-limit
[max_entries_limit_per_query: <int> | default = 5000 ]

@ -209,6 +209,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validationContext := d.validator.getValidationContextFor(userID)
for _, stream := range req.Streams {
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)
stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
validationErr = err
@ -220,6 +223,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes))
continue
}
n := 0
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
@ -236,6 +240,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
if len(stream.Entries) == 0 {
continue
}
keys = append(keys, util.TokenFor(userID, stream.Labels))
streams = append(streams, streamTracker{
stream: stream,
@ -300,6 +305,25 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}
func (d *Distributor) truncateLines(vContext validationContext, stream *logproto.Stream) {
if !vContext.maxLineSizeTruncate {
return
}
var truncatedSamples, truncatedBytes int
for i, e := range stream.Entries {
if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize {
stream.Entries[i].Line = e.Line[:maxSize]
truncatedSamples++
truncatedBytes = len(e.Line) - maxSize
}
}
validation.MutatedSamples.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedSamples))
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
}
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendSamplesErr(ctx, ingester, streamTrackers)

@ -115,6 +115,29 @@ func Test_SortLabelsOnPush(t *testing.T) {
require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels)
}
func Test_TruncateLogLines(t *testing.T) {
setup := func() (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
limits.MaxLineSize = 5
limits.MaxLineSizeTruncate = true
return limits, &mockIngester{}
}
t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) {
limits, ingester := setup()
d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
_, err := d.Push(ctx, makeWriteRequest(1, 10))
require.NoError(t, err)
require.Len(t, ingester.pushed[0].Streams[0].Entries[0].Line, 5)
})
}
func Benchmark_SortLabelsOnPush(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
@ -162,6 +185,31 @@ func Benchmark_Push(b *testing.B) {
}
}
func Benchmark_PushWithLineTruncation(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateMB = math.MaxInt32
limits.MaxLineSizeTruncate = true
limits.MaxLineSize = 50
ingester := &mockIngester{}
d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
request := makeWriteRequest(100000, 100)
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
_, err := d.Push(ctx, request)
if err != nil {
require.NoError(b, err)
}
}
}
func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
bytes int

@ -5,6 +5,7 @@ import "time"
// Limits is an interface for distributor limits/related configs
type Limits interface {
MaxLineSize(userID string) int
MaxLineSizeTruncate(userID string) bool
EnforceMetricName(userID string) bool
MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int

@ -28,7 +28,9 @@ type validationContext struct {
rejectOldSample bool
rejectOldSampleMaxAge int64
creationGracePeriod int64
maxLineSize int
maxLineSize int
maxLineSizeTruncate bool
maxLabelNamesPerSeries int
maxLabelNameLength int
@ -45,6 +47,7 @@ func (v Validator) getValidationContextFor(userID string) validationContext {
rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(),
creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(),
maxLineSize: v.MaxLineSize(userID),
maxLineSizeTruncate: v.MaxLineSizeTruncate(userID),
maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID),
maxLabelNameLength: v.MaxLabelNameLength(userID),
maxLabelValueLength: v.MaxLabelValueLength(userID),

@ -39,6 +39,7 @@ type Limits struct {
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"`
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
// Ingester enforced limits.
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
@ -88,6 +89,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&l.IngestionRateMB, "distributor.ingestion-rate-limit-mb", 4, "Per-user ingestion rate limit in sample size per second. Units in MB.")
f.Float64Var(&l.IngestionBurstSizeMB, "distributor.ingestion-burst-size-mb", 6, "Per-user allowed ingestion burst size (in sample size). Units in MB.")
f.Var(&l.MaxLineSize, "distributor.max-line-size", "maximum line length allowed, i.e. 100mb. Default (0) means unlimited.")
f.BoolVar(&l.MaxLineSizeTruncate, "distributor.max-line-size-truncate", false, "Whether to truncate lines that exceed max_line_size")
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names")
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
@ -335,6 +337,11 @@ func (o *Overrides) MaxLineSize(userID string) int {
return o.getOverridesForUser(userID).MaxLineSize.Val()
}
// MaxLineSizeShouldTruncate returns whether lines longer than max should be truncated.
func (o *Overrides) MaxLineSizeTruncate(userID string) bool {
return o.getOverridesForUser(userID).MaxLineSizeTruncate
}
// MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query.
func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery

@ -44,6 +44,7 @@ reject_old_samples_max_age: 40s
creation_grace_period: 50s
enforce_metric_name: true
max_line_size: 60
max_line_size_truncate: true
max_streams_per_user: 70
max_global_streams_per_user: 80
max_chunks_per_query: 90
@ -76,6 +77,7 @@ per_tenant_override_period: 230s
"creation_grace_period": "50s",
"enforce_metric_name": true,
"max_line_size": 60,
"max_line_size_truncate": true,
"max_streams_per_user": 70,
"max_global_streams_per_user": 80,
"max_chunks_per_query": 90,

@ -5,7 +5,7 @@ import (
)
const (
discardReasonLabel = "reason"
reasonLabel = "reason"
// InvalidLabels is a reason for discarding log lines which have labels that cannot be parsed.
InvalidLabels = "invalid_labels"
MissingLabels = "missing_labels"
@ -43,6 +43,26 @@ const (
DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
)
// MutatedSamples is a metric of the total number of lines mutated, by reason.
var MutatedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "loki",
Name: "mutated_samples_total",
Help: "The total number of samples that have been mutated.",
},
[]string{reasonLabel, "truncated"},
)
// MutatedBytes is a metric of the total mutated bytes, by reason.
var MutatedBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "loki",
Name: "mutated_bytes_total",
Help: "The total number of bytes that have been mutated.",
},
[]string{reasonLabel, "truncated"},
)
// DiscardedBytes is a metric of the total discarded bytes, by reason.
var DiscardedBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
@ -50,7 +70,7 @@ var DiscardedBytes = prometheus.NewCounterVec(
Name: "discarded_bytes_total",
Help: "The total number of bytes that were discarded.",
},
[]string{discardReasonLabel, "tenant"},
[]string{reasonLabel, "tenant"},
)
// DiscardedSamples is a metric of the number of discarded samples, by reason.
@ -60,7 +80,7 @@ var DiscardedSamples = prometheus.NewCounterVec(
Name: "discarded_samples_total",
Help: "The total number of samples that were discarded.",
},
[]string{discardReasonLabel, "tenant"},
[]string{reasonLabel, "tenant"},
)
func init() {

Loading…
Cancel
Save