From 9be3c0863e28ade5c060fa09c5efc8e397da2f19 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 31 Oct 2023 14:01:01 +0100 Subject: [PATCH] Support categorized labels in Tailing (#11079) **What this PR does / why we need it**: This is a follow-up PR for https://github.com/grafana/loki/pull/10419 adding support for tailing. I tested it on a dev cell and works fine. image **Note**: With these changes, the JSON marshal unmarshal functions for the tail are no longer used ([example][1]) so I think we can remove them. Also, the new Tail response is no longer used, so we can also make it an alias to the _legacy_ one. Let's do it on a follow-up PR to avoid making this one bigger. [1]: https://github.com/grafana/loki/blob/52a3f16039dd5ff655fc3681257d99794f620ec4/pkg/loghttp/entry.go#L210-L238 --- pkg/ingester/tailer.go | 8 +- pkg/ingester/tailer_test.go | 144 ++++++++++++- pkg/iter/categorized_labels_iterator.go | 71 +++++++ pkg/iter/categorized_labels_iterator_test.go | 145 +++++++++++++ pkg/logql/engine.go | 33 +-- pkg/querier/http.go | 11 +- pkg/querier/querier.go | 5 +- pkg/querier/querier_mock_test.go | 2 +- pkg/querier/querier_test.go | 4 +- pkg/querier/tail.go | 27 ++- pkg/querier/tail_test.go | 213 ++++++++++++++++++- pkg/util/httpreq/encoding_flags.go | 20 +- pkg/util/marshal/marshal.go | 36 +++- pkg/util/marshal/marshal_test.go | 105 ++++++++- pkg/util/marshal/query.go | 58 ++++- pkg/util/marshal/tail.go | 26 --- pkg/util/unmarshal/unmarshal_test.go | 3 +- 17 files changed, 803 insertions(+), 108 deletions(-) create mode 100644 pkg/iter/categorized_labels_iterator.go create mode 100644 pkg/iter/categorized_labels_iterator_test.go diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 106fe25bbf..72e7026e81 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line) + newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)...) if !ok { continue } @@ -163,8 +163,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log streams[parsedLbs.Hash()] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: e.Timestamp, - Line: newLine, + Timestamp: e.Timestamp, + Line: newLine, + StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()), + Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()), }) } streamsResult := make([]*logproto.Stream, 0, len(streams)) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 9b06e45600..5929335203 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -93,10 +93,25 @@ func Test_dropstream(t *testing.T) { } } -type fakeTailServer struct{} +type fakeTailServer struct { + responses []logproto.TailResponse +} + +func (f *fakeTailServer) Send(response *logproto.TailResponse) error { + f.responses = append(f.responses, *response) + return nil + +} + +func (f *fakeTailServer) Context() context.Context { return context.Background() } -func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil } -func (f *fakeTailServer) Context() context.Context { return context.Background() } +func (f *fakeTailServer) GetResponses() []logproto.TailResponse { + return f.responses +} + +func (f *fakeTailServer) Reset() { + f.responses = f.responses[:0] +} func Test_TailerSendRace(t *testing.T) { tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10) @@ -137,3 +152,126 @@ func Test_IsMatching(t *testing.T) { }) } } + +func Test_StructuredMetadata(t *testing.T) { + lbs := makeRandomLabels() + + for _, tc := range []struct { + name string + query string + sentStream logproto.Stream + expectedResponses []logproto.TailResponse + }{ + { + // Optimization will make the same stream to be returned regardless of structured metadata. + name: "noop pipeline", + query: `{app="foo"}`, + sentStream: logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + expectedResponses: []logproto.TailResponse{ + { + Stream: &logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + DroppedStreams: nil, + }, + }, + }, + { + name: "parse pipeline labels", + query: `{app="foo"} | logfmt`, + sentStream: logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + expectedResponses: []logproto.TailResponse{ + { + Stream: &logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), + }, + }, + }, + DroppedStreams: nil, + }, + { + Stream: &logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "2").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")), + }, + }, + }, + DroppedStreams: nil, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var server fakeTailServer + tail, err := newTailer("foo", tc.query, &server, 10) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + tail.loop() + wg.Done() + }() + + tail.send(tc.sentStream, lbs) + + // Wait for the stream to be received by the server. + require.Eventually(t, func() bool { + return len(server.GetResponses()) > 0 + }, 30*time.Second, 1*time.Second, "stream was not received") + + responses := server.GetResponses() + require.ElementsMatch(t, tc.expectedResponses, responses) + + tail.close() + wg.Wait() + }) + } +} diff --git a/pkg/iter/categorized_labels_iterator.go b/pkg/iter/categorized_labels_iterator.go new file mode 100644 index 0000000000..1e95cad09a --- /dev/null +++ b/pkg/iter/categorized_labels_iterator.go @@ -0,0 +1,71 @@ +package iter + +import ( + "fmt" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" +) + +type categorizeLabelsIterator struct { + EntryIterator + currEntry logproto.Entry + currStreamLabels string + currHash uint64 + currErr error +} + +func NewCategorizeLabelsIterator(wrap EntryIterator) EntryIterator { + return &categorizeLabelsIterator{ + EntryIterator: wrap, + } +} + +func (c *categorizeLabelsIterator) Next() bool { + if !c.EntryIterator.Next() { + return false + } + + c.currEntry = c.Entry() + if len(c.currEntry.StructuredMetadata) == 0 && len(c.currEntry.Parsed) == 0 { + c.currStreamLabels = c.EntryIterator.Labels() + c.currHash = c.EntryIterator.StreamHash() + return true + } + + // We need to remove the structured metadata labels and parsed labels from the stream labels. + streamLabels := c.EntryIterator.Labels() + lbls, err := syntax.ParseLabels(streamLabels) + if err != nil { + c.currErr = fmt.Errorf("failed to parse series labels to categorize labels: %w", err) + return false + } + + builder := labels.NewBuilder(lbls) + for _, label := range c.currEntry.StructuredMetadata { + builder.Del(label.Name) + } + for _, label := range c.currEntry.Parsed { + builder.Del(label.Name) + } + + newLabels := builder.Labels() + c.currStreamLabels = newLabels.String() + c.currHash = newLabels.Hash() + + return true +} + +func (c *categorizeLabelsIterator) Error() error { + return c.currErr +} + +func (c *categorizeLabelsIterator) Labels() string { + return c.currStreamLabels +} + +func (c *categorizeLabelsIterator) StreamHash() uint64 { + return c.currHash +} diff --git a/pkg/iter/categorized_labels_iterator_test.go b/pkg/iter/categorized_labels_iterator_test.go new file mode 100644 index 0000000000..18259edfbf --- /dev/null +++ b/pkg/iter/categorized_labels_iterator_test.go @@ -0,0 +1,145 @@ +package iter + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" +) + +func TestNewCategorizeLabelsIterator(t *testing.T) { + for _, tc := range []struct { + name string + inner EntryIterator + expectedStreams []logproto.Stream + }{ + { + name: "no structured metadata nor parsed labels", + inner: NewSortEntryIterator([]EntryIterator{ + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + }, + }, + }), + }, logproto.FORWARD), + expectedStreams: []logproto.Stream{ + { + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + }, + }, + }, + }, + }, + { + name: "structured metadata and parsed labels", + inner: NewSortEntryIterator([]EntryIterator{ + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "traceID", "123").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "foo", "3").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 3), + Line: "foo=3", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")), + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "traceID", "123", "foo", "4").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")), + }, + }, + }), + }, logproto.FORWARD), + expectedStreams: []logproto.Stream{ + { + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + { + Timestamp: time.Unix(0, 3), + Line: "foo=3", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")), + }, + { + Timestamp: time.Unix(0, 4), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")), + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + itr := NewCategorizeLabelsIterator(tc.inner) + + streamsEntries := make(map[string][]logproto.Entry) + for itr.Next() { + streamsEntries[itr.Labels()] = append(streamsEntries[itr.Labels()], itr.Entry()) + require.NoError(t, itr.Error()) + } + + var streams []logproto.Stream + for lbls, entries := range streamsEntries { + streams = append(streams, logproto.Stream{ + Labels: lbls, + Entries: entries, + }) + } + + require.ElementsMatch(t, tc.expectedStreams, streams) + }) + } +} diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 8bdcfe8501..1edf86da3e 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -287,16 +287,18 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { return value, err case syntax.LogSelectorExpr: - iter, err := q.evaluator.NewIterator(ctx, e, q.params) + itr, err := q.evaluator.NewIterator(ctx, e, q.params) if err != nil { return nil, err } encodingFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx) - categorizeLabels := encodingFlags.Has(httpreq.FlagCategorizeLabels) + if encodingFlags.Has(httpreq.FlagCategorizeLabels) { + itr = iter.NewCategorizeLabelsIterator(itr) + } - defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close) - streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval(), categorizeLabels) + defer util.LogErrorWithContext(ctx, "closing iterator", itr.Close) + streams, err := readStreams(itr, q.params.Limit(), q.params.Direction(), q.params.Interval(), true) return streams, err default: return nil, fmt.Errorf("unexpected type (%T): cannot evaluate", e) @@ -513,7 +515,7 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte // value here because many unit tests start at time.Unix(0,0) lastEntry := lastEntryMinTime for respSize < size && i.Next() { - entry := i.Entry() + streamLabels, entry := i.Labels(), i.Entry() forwardShouldOutput := dir == logproto.FORWARD && (entry.Timestamp.Equal(lastEntry.Add(interval)) || entry.Timestamp.After(lastEntry.Add(interval))) @@ -524,27 +526,6 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte // If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line. // Then check to see if the entry is equal to, or past a forward or reverse step if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput { - streamLabels := i.Labels() - - // If categorizeLabels is true, We need to remove the structured metadata labels and parsed labels from the stream labels. - // TODO(salvacorts): If this is too slow, provided this is in the hot path, we can consider doing this in the iterator. - if categorizeLabels && (len(entry.StructuredMetadata) > 0 || len(entry.Parsed) > 0) { - lbls, err := syntax.ParseLabels(streamLabels) - if err != nil { - return nil, fmt.Errorf("failed to parse series labels to categorize labels: %w", err) - } - - builder := labels.NewBuilder(lbls) - for _, label := range entry.StructuredMetadata { - builder.Del(label.Name) - } - for _, label := range entry.Parsed { - builder.Del(label.Name) - } - - streamLabels = builder.Labels().String() - } - stream, ok := streams[streamLabels] if !ok { stream = &logproto.Stream{ diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 3bf777659a..b6ba4750ae 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -145,6 +145,9 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { return } + encodingFlags := httpreq.ExtractEncodingFlags(r) + version := loghttp.GetVersion(r.RequestURI) + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err) @@ -163,7 +166,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { } }() - tailer, err := q.querier.Tail(r.Context(), req) + tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels)) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) @@ -179,6 +182,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(wsPingPeriod) defer ticker.Stop() + connWriter := marshal.NewWebsocketJSONWriter(conn) + var response *loghttp_legacy.TailResponse responseChan := tailer.getResponseChan() closeErrChan := tailer.getCloseErrorChan() @@ -209,8 +214,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { select { case response = <-responseChan: var err error - if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { - err = marshal.WriteTailResponseJSON(*response, conn) + if version == loghttp.VersionV1 { + err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags) } else { err = marshal_legacy.WriteTailResponseJSON(*response, conn) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8295f02c64..23e2c89f5e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -88,7 +88,7 @@ type Querier interface { logql.Querier Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) - Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) + Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) } @@ -434,7 +434,7 @@ func (*SingleTenantQuerier) Check(_ context.Context, _ *grpc_health_v1.HealthChe } // Tail keeps getting matching logs from all ingesters for given query -func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) { +func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) { err := q.checkTailRequestLimit(ctx) if err != nil { return nil, err @@ -496,6 +496,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques }, q.cfg.TailMaxDuration, tailerWaitEntryThrottle, + categorizedLabels, q.metrics, ), nil } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index aef4744032..fa2de75904 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -530,7 +530,7 @@ func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) ( return args.Get(0).(func() *logproto.SeriesResponse)(), args.Error(1) } -func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest) (*Tailer, error) { +func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest, _ bool) (*Tailer, error) { return nil, errors.New("querierMock.Tail() has not been mocked") } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c585a5386c..2de9299425 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -118,7 +118,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request) + _, err = q.Tail(ctx, &request, false) require.NoError(t, err) calls := ingesterClient.GetMockedCallsByMethod("Query") @@ -512,7 +512,7 @@ func TestQuerier_concurrentTailLimits(t *testing.T) { require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request) + _, err = q.Tail(ctx, &request, false) assert.Equal(t, testData.expectedError, err) }) } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 09e785a13b..bccedfb7c5 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -50,11 +50,12 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.RWMutex - stopped bool - delayFor time.Duration - responseChan chan *loghttp.TailResponse - closeErrChan chan error - tailMaxDuration time.Duration + stopped bool + delayFor time.Duration + responseChan chan *loghttp.TailResponse + closeErrChan chan error + tailMaxDuration time.Duration + categorizeLabels bool // if we are not seeing any response from ingester, // how long do we want to wait by going into sleep @@ -234,7 +235,12 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) { t.streamMtx.Lock() defer t.streamMtx.Unlock() - t.openStreamIterator.Push(iter.NewStreamIterator(*resp.Stream)) + itr := iter.NewStreamIterator(*resp.Stream) + if t.categorizeLabels { + itr = iter.NewCategorizeLabelsIterator(itr) + } + + t.openStreamIterator.Push(itr) } // finds oldest entry by peeking at open stream iterator. @@ -305,10 +311,16 @@ func newTailer( tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration, waitEntryThrottle time.Duration, + categorizeLabels bool, m *Metrics, ) *Tailer { + historicEntriesIter := historicEntries + if categorizeLabels { + historicEntriesIter = iter.NewCategorizeLabelsIterator(historicEntries) + } + t := Tailer{ - openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), + openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntriesIter}, logproto.FORWARD), querierTailClients: querierTailClients, delayFor: delayFor, responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses), @@ -317,6 +329,7 @@ func newTailer( tailDisconnectedIngesters: tailDisconnectedIngesters, tailMaxDuration: tailMaxDuration, waitEntryThrottle: waitEntryThrottle, + categorizeLabels: categorizeLabels, metrics: m, } diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go index a2186cd614..32c6bed36e 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -161,7 +162,7 @@ func TestTailer(t *testing.T) { tailClients["test"] = test.tailClient } - tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil)) + tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, false, NewMetrics(nil)) defer tailer.close() test.tester(t, tailer, test.tailClient) @@ -169,6 +170,214 @@ func TestTailer(t *testing.T) { } } +func TestCategorizedLabels(t *testing.T) { + t.Parallel() + + lbs := labels.FromStrings("app", "foo") + createHistoricalEntries := func() iter.EntryIterator { + return iter.NewStreamIterator(logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }) + } + createTailClients := func() map[string]*tailClientMock { + return map[string]*tailClientMock{ + "test1": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + })), + "test2": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + })), + "test3": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + })), + } + } + + for _, tc := range []struct { + name string + categorizeLabels bool + historicEntries iter.EntryIterator + tailClients map[string]*tailClientMock + expectedStreams []logproto.Stream + }{ + { + name: "without categorize", + categorizeLabels: false, + historicEntries: createHistoricalEntries(), + tailClients: createTailClients(), + expectedStreams: []logproto.Stream{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + }, + }, + }, + { + name: "categorize", + categorizeLabels: true, + historicEntries: createHistoricalEntries(), + tailClients: createTailClients(), + expectedStreams: []logproto.Stream{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) { + return map[string]logproto.Querier_TailClient{}, nil + } + + tailClients := map[string]logproto.Querier_TailClient{} + for k, v := range tc.tailClients { + tailClients[k] = v + } + + tailer := newTailer(0, tailClients, tc.historicEntries, tailDisconnectedIngesters, timeout, throttle, tc.categorizeLabels, NewMetrics(nil)) + defer tailer.close() + + // Make tail clients receive their responses + for _, client := range tc.tailClients { + client.triggerRecv() + } + + err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer) + require.NoError(t, err) + + maxEntries := countEntriesInStreams(tc.expectedStreams) + responses, err := readFromTailer(tailer, maxEntries) + require.NoError(t, err) + + streams := flattenStreamsFromResponses(responses) + require.ElementsMatch(t, tc.expectedStreams, streams) + }) + } +} + func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, error) { responses := make([]*loghttp.TailResponse, 0) entriesCount := 0 @@ -204,7 +413,7 @@ func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error { select { case <-timeoutTicker.C: - return errors.New("timeout expired while reading responses from Tailer") + return errors.New("timeout expired while waiting for Tailer to consume open streams") default: time.Sleep(throttle) } diff --git a/pkg/util/httpreq/encoding_flags.go b/pkg/util/httpreq/encoding_flags.go index 89656618eb..232f2bc4e0 100644 --- a/pkg/util/httpreq/encoding_flags.go +++ b/pkg/util/httpreq/encoding_flags.go @@ -71,11 +71,7 @@ func AddEncodingFlagsToContext(ctx context.Context, flags EncodingFlags) context func ExtractEncodingFlags(req *http.Request) EncodingFlags { rawValue := req.Header.Get(LokiEncodingFlagsHeader) - if rawValue == "" { - return nil - } - - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags { @@ -83,11 +79,7 @@ func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags { for _, header := range req.GetHeaders() { if header.GetKey() == LokiEncodingFlagsHeader { rawValue = header.GetValues()[0] - if rawValue == "" { - return nil - } - - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } } @@ -100,10 +92,14 @@ func ExtractEncodingFlagsFromCtx(ctx context.Context) EncodingFlags { return nil } - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } -func parseEncodingFlags(rawFlags string) EncodingFlags { +func ParseEncodingFlags(rawFlags string) EncodingFlags { + if rawFlags == "" { + return nil + } + split := strings.Split(rawFlags, EncodeFlagsDelimiter) flags := make(EncodingFlags, len(split)) for _, rawFlag := range split { diff --git a/pkg/util/marshal/marshal.go b/pkg/util/marshal/marshal.go index 562808b300..fd28907d05 100644 --- a/pkg/util/marshal/marshal.go +++ b/pkg/util/marshal/marshal.go @@ -81,18 +81,38 @@ type WebsocketWriter interface { WriteMessage(int, []byte) error } -// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and -// then writes it to the provided connection. -func WriteTailResponseJSON(r legacy.TailResponse, c WebsocketWriter) error { - v1Response, err := NewTailResponse(r) +type websocketJSONWriter struct { + WebsocketWriter +} + +func (w *websocketJSONWriter) Write(p []byte) (n int, err error) { + err = w.WriteMessage(websocket.TextMessage, p) if err != nil { - return err + return 0, err } - data, err := jsoniter.Marshal(v1Response) + return len(p), nil +} + +func NewWebsocketJSONWriter(ws WebsocketWriter) io.Writer { + return &websocketJSONWriter{ws} +} + +// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and +// then writes it to the provided writer. +func WriteTailResponseJSON(r legacy.TailResponse, w io.Writer, encodeFlags httpreq.EncodingFlags) error { + // TODO(salvacorts): I think we can dismiss the new TailResponse and be an alias of legacy.TailResponse + // v1Response, err := NewTailResponse(r) + // if err != nil { + // return err + // } + s := jsoniter.ConfigFastest.BorrowStream(w) + defer jsoniter.ConfigFastest.ReturnStream(s) + + err := EncodeTailResult(r, s, encodeFlags) if err != nil { - return err + return fmt.Errorf("could not write JSON tail response: %w", err) } - return c.WriteMessage(websocket.TextMessage, data) + return s.Flush() } // WriteSeriesResponseJSON marshals a logproto.SeriesResponse to v1 loghttp JSON and then diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index 070f7b0ed4..fa8cc5d8aa 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -410,7 +410,6 @@ var labelTests = []struct { } // covers responses from /loki/api/v1/tail -// TODO(salvacorts): Support encoding flags. And fix serialized structured metadata labels which shouldn't be there unless the categorize flag is set. var tailTests = []struct { actual legacy.TailResponse expected string @@ -451,7 +450,7 @@ var tailTests = []struct { }, "values":[ [ "123456789012345", "super line"], - [ "123456789012346", "super line with labels", { "foo": "a", "bar": "b" } ] + [ "123456789012346", "super line with labels" ] ] } ], @@ -467,6 +466,90 @@ var tailTests = []struct { }, } +var tailTestWithEncodingFlags = []struct { + actual legacy.TailResponse + encodingFlags httpreq.EncodingFlags + expected string +}{ + { + actual: legacy.TailResponse{ + Streams: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, + { + Timestamp: time.Unix(0, 123456789012346), + Line: "super line with labels", + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, + }, + }, + { + Timestamp: time.Unix(0, 123456789012347), + Line: "super line with labels msg=text", + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, + }, + Parsed: []logproto.LabelAdapter{ + {Name: "msg", Value: "text"}, + }, + }, + }, + Labels: `{test="test"}`, + }, + }, + DroppedEntries: []legacy.DroppedEntry{ + { + Timestamp: time.Unix(0, 123456789022345), + Labels: "{test=\"test\"}", + }, + }, + }, + encodingFlags: httpreq.NewEncodingFlags(httpreq.FlagCategorizeLabels), + expected: fmt.Sprintf(`{ + "streams": [ + { + "stream": { + "test": "test" + }, + "values":[ + [ "123456789012345", "super line"], + [ "123456789012346", "super line with labels", { + "structuredMetadata": { + "foo": "a", + "bar": "b" + } + }], + [ "123456789012347", "super line with labels msg=text", { + "structuredMetadata": { + "foo": "a", + "bar": "b" + }, + "parsed": { + "msg": "text" + } + }] + ] + } + ], + "dropped_entries": [ + { + "timestamp": "123456789022345", + "labels": { + "test": "test" + } + } + ], + "encodingFlags": ["%s"] + }`, httpreq.FlagCategorizeLabels), + }, +} + func Test_WriteQueryResponseJSON(t *testing.T) { for i, queryTest := range queryTests { var b bytes.Buffer @@ -515,15 +598,18 @@ func Test_WriteQueryResponseJSONWithError(t *testing.T) { func Test_MarshalTailResponse(t *testing.T) { for i, tailTest := range tailTests { - // convert logproto to model objects - model, err := NewTailResponse(tailTest.actual) + var b bytes.Buffer + err := WriteTailResponseJSON(tailTest.actual, &b, nil) require.NoError(t, err) - // marshal model object - bytes, err := json.Marshal(model) + require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i) + } + for i, tailTest := range tailTestWithEncodingFlags { + var b bytes.Buffer + err := WriteTailResponseJSON(tailTest.actual, &b, tailTest.encodingFlags) require.NoError(t, err) - require.JSONEqf(t, tailTest.expected, string(bytes), "Tail Test %d failed", i) + require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i) } } @@ -925,10 +1011,11 @@ func Test_WriteTailResponseJSON(t *testing.T) { {Timestamp: time.Unix(0, 2), Labels: `{app="dropped"}`}, }, }, - WebsocketWriterFunc(func(i int, b []byte) error { + NewWebsocketJSONWriter(WebsocketWriterFunc(func(i int, b []byte) error { require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar"]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b)) return nil - }), + })), + nil, ), ) } diff --git a/pkg/util/marshal/query.go b/pkg/util/marshal/query.go index fb6aead8a7..b048b0a952 100644 --- a/pkg/util/marshal/query.go +++ b/pkg/util/marshal/query.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/loghttp" + legacy "github.com/grafana/loki/pkg/loghttp/legacy" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -191,6 +192,60 @@ func EncodeResult(data parser.Value, statistics stats.Result, s *jsoniter.Stream return nil } +func EncodeTailResult(data legacy.TailResponse, s *jsoniter.Stream, encodeFlags httpreq.EncodingFlags) error { + s.WriteObjectStart() + s.WriteObjectField("streams") + err := encodeStreams(data.Streams, s, encodeFlags) + if err != nil { + return err + } + + if len(data.DroppedEntries) > 0 { + s.WriteMore() + s.WriteObjectField("dropped_entries") + err = encodeDroppedEntries(data.DroppedEntries, s) + if err != nil { + return err + } + } + + if len(encodeFlags) > 0 { + s.WriteMore() + s.WriteObjectField("encodingFlags") + if err := encodeEncodingFlags(s, encodeFlags); err != nil { + return err + } + } + + s.WriteObjectEnd() + return nil +} + +func encodeDroppedEntries(entries []legacy.DroppedEntry, s *jsoniter.Stream) error { + s.WriteArrayStart() + defer s.WriteArrayEnd() + + for i, entry := range entries { + if i > 0 { + s.WriteMore() + } + + ds, err := NewDroppedStream(&entry) + if err != nil { + return err + } + + jsonEntry, err := ds.MarshalJSON() + if err != nil { + return err + } + + s.WriteRaw(string(jsonEntry)) + } + + return nil +} + func encodeEncodingFlags(s *jsoniter.Stream, flags httpreq.EncodingFlags) error { s.WriteArrayStart() defer s.WriteArrayEnd() @@ -329,7 +384,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre encodeLabels(logproto.FromLabelsToLabelAdapters(lbls), s) s.WriteObjectEnd() - s.Flush() s.WriteMore() s.WriteObjectField("values") @@ -373,8 +427,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre s.WriteObjectEnd() } s.WriteArrayEnd() - - s.Flush() } s.WriteArrayEnd() diff --git a/pkg/util/marshal/tail.go b/pkg/util/marshal/tail.go index 5655aee09c..222b76c046 100644 --- a/pkg/util/marshal/tail.go +++ b/pkg/util/marshal/tail.go @@ -5,32 +5,6 @@ import ( legacy "github.com/grafana/loki/pkg/loghttp/legacy" ) -// NewTailResponse constructs a TailResponse from a legacy.TailResponse -func NewTailResponse(r legacy.TailResponse) (loghttp.TailResponse, error) { - var err error - ret := loghttp.TailResponse{ - Streams: make([]loghttp.Stream, len(r.Streams)), - DroppedStreams: make([]loghttp.DroppedStream, len(r.DroppedEntries)), - } - - for i, s := range r.Streams { - ret.Streams[i], err = NewStream(s) - - if err != nil { - return loghttp.TailResponse{}, err - } - } - - for i, d := range r.DroppedEntries { - ret.DroppedStreams[i], err = NewDroppedStream(&d) - if err != nil { - return loghttp.TailResponse{}, err - } - } - - return ret, nil -} - // NewDroppedStream constructs a DroppedStream from a legacy.DroppedEntry func NewDroppedStream(s *legacy.DroppedEntry) (loghttp.DroppedStream, error) { l, err := NewLabelSet(s.Labels) diff --git a/pkg/util/unmarshal/unmarshal_test.go b/pkg/util/unmarshal/unmarshal_test.go index 9fdaf27512..93372f62eb 100644 --- a/pkg/util/unmarshal/unmarshal_test.go +++ b/pkg/util/unmarshal/unmarshal_test.go @@ -224,6 +224,7 @@ func (ws *websocket) ReadMessage() (int, []byte, error) { func Test_ReadTailResponse(t *testing.T) { ws := &websocket{} + wsJSON := marshal.NewWebsocketJSONWriter(ws) require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{ Streams: []logproto.Stream{ {Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}}, @@ -231,7 +232,7 @@ func Test_ReadTailResponse(t *testing.T) { DroppedEntries: []legacy_loghttp.DroppedEntry{ {Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`}, }, - }, ws)) + }, wsJSON, nil)) res := &loghttp.TailResponse{} require.NoError(t, ReadTailResponseJSON(res, ws))