From 8a2081a39ea1ff0458376196f44324d63be982a5 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Thu, 5 Aug 2021 01:37:24 -0600 Subject: [PATCH] Distributor: Truncate rather than drop log lines (#4051) * Truncate rather than drop logs - When log lines are longer than maxLineSize, truncate them to maxLineSize rather than dropping them Fixes: #3747 * Review feedback: - Rename 'max_line_size_should_truncate' to 'max_line_size_truncate' - Add configurable truncated indicator - Add Truncated Line metric - Changed truncated metrics to have the name mutated_{samples,bytes}_total * Linter * Review Feedback - Make metric variable names match metric names - Add benchmarks - reduce truncation allocs by 100x * Linter * Remove truncation indicator --- docs/sources/configuration/_index.md | 4 +++ pkg/distributor/distributor.go | 24 ++++++++++++++ pkg/distributor/distributor_test.go | 48 ++++++++++++++++++++++++++++ pkg/distributor/limits.go | 1 + pkg/distributor/validator.go | 5 ++- pkg/validation/limits.go | 7 ++++ pkg/validation/limits_test.go | 2 ++ pkg/validation/validate.go | 26 +++++++++++++-- 8 files changed, 113 insertions(+), 4 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 8dca784902..86ffec0445 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1781,6 +1781,10 @@ logs in Loki. # CLI flag: -distributor.max-line-size [max_line_size: <string> | default = none ] +# Truncate log lines when they exceed max_line_size. +# CLI flag: -distributor.max-line-size-truncate +[max_line_size_truncate: <boolean> | default = false ] + # Maximum number of log entries that will be returned for a query. 
# CLI flag: -validation.max-entries-limit [max_entries_limit_per_query: | default = 5000 ] diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6fc896ac0f..a914dae620 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -209,6 +209,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validationContext := d.validator.getValidationContextFor(userID) for _, stream := range req.Streams { + // Truncate first so subsequent steps have consistent line lengths + d.truncateLines(validationContext, &stream) + stream.Labels, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) if err != nil { validationErr = err @@ -220,6 +223,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, userID).Add(float64(bytes)) continue } + n := 0 for _, entry := range stream.Entries { if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil { @@ -236,6 +240,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log if len(stream.Entries) == 0 { continue } + keys = append(keys, util.TokenFor(userID, stream.Labels)) streams = append(streams, streamTracker{ stream: stream, @@ -300,6 +305,25 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } } +func (d *Distributor) truncateLines(vContext validationContext, stream *logproto.Stream) { + if !vContext.maxLineSizeTruncate { + return + } + + var truncatedSamples, truncatedBytes int + for i, e := range stream.Entries { + if maxSize := vContext.maxLineSize; maxSize != 0 && len(e.Line) > maxSize { + stream.Entries[i].Line = e.Line[:maxSize] + + truncatedSamples++ + truncatedBytes += len(e.Line) - maxSize + } + } + + validation.MutatedSamples.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedSamples)) + 
validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes)) +} + // TODO taken from Cortex, see if we can refactor out an usable interface. func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { err := d.sendSamplesErr(ctx, ingester, streamTrackers) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d7dfe2220a..6127ecd708 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -115,6 +115,29 @@ func Test_SortLabelsOnPush(t *testing.T) { require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels) } +func Test_TruncateLogLines(t *testing.T) { + setup := func() (*validation.Limits, *mockIngester) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.EnforceMetricName = false + limits.MaxLineSize = 5 + limits.MaxLineSizeTruncate = true + return limits, &mockIngester{} + } + + t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { + limits, ingester := setup() + + d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + + _, err := d.Push(ctx, makeWriteRequest(1, 10)) + require.NoError(t, err) + require.Len(t, ingester.pushed[0].Streams[0].Entries[0].Line, 5) + }) +} + func Benchmark_SortLabelsOnPush(b *testing.B) { limits := &validation.Limits{} flagext.DefaultValues(limits) @@ -162,6 +185,31 @@ func Benchmark_Push(b *testing.B) { } } +func Benchmark_PushWithLineTruncation(b *testing.B) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.IngestionRateMB = math.MaxInt32 + limits.MaxLineSizeTruncate = true + limits.MaxLineSize = 50 + + ingester := &mockIngester{} + d := prepare(&testing.T{}, limits, nil, func(addr 
string) (ring_client.PoolClient, error) { return ingester, nil }) + defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + request := makeWriteRequest(100000, 100) + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + + _, err := d.Push(ctx, request) + if err != nil { + require.NoError(b, err) + } + } +} + func TestDistributor_PushIngestionRateLimiter(t *testing.T) { type testPush struct { bytes int diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 520cc514fe..559f4b5505 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -5,6 +5,7 @@ import "time" // Limits is an interface for distributor limits/related configs type Limits interface { MaxLineSize(userID string) int + MaxLineSizeTruncate(userID string) bool EnforceMetricName(userID string) bool MaxLabelNamesPerSeries(userID string) int MaxLabelNameLength(userID string) int diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 5c6ab2374c..b095bb1fdd 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -28,7 +28,9 @@ type validationContext struct { rejectOldSample bool rejectOldSampleMaxAge int64 creationGracePeriod int64 - maxLineSize int + + maxLineSize int + maxLineSizeTruncate bool maxLabelNamesPerSeries int maxLabelNameLength int @@ -45,6 +47,7 @@ func (v Validator) getValidationContextFor(userID string) validationContext { rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(), creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(), maxLineSize: v.MaxLineSize(userID), + maxLineSizeTruncate: v.MaxLineSizeTruncate(userID), maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID), maxLabelNameLength: v.MaxLabelNameLength(userID), maxLabelValueLength: v.MaxLabelValueLength(userID), diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 6f6a27c1ec..03ad3670c0 100644 --- a/pkg/validation/limits.go +++ 
b/pkg/validation/limits.go @@ -39,6 +39,7 @@ type Limits struct { CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"` + MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` // Ingester enforced limits. MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` @@ -88,6 +89,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.IngestionRateMB, "distributor.ingestion-rate-limit-mb", 4, "Per-user ingestion rate limit in sample size per second. Units in MB.") f.Float64Var(&l.IngestionBurstSizeMB, "distributor.ingestion-burst-size-mb", 6, "Per-user allowed ingestion burst size (in sample size). Units in MB.") f.Var(&l.MaxLineSize, "distributor.max-line-size", "maximum line length allowed, i.e. 100mb. Default (0) means unlimited.") + f.BoolVar(&l.MaxLineSizeTruncate, "distributor.max-line-size-truncate", false, "Whether to truncate lines that exceed max_line_size") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -335,6 +337,11 @@ func (o *Overrides) MaxLineSize(userID string) int { return o.getOverridesForUser(userID).MaxLineSize.Val() } +// MaxLineSizeTruncate returns whether lines longer than max should be truncated. 
+func (o *Overrides) MaxLineSizeTruncate(userID string) bool { + return o.getOverridesForUser(userID).MaxLineSizeTruncate +} + // MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query. func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 6a7a310184..9605bbd44f 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -44,6 +44,7 @@ reject_old_samples_max_age: 40s creation_grace_period: 50s enforce_metric_name: true max_line_size: 60 +max_line_size_truncate: true max_streams_per_user: 70 max_global_streams_per_user: 80 max_chunks_per_query: 90 @@ -76,6 +77,7 @@ per_tenant_override_period: 230s "creation_grace_period": "50s", "enforce_metric_name": true, "max_line_size": 60, + "max_line_size_truncate": true, "max_streams_per_user": 70, "max_global_streams_per_user": 80, "max_chunks_per_query": 90, diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index 409f3047c3..c8f2551807 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -5,7 +5,7 @@ import ( ) const ( - discardReasonLabel = "reason" + reasonLabel = "reason" // InvalidLabels is a reason for discarding log lines which have labels that cannot be parsed. InvalidLabels = "invalid_labels" MissingLabels = "missing_labels" @@ -43,6 +43,26 @@ const ( DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'" ) +// MutatedSamples is a metric of the total number of lines mutated, by reason. +var MutatedSamples = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "loki", + Name: "mutated_samples_total", + Help: "The total number of samples that have been mutated.", + }, + []string{reasonLabel, "truncated"}, +) + +// MutatedBytes is a metric of the total mutated bytes, by reason. 
+var MutatedBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "loki", + Name: "mutated_bytes_total", + Help: "The total number of bytes that have been mutated.", + }, + []string{reasonLabel, "truncated"}, +) + // DiscardedBytes is a metric of the total discarded bytes, by reason. var DiscardedBytes = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -50,7 +70,7 @@ var DiscardedBytes = prometheus.NewCounterVec( Name: "discarded_bytes_total", Help: "The total number of bytes that were discarded.", }, - []string{discardReasonLabel, "tenant"}, + []string{reasonLabel, "tenant"}, ) // DiscardedSamples is a metric of the number of discarded samples, by reason. @@ -60,7 +80,7 @@ var DiscardedSamples = prometheus.NewCounterVec( Name: "discarded_samples_total", Help: "The total number of samples that were discarded.", }, - []string{discardReasonLabel, "tenant"}, + []string{reasonLabel, "tenant"}, ) func init() {