From 4f75939efbc7b76c67df88754492a819107f4d48 Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Wed, 4 May 2022 15:52:55 -0400
Subject: [PATCH] Revert "Loki: Change live tailing to only allow mutating the
 log line not the number of streams. (#6063)" (#6097)

This reverts commit 98fda9b0a063f5a4160802ccd8490f1aa18d3d54.
---
 CHANGELOG.md                     |  1 -
 docs/sources/upgrading/_index.md | 24 -------------
 pkg/ingester/tailer.go           | 59 ++++++++++++++++++--------------
 3 files changed, 34 insertions(+), 50 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 07d36e3493..dbbb274439 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -73,7 +73,6 @@
 ##### Fixes
 * [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values
 ##### Changes
-* [6063](https://github.com/grafana/loki/pull/6063) **slim-bean**: Changes tailing API responses to not split a stream when using parsers in the query
 * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
 * [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
 * [5650](https://github.com/grafana/loki/pull/5650) **cyriltovena**: Remove more chunkstore and schema version below v9
diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md
index 5dc4f8dfab..4b012c2b7b 100644
--- a/docs/sources/upgrading/_index.md
+++ b/docs/sources/upgrading/_index.md
@@ -31,30 +31,6 @@ The output is incredibly verbose as it shows the entire internal config struct u
 
 ## Main / Unreleased
 
-### Loki
-
-#### Tail API no longer creates multiple streams when using parsers.
-
-We expect this change to be non-impactful however it is a breaking change to existing behavior.
-
-This change would likely only affect anyone who's doing machine to machine type work with Loki's tail API
-and is expecting a parser in a query to alter the streams in a tail response.
-
-Prior to this change a tail request with a parser (e.g. json, logfmt, regexp, pattern) would split the
-incoming log stream into multiple streams based on the extracted labels after running the parser.
-
-[PR 6063](https://github.com/grafana/loki/pull/6063) changes this behavior
-to no longer split incoming streams when using a parser in the query, instead Loki will return exactly
-the same streams with and without a parser in the query.
-
-We found a significant performance impact when using parsers on live tailing queries which would
-result in turning a single stream with multiple entries into multiple streams with single entries.
-Often leading to the case where the tailing client could not keep up with the number of streams
-being pushed and tailing logs being dropped.
-
-This change will have no impact on viewing the tail output from Grafana or logcli.
-Parsers can still be used to do filtering and reformatting of live tailed log lines.
-
 ## 2.5.0
 
 ### Loki
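The upgrade note removed above describes the behavior this revert restores: with a parser (json, logfmt, regexp, pattern) in a tail query, entries from one incoming stream fan out into one output stream per extracted label set. The following is a rough, self-contained Go sketch of that fan-out, not Loki code; Entry and splitByLevel are invented names, and the "parser" is reduced to pulling a single level label out of logfmt-style lines.

// Hypothetical illustration of per-label-set stream splitting; not Loki code.
package main

import (
	"fmt"
	"strings"
)

type Entry struct{ Line string }

// splitByLevel mimics a `| logfmt`-style parser keyed on one extracted
// label ("level"): entries are regrouped by the value the parser finds.
func splitByLevel(entries []Entry) map[string][]Entry {
	streams := map[string][]Entry{}
	for _, e := range entries {
		level := "unknown"
		for _, field := range strings.Fields(e.Line) {
			if strings.HasPrefix(field, "level=") {
				level = strings.TrimPrefix(field, "level=")
			}
		}
		streams[level] = append(streams[level], e)
	}
	return streams
}

func main() {
	in := []Entry{
		{Line: `level=info msg="ready"`},
		{Line: `level=error msg="boom"`},
		{Line: `level=info msg="done"`},
	}
	// One stream in, two streams out: {level="info"} and {level="error"}.
	for level, entries := range splitByLevel(in) {
		fmt.Printf("{level=%q}: %d entries\n", level, len(entries))
	}
}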
diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 4a16f155de..4d034d7acb 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -28,7 +28,8 @@ type tailer struct {
 	id       uint32
 	orgID    string
 	matchers []*labels.Matcher
-	expr     syntax.LogSelectorExpr
+	pipeline syntax.Pipeline
+	expr     syntax.Expr
 	pipelineMtx sync.Mutex
 
 	sendChan chan *logproto.Stream
@@ -51,9 +52,7 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	if err != nil {
 		return nil, err
 	}
-	// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
-	// this error so make sure we handle it here.
-	_, err = expr.Pipeline()
+	pipeline, err := expr.Pipeline()
 	if err != nil {
 		return nil, err
 	}
@@ -62,6 +61,7 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
 	return &tailer{
 		orgID:          orgID,
 		matchers:       matchers,
+		pipeline:       pipeline,
 		sendChan:       make(chan *logproto.Stream, bufferSizeForTailResponse),
 		conn:           conn,
 		droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
@@ -121,44 +121,53 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 		return
 	}
 
-	processed := t.processStream(stream, lbs)
-	select {
-	case t.sendChan <- processed:
-	default:
-		t.dropStream(stream)
+	streams := t.processStream(stream, lbs)
+	if len(streams) == 0 {
+		return
+	}
+	for _, s := range streams {
+		select {
+		case t.sendChan <- s:
+		default:
+			t.dropStream(*s)
+		}
 	}
 }
 
-func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) *logproto.Stream {
-	// Build a new pipeline for each call because the pipeline builds a cache of labels
-	// and if we don't start with a new pipeline that cache will grow unbounded.
-	// The error is ignored because it would be handled in the constructor of the tailer.
-	pipeline, _ := t.expr.Pipeline()
-
+func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
 	// Optimization: skip filtering entirely, if no filter is set
-	if log.IsNoopPipeline(pipeline) {
-		return &stream
+	if log.IsNoopPipeline(t.pipeline) {
+		return []*logproto.Stream{&stream}
 	}
 	// pipeline are not thread safe and tailer can process multiple stream at once.
 	t.pipelineMtx.Lock()
 	defer t.pipelineMtx.Unlock()
 
-	responseStream := &logproto.Stream{
-		Labels:  stream.Labels,
-		Entries: make([]logproto.Entry, 0, len(stream.Entries)),
-	}
-	sp := pipeline.ForStream(lbs)
+	streams := map[uint64]*logproto.Stream{}
+
+	sp := t.pipeline.ForStream(lbs)
 	for _, e := range stream.Entries {
-		newLine, _, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
+		newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
 		if !ok {
 			continue
 		}
-		responseStream.Entries = append(responseStream.Entries, logproto.Entry{
+		var stream *logproto.Stream
+		if stream, ok = streams[parsedLbs.Hash()]; !ok {
+			stream = &logproto.Stream{
+				Labels: parsedLbs.String(),
+			}
+			streams[parsedLbs.Hash()] = stream
+		}
+		stream.Entries = append(stream.Entries, logproto.Entry{
 			Timestamp: e.Timestamp,
 			Line:      newLine,
 		})
 	}
-	return responseStream
+	streamsResult := make([]*logproto.Stream, 0, len(streams))
+	for _, stream := range streams {
+		streamsResult = append(streamsResult, stream)
+	}
+	return streamsResult
 }
 
 // isMatching returns true if lbs matches all matchers.
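The tailer.go hunks above also restore a tradeoff spelled out in the two comments they touch: the reverted code rebuilt the pipeline on every processStream call so its label cache could not grow unbounded, while the restored code builds one pipeline in newTailer and serializes all use of it with pipelineMtx, because pipelines are not thread-safe and a tailer can process multiple streams at once. Below is a minimal Go sketch of that restored pattern, with stand-in types invented for illustration (this pipeline and tailer are not the real Loki types).

// Hypothetical sketch of the shared-pipeline-plus-mutex pattern; not Loki code.
package main

import (
	"fmt"
	"sync"
)

// pipeline stands in for syntax.Pipeline: it mutates internal state (a
// label cache, per the removed comment), so unsynchronized concurrent
// use would race.
type pipeline struct{ cache map[string]int }

func (p *pipeline) process(line string) {
	p.cache[line]++ // the mutable state that forces callers to serialize
}

type tailer struct {
	mtx      sync.Mutex // plays the role of pipelineMtx
	pipeline *pipeline  // built once in the constructor, as in newTailer
}

func (t *tailer) processStream(lines []string) {
	// Lock around the whole batch, mirroring t.pipelineMtx.Lock() in the
	// restored processStream.
	t.mtx.Lock()
	defer t.mtx.Unlock()
	for _, line := range lines {
		t.pipeline.process(line)
	}
}

func main() {
	t := &tailer{pipeline: &pipeline{cache: map[string]int{}}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.processStream([]string{"a", "b"})
		}()
	}
	wg.Wait()
	fmt.Println(t.pipeline.cache) // map[a:4 b:4]
}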