Support categorized labels in Tailing (#11079)

**What this PR does / why we need it**:

This is a follow-up PR for https://github.com/grafana/loki/pull/10419
adding support for tailing.

I tested it on a dev cell and it works fine.
<img width="1296" alt="image"
src="https://github.com/grafana/loki/assets/8354290/6177e0ca-02ce-48cd-a17f-0739dc3caa0a">


**Note**: With these changes, the JSON marshal/unmarshal functions for
the tail response are no longer used ([example][1]), so I think we can
remove them. Also, the new Tail response is no longer used, so we can make
it an alias of the _legacy_ one. Let's do that in a follow-up PR to avoid
making this one bigger.

[1]:
52a3f16039/pkg/loghttp/entry.go (L210-L238)
pull/10096/head^2
Salva Corts 2 years ago committed by GitHub
parent 8742599be1
commit 9be3c0863e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/ingester/tailer.go
  2. 144
      pkg/ingester/tailer_test.go
  3. 71
      pkg/iter/categorized_labels_iterator.go
  4. 145
      pkg/iter/categorized_labels_iterator_test.go
  5. 33
      pkg/logql/engine.go
  6. 11
      pkg/querier/http.go
  7. 5
      pkg/querier/querier.go
  8. 2
      pkg/querier/querier_mock_test.go
  9. 4
      pkg/querier/querier_test.go
  10. 27
      pkg/querier/tail.go
  11. 213
      pkg/querier/tail_test.go
  12. 20
      pkg/util/httpreq/encoding_flags.go
  13. 36
      pkg/util/marshal/marshal.go
  14. 105
      pkg/util/marshal/marshal_test.go
  15. 58
      pkg/util/marshal/query.go
  16. 26
      pkg/util/marshal/tail.go
  17. 3
      pkg/util/unmarshal/unmarshal_test.go

@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)...)
if !ok {
continue
}
@ -163,8 +163,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
streams[parsedLbs.Hash()] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: e.Timestamp,
Line: newLine,
Timestamp: e.Timestamp,
Line: newLine,
StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()),
Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()),
})
}
streamsResult := make([]*logproto.Stream, 0, len(streams))

@ -93,10 +93,25 @@ func Test_dropstream(t *testing.T) {
}
}
type fakeTailServer struct{}
type fakeTailServer struct {
responses []logproto.TailResponse
}
func (f *fakeTailServer) Send(response *logproto.TailResponse) error {
f.responses = append(f.responses, *response)
return nil
}
func (f *fakeTailServer) Context() context.Context { return context.Background() }
func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil }
func (f *fakeTailServer) Context() context.Context { return context.Background() }
func (f *fakeTailServer) GetResponses() []logproto.TailResponse {
return f.responses
}
func (f *fakeTailServer) Reset() {
f.responses = f.responses[:0]
}
func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
@ -137,3 +152,126 @@ func Test_IsMatching(t *testing.T) {
})
}
}
// Test_StructuredMetadata sends a stream through a tailer built from each
// query and checks that the responses received by the fake tail server carry
// the expected stream labels, structured metadata and parsed labels.
func Test_StructuredMetadata(t *testing.T) {
	lbs := makeRandomLabels()

	for _, tc := range []struct {
		name              string
		query             string
		sentStream        logproto.Stream
		expectedResponses []logproto.TailResponse
	}{
		{
			// Optimization will make the same stream to be returned regardless of structured metadata.
			name:  "noop pipeline",
			query: `{app="foo"}`,
			sentStream: logproto.Stream{
				Labels: lbs.String(),
				Entries: []logproto.Entry{
					{
						Timestamp: time.Unix(0, 1),
						Line:      "foo=1",
					},
					{
						Timestamp:          time.Unix(0, 2),
						Line:               "foo=2",
						StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
					},
				},
			},
			expectedResponses: []logproto.TailResponse{
				{
					Stream: &logproto.Stream{
						Labels: lbs.String(),
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(0, 1),
								Line:      "foo=1",
							},
							{
								Timestamp:          time.Unix(0, 2),
								Line:               "foo=2",
								StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
							},
						},
					},
					DroppedStreams: nil,
				},
			},
		},
		{
			name:  "parse pipeline labels",
			query: `{app="foo"} | logfmt`,
			sentStream: logproto.Stream{
				Labels: lbs.String(),
				Entries: []logproto.Entry{
					{
						Timestamp: time.Unix(0, 1),
						Line:      "foo=1",
					},
					{
						Timestamp:          time.Unix(0, 2),
						Line:               "foo=2",
						StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
					},
				},
			},
			expectedResponses: []logproto.TailResponse{
				{
					Stream: &logproto.Stream{
						Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(),
						Entries: []logproto.Entry{
							{
								Timestamp: time.Unix(0, 1),
								Line:      "foo=1",
								Parsed:    logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")),
							},
						},
					},
					DroppedStreams: nil,
				},
				{
					Stream: &logproto.Stream{
						Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "2").Labels().String(),
						Entries: []logproto.Entry{
							{
								Timestamp:          time.Unix(0, 2),
								Line:               "foo=2",
								StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
								Parsed:             logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")),
							},
						},
					},
					DroppedStreams: nil,
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			var server fakeTailServer
			tail, err := newTailer("foo", tc.query, &server, 10)
			require.NoError(t, err)

			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				tail.loop()
				wg.Done()
			}()

			tail.send(tc.sentStream, lbs)

			// Wait until every expected response has reached the server. Waiting
			// for just the first one would let the assertion below run before the
			// remaining responses of a multi-stream case have been sent.
			require.Eventually(t, func() bool {
				return len(server.GetResponses()) >= len(tc.expectedResponses)
			}, 30*time.Second, 10*time.Millisecond, "stream was not received")

			responses := server.GetResponses()
			require.ElementsMatch(t, tc.expectedResponses, responses)

			tail.close()
			wg.Wait()
		})
	}
}

@ -0,0 +1,71 @@
package iter
import (
"fmt"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)
// categorizeLabelsIterator wraps an EntryIterator and overrides Next, Error,
// Labels and StreamHash so that the labels reported for each entry no longer
// include that entry's structured metadata and parsed labels.
type categorizeLabelsIterator struct {
EntryIterator
// currEntry is the entry read from the wrapped iterator by the last successful Next.
currEntry logproto.Entry
// currStreamLabels is the label string returned by Labels for the current entry.
currStreamLabels string
// currHash is the value returned by StreamHash for the current entry.
currHash uint64
// currErr holds the label-parsing error recorded by Next, if any.
currErr error
}
// NewCategorizeLabelsIterator returns an EntryIterator that decorates wrap
// with a categorizeLabelsIterator.
func NewCategorizeLabelsIterator(wrap EntryIterator) EntryIterator {
	it := new(categorizeLabelsIterator)
	it.EntryIterator = wrap
	return it
}
// Next advances the wrapped iterator and recomputes the labels and hash
// reported for the new current entry. It returns false when the wrapped
// iterator is exhausted or when the stream labels cannot be parsed; in the
// latter case the parsing error is recorded in currErr.
func (c *categorizeLabelsIterator) Next() bool {
	if !c.EntryIterator.Next() {
		return false
	}

	c.currEntry = c.Entry()

	// Nothing to strip: expose the wrapped iterator's labels and hash as they are.
	if len(c.currEntry.StructuredMetadata)+len(c.currEntry.Parsed) == 0 {
		c.currStreamLabels = c.EntryIterator.Labels()
		c.currHash = c.EntryIterator.StreamHash()
		return true
	}

	// The stream labels include the structured metadata and parsed labels of
	// the entry; drop them so only the remaining labels are reported.
	parsed, err := syntax.ParseLabels(c.EntryIterator.Labels())
	if err != nil {
		c.currErr = fmt.Errorf("failed to parse series labels to categorize labels: %w", err)
		return false
	}

	b := labels.NewBuilder(parsed)
	for i := range c.currEntry.StructuredMetadata {
		b.Del(c.currEntry.StructuredMetadata[i].Name)
	}
	for i := range c.currEntry.Parsed {
		b.Del(c.currEntry.Parsed[i].Name)
	}

	stripped := b.Labels()
	c.currStreamLabels = stripped.String()
	c.currHash = stripped.Hash()
	return true
}
// Error returns the label-parsing error recorded by Next, if any. Otherwise it
// returns the wrapped iterator's error, so that failures of the underlying
// iterator are not hidden by this decorator.
func (c *categorizeLabelsIterator) Error() error {
	if c.currErr != nil {
		return c.currErr
	}
	return c.EntryIterator.Error()
}
// Labels returns the label string computed by the last call to Next for the
// current entry.
func (c *categorizeLabelsIterator) Labels() string {
	current := c.currStreamLabels
	return current
}
// StreamHash returns the hash computed by the last call to Next for the
// current entry.
func (c *categorizeLabelsIterator) StreamHash() uint64 {
	current := c.currHash
	return current
}

@ -0,0 +1,145 @@
package iter
import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
)
// TestNewCategorizeLabelsIterator drains a categorize-labels iterator built
// over each inner iterator, groups the entries by the labels the iterator
// reports, and compares the resulting streams with the expected ones.
func TestNewCategorizeLabelsIterator(t *testing.T) {
	for _, tc := range []struct {
		name            string
		inner           EntryIterator
		expectedStreams []logproto.Stream
	}{
		{
			name: "no structured metadata nor parsed labels",
			inner: NewSortEntryIterator([]EntryIterator{
				NewStreamIterator(logproto.Stream{
					Labels: labels.FromStrings("namespace", "default").String(),
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(0, 1),
							Line:      "foo=1",
						},
						{
							Timestamp: time.Unix(0, 2),
							Line:      "foo=2",
						},
					},
				}),
			}, logproto.FORWARD),
			expectedStreams: []logproto.Stream{
				{
					Labels: labels.FromStrings("namespace", "default").String(),
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(0, 1),
							Line:      "foo=1",
						},
						{
							Timestamp: time.Unix(0, 2),
							Line:      "foo=2",
						},
					},
				},
			},
		},
		{
			name: "structured metadata and parsed labels",
			inner: NewSortEntryIterator([]EntryIterator{
				NewStreamIterator(logproto.Stream{
					Labels: labels.FromStrings("namespace", "default").String(),
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(0, 1),
							Line:      "foo=1",
						},
					},
				}),
				NewStreamIterator(logproto.Stream{
					Labels: labels.FromStrings("namespace", "default", "traceID", "123").String(),
					Entries: []logproto.Entry{
						{
							Timestamp:          time.Unix(0, 2),
							Line:               "foo=2",
							StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
						},
					},
				}),
				NewStreamIterator(logproto.Stream{
					Labels: labels.FromStrings("namespace", "default", "foo", "3").String(),
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(0, 3),
							Line:      "foo=3",
							Parsed:    logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
						},
					},
				}),
				NewStreamIterator(logproto.Stream{
					Labels: labels.FromStrings("namespace", "default", "traceID", "123", "foo", "4").String(),
					Entries: []logproto.Entry{
						{
							Timestamp:          time.Unix(0, 4),
							Line:               "foo=4",
							StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
							Parsed:             logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
						},
					},
				}),
			}, logproto.FORWARD),
			expectedStreams: []logproto.Stream{
				{
					Labels: labels.FromStrings("namespace", "default").String(),
					Entries: []logproto.Entry{
						{
							Timestamp: time.Unix(0, 1),
							Line:      "foo=1",
						},
						{
							Timestamp:          time.Unix(0, 2),
							Line:               "foo=2",
							StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
						},
						{
							Timestamp: time.Unix(0, 3),
							Line:      "foo=3",
							Parsed:    logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
						},
						{
							Timestamp:          time.Unix(0, 4),
							Line:               "foo=4",
							StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
							Parsed:             logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
						},
					},
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			itr := NewCategorizeLabelsIterator(tc.inner)

			streamsEntries := make(map[string][]logproto.Entry)
			for itr.Next() {
				streamsEntries[itr.Labels()] = append(streamsEntries[itr.Labels()], itr.Entry())
			}
			// Next returns false both at the end of the data and on failure, so
			// the error has to be checked once iteration has stopped; checking
			// it only inside the loop would never observe a terminating error.
			require.NoError(t, itr.Error())

			var streams []logproto.Stream
			for lbls, entries := range streamsEntries {
				streams = append(streams, logproto.Stream{
					Labels:  lbls,
					Entries: entries,
				})
			}

			require.ElementsMatch(t, tc.expectedStreams, streams)
		})
	}
}

@ -287,16 +287,18 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
return value, err
case syntax.LogSelectorExpr:
iter, err := q.evaluator.NewIterator(ctx, e, q.params)
itr, err := q.evaluator.NewIterator(ctx, e, q.params)
if err != nil {
return nil, err
}
encodingFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx)
categorizeLabels := encodingFlags.Has(httpreq.FlagCategorizeLabels)
if encodingFlags.Has(httpreq.FlagCategorizeLabels) {
itr = iter.NewCategorizeLabelsIterator(itr)
}
defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval(), categorizeLabels)
defer util.LogErrorWithContext(ctx, "closing iterator", itr.Close)
streams, err := readStreams(itr, q.params.Limit(), q.params.Direction(), q.params.Interval(), true)
return streams, err
default:
return nil, fmt.Errorf("unexpected type (%T): cannot evaluate", e)
@ -513,7 +515,7 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte
// value here because many unit tests start at time.Unix(0,0)
lastEntry := lastEntryMinTime
for respSize < size && i.Next() {
entry := i.Entry()
streamLabels, entry := i.Labels(), i.Entry()
forwardShouldOutput := dir == logproto.FORWARD &&
(entry.Timestamp.Equal(lastEntry.Add(interval)) || entry.Timestamp.After(lastEntry.Add(interval)))
@ -524,27 +526,6 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte
// If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line.
// Then check to see if the entry is equal to, or past a forward or reverse step
if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput {
streamLabels := i.Labels()
// If categorizeLabels is true, We need to remove the structured metadata labels and parsed labels from the stream labels.
// TODO(salvacorts): If this is too slow, provided this is in the hot path, we can consider doing this in the iterator.
if categorizeLabels && (len(entry.StructuredMetadata) > 0 || len(entry.Parsed) > 0) {
lbls, err := syntax.ParseLabels(streamLabels)
if err != nil {
return nil, fmt.Errorf("failed to parse series labels to categorize labels: %w", err)
}
builder := labels.NewBuilder(lbls)
for _, label := range entry.StructuredMetadata {
builder.Del(label.Name)
}
for _, label := range entry.Parsed {
builder.Del(label.Name)
}
streamLabels = builder.Labels().String()
}
stream, ok := streams[streamLabels]
if !ok {
stream = &logproto.Stream{

@ -145,6 +145,9 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
return
}
encodingFlags := httpreq.ExtractEncodingFlags(r)
version := loghttp.GetVersion(r.RequestURI)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err)
@ -163,7 +166,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
}
}()
tailer, err := q.querier.Tail(r.Context(), req)
tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels))
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
@ -179,6 +182,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
ticker := time.NewTicker(wsPingPeriod)
defer ticker.Stop()
connWriter := marshal.NewWebsocketJSONWriter(conn)
var response *loghttp_legacy.TailResponse
responseChan := tailer.getResponseChan()
closeErrChan := tailer.getCloseErrorChan()
@ -209,8 +214,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
select {
case response = <-responseChan:
var err error
if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = marshal.WriteTailResponseJSON(*response, conn)
if version == loghttp.VersionV1 {
err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags)
} else {
err = marshal_legacy.WriteTailResponseJSON(*response, conn)
}

@ -88,7 +88,7 @@ type Querier interface {
logql.Querier
Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error)
Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error)
IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error)
Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
}
@ -434,7 +434,7 @@ func (*SingleTenantQuerier) Check(_ context.Context, _ *grpc_health_v1.HealthChe
}
// Tail keeps getting matching logs from all ingesters for given query
func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) {
err := q.checkTailRequestLimit(ctx)
if err != nil {
return nil, err
@ -496,6 +496,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
categorizedLabels,
q.metrics,
), nil
}

@ -530,7 +530,7 @@ func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) (
return args.Get(0).(func() *logproto.SeriesResponse)(), args.Error(1)
}
func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest) (*Tailer, error) {
func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest, _ bool) (*Tailer, error) {
return nil, errors.New("querierMock.Tail() has not been mocked")
}

@ -118,7 +118,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
_, err = q.Tail(ctx, &request)
_, err = q.Tail(ctx, &request, false)
require.NoError(t, err)
calls := ingesterClient.GetMockedCallsByMethod("Query")
@ -512,7 +512,7 @@ func TestQuerier_concurrentTailLimits(t *testing.T) {
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
_, err = q.Tail(ctx, &request)
_, err = q.Tail(ctx, &request, false)
assert.Equal(t, testData.expectedError, err)
})
}

@ -50,11 +50,12 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.RWMutex
stopped bool
delayFor time.Duration
responseChan chan *loghttp.TailResponse
closeErrChan chan error
tailMaxDuration time.Duration
stopped bool
delayFor time.Duration
responseChan chan *loghttp.TailResponse
closeErrChan chan error
tailMaxDuration time.Duration
categorizeLabels bool
// if we are not seeing any response from ingester,
// how long do we want to wait by going into sleep
@ -234,7 +235,12 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) {
t.streamMtx.Lock()
defer t.streamMtx.Unlock()
t.openStreamIterator.Push(iter.NewStreamIterator(*resp.Stream))
itr := iter.NewStreamIterator(*resp.Stream)
if t.categorizeLabels {
itr = iter.NewCategorizeLabelsIterator(itr)
}
t.openStreamIterator.Push(itr)
}
// finds oldest entry by peeking at open stream iterator.
@ -305,10 +311,16 @@ func newTailer(
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
tailMaxDuration time.Duration,
waitEntryThrottle time.Duration,
categorizeLabels bool,
m *Metrics,
) *Tailer {
historicEntriesIter := historicEntries
if categorizeLabels {
historicEntriesIter = iter.NewCategorizeLabelsIterator(historicEntries)
}
t := Tailer{
openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD),
openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntriesIter}, logproto.FORWARD),
querierTailClients: querierTailClients,
delayFor: delayFor,
responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses),
@ -317,6 +329,7 @@ func newTailer(
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
waitEntryThrottle: waitEntryThrottle,
categorizeLabels: categorizeLabels,
metrics: m,
}

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -161,7 +162,7 @@ func TestTailer(t *testing.T) {
tailClients["test"] = test.tailClient
}
tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil))
tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, false, NewMetrics(nil))
defer tailer.close()
test.tester(t, tailer, test.tailClient)
@ -169,6 +170,214 @@ func TestTailer(t *testing.T) {
}
}
// TestCategorizedLabels builds a Tailer from historic entries plus three mocked
// tail clients and checks the streams read back from it, once with
// categorizeLabels disabled and once with it enabled.
func TestCategorizedLabels(t *testing.T) {
t.Parallel()
lbs := labels.FromStrings("app", "foo")
// createHistoricalEntries returns a fresh iterator over two historic entries,
// the second of which carries structured metadata.
createHistoricalEntries := func() iter.EntryIterator {
return iter.NewStreamIterator(logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "foo=1",
},
{
Timestamp: time.Unix(2, 0),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
})
}
// createTailClients returns three mocked tail clients whose stream labels
// already include the structured metadata / parsed labels of their entry.
createTailClients := func() map[string]*tailClientMock {
return map[string]*tailClientMock{
"test1": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3, 0),
Line: "foo=3",
},
},
})),
"test2": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(4, 0),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
})),
"test3": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(5, 0),
Line: "foo=5",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")),
},
},
})),
}
}
for _, tc := range []struct {
name string
categorizeLabels bool
historicEntries iter.EntryIterator
tailClients map[string]*tailClientMock
expectedStreams []logproto.Stream
}{
{
// Without categorization the stream labels are expected exactly as provided.
name: "without categorize",
categorizeLabels: false,
historicEntries: createHistoricalEntries(),
tailClients: createTailClients(),
expectedStreams: []logproto.Stream{
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "foo=1",
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(2, 0),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3, 0),
Line: "foo=3",
},
},
},
{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(4, 0),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(5, 0),
Line: "foo=5",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")),
},
},
},
},
},
{
// With categorization every expected stream carries only the base labels.
name: "categorize",
categorizeLabels: true,
historicEntries: createHistoricalEntries(),
tailClients: createTailClients(),
expectedStreams: []logproto.Stream{
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "foo=1",
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(2, 0),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3, 0),
Line: "foo=3",
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(4, 0),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(5, 0),
Line: "foo=5",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")),
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// No ingester is reported as reconnected during the test.
tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) {
return map[string]logproto.Querier_TailClient{}, nil
}
// Convert the mocks to the interface type that newTailer takes.
tailClients := map[string]logproto.Querier_TailClient{}
for k, v := range tc.tailClients {
tailClients[k] = v
}
tailer := newTailer(0, tailClients, tc.historicEntries, tailDisconnectedIngesters, timeout, throttle, tc.categorizeLabels, NewMetrics(nil))
defer tailer.close()
// Make tail clients receive their responses
for _, client := range tc.tailClients {
client.triggerRecv()
}
err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
require.NoError(t, err)
// Read as many entries as the expected streams contain.
maxEntries := countEntriesInStreams(tc.expectedStreams)
responses, err := readFromTailer(tailer, maxEntries)
require.NoError(t, err)
streams := flattenStreamsFromResponses(responses)
require.ElementsMatch(t, tc.expectedStreams, streams)
})
}
}
func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, error) {
responses := make([]*loghttp.TailResponse, 0)
entriesCount := 0
@ -204,7 +413,7 @@ func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error {
select {
case <-timeoutTicker.C:
return errors.New("timeout expired while reading responses from Tailer")
return errors.New("timeout expired while waiting for Tailer to consume open streams")
default:
time.Sleep(throttle)
}

@ -71,11 +71,7 @@ func AddEncodingFlagsToContext(ctx context.Context, flags EncodingFlags) context
func ExtractEncodingFlags(req *http.Request) EncodingFlags {
rawValue := req.Header.Get(LokiEncodingFlagsHeader)
if rawValue == "" {
return nil
}
return parseEncodingFlags(rawValue)
return ParseEncodingFlags(rawValue)
}
func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags {
@ -83,11 +79,7 @@ func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags {
for _, header := range req.GetHeaders() {
if header.GetKey() == LokiEncodingFlagsHeader {
rawValue = header.GetValues()[0]
if rawValue == "" {
return nil
}
return parseEncodingFlags(rawValue)
return ParseEncodingFlags(rawValue)
}
}
@ -100,10 +92,14 @@ func ExtractEncodingFlagsFromCtx(ctx context.Context) EncodingFlags {
return nil
}
return parseEncodingFlags(rawValue)
return ParseEncodingFlags(rawValue)
}
func parseEncodingFlags(rawFlags string) EncodingFlags {
func ParseEncodingFlags(rawFlags string) EncodingFlags {
if rawFlags == "" {
return nil
}
split := strings.Split(rawFlags, EncodeFlagsDelimiter)
flags := make(EncodingFlags, len(split))
for _, rawFlag := range split {

@ -81,18 +81,38 @@ type WebsocketWriter interface {
WriteMessage(int, []byte) error
}
// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and
// then writes it to the provided connection.
func WriteTailResponseJSON(r legacy.TailResponse, c WebsocketWriter) error {
v1Response, err := NewTailResponse(r)
type websocketJSONWriter struct {
WebsocketWriter
}
func (w *websocketJSONWriter) Write(p []byte) (n int, err error) {
err = w.WriteMessage(websocket.TextMessage, p)
if err != nil {
return err
return 0, err
}
data, err := jsoniter.Marshal(v1Response)
return len(p), nil
}
func NewWebsocketJSONWriter(ws WebsocketWriter) io.Writer {
return &websocketJSONWriter{ws}
}
// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and
// then writes it to the provided writer.
func WriteTailResponseJSON(r legacy.TailResponse, w io.Writer, encodeFlags httpreq.EncodingFlags) error {
// TODO(salvacorts): I think we can dismiss the new TailResponse and be an alias of legacy.TailResponse
// v1Response, err := NewTailResponse(r)
// if err != nil {
// return err
// }
s := jsoniter.ConfigFastest.BorrowStream(w)
defer jsoniter.ConfigFastest.ReturnStream(s)
err := EncodeTailResult(r, s, encodeFlags)
if err != nil {
return err
return fmt.Errorf("could not write JSON tail response: %w", err)
}
return c.WriteMessage(websocket.TextMessage, data)
return s.Flush()
}
// WriteSeriesResponseJSON marshals a logproto.SeriesResponse to v1 loghttp JSON and then

@ -410,7 +410,6 @@ var labelTests = []struct {
}
// covers responses from /loki/api/v1/tail
// TODO(salvacorts): Support encoding flags. And fix serialized structured metadata labels which shouldn't be there unless the categorize flag is set.
var tailTests = []struct {
actual legacy.TailResponse
expected string
@ -451,7 +450,7 @@ var tailTests = []struct {
},
"values":[
[ "123456789012345", "super line"],
[ "123456789012346", "super line with labels", { "foo": "a", "bar": "b" } ]
[ "123456789012346", "super line with labels" ]
]
}
],
@ -467,6 +466,90 @@ var tailTests = []struct {
},
}
// tailTestWithEncodingFlags lists tail responses together with the encoding
// flags to serialize them with and the JSON expected as a result. The single
// case uses FlagCategorizeLabels and expects structured metadata and parsed
// labels to be emitted as separate per-entry objects, plus an "encodingFlags"
// field listing the flag.
var tailTestWithEncodingFlags = []struct {
actual legacy.TailResponse
encodingFlags httpreq.EncodingFlags
expected string
}{
{
actual: legacy.TailResponse{
Streams: []logproto.Stream{
{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 123456789012345),
Line: "super line",
},
{
Timestamp: time.Unix(0, 123456789012346),
Line: "super line with labels",
StructuredMetadata: []logproto.LabelAdapter{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
},
{
Timestamp: time.Unix(0, 123456789012347),
Line: "super line with labels msg=text",
StructuredMetadata: []logproto.LabelAdapter{
{Name: "foo", Value: "a"},
{Name: "bar", Value: "b"},
},
Parsed: []logproto.LabelAdapter{
{Name: "msg", Value: "text"},
},
},
},
Labels: `{test="test"}`,
},
},
DroppedEntries: []legacy.DroppedEntry{
{
Timestamp: time.Unix(0, 123456789022345),
Labels: "{test=\"test\"}",
},
},
},
encodingFlags: httpreq.NewEncodingFlags(httpreq.FlagCategorizeLabels),
// The %s verb at the end of the template is filled with the flag name.
expected: fmt.Sprintf(`{
"streams": [
{
"stream": {
"test": "test"
},
"values":[
[ "123456789012345", "super line"],
[ "123456789012346", "super line with labels", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
}
}],
[ "123456789012347", "super line with labels msg=text", {
"structuredMetadata": {
"foo": "a",
"bar": "b"
},
"parsed": {
"msg": "text"
}
}]
]
}
],
"dropped_entries": [
{
"timestamp": "123456789022345",
"labels": {
"test": "test"
}
}
],
"encodingFlags": ["%s"]
}`, httpreq.FlagCategorizeLabels),
},
}
func Test_WriteQueryResponseJSON(t *testing.T) {
for i, queryTest := range queryTests {
var b bytes.Buffer
@ -515,15 +598,18 @@ func Test_WriteQueryResponseJSONWithError(t *testing.T) {
func Test_MarshalTailResponse(t *testing.T) {
for i, tailTest := range tailTests {
// convert logproto to model objects
model, err := NewTailResponse(tailTest.actual)
var b bytes.Buffer
err := WriteTailResponseJSON(tailTest.actual, &b, nil)
require.NoError(t, err)
// marshal model object
bytes, err := json.Marshal(model)
require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i)
}
for i, tailTest := range tailTestWithEncodingFlags {
var b bytes.Buffer
err := WriteTailResponseJSON(tailTest.actual, &b, tailTest.encodingFlags)
require.NoError(t, err)
require.JSONEqf(t, tailTest.expected, string(bytes), "Tail Test %d failed", i)
require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i)
}
}
@ -925,10 +1011,11 @@ func Test_WriteTailResponseJSON(t *testing.T) {
{Timestamp: time.Unix(0, 2), Labels: `{app="dropped"}`},
},
},
WebsocketWriterFunc(func(i int, b []byte) error {
NewWebsocketJSONWriter(WebsocketWriterFunc(func(i int, b []byte) error {
require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar"]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b))
return nil
}),
})),
nil,
),
)
}

@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/loghttp"
legacy "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
@ -191,6 +192,60 @@ func EncodeResult(data parser.Value, statistics stats.Result, s *jsoniter.Stream
return nil
}
// EncodeTailResult writes data to s as a JSON object. The "streams" field is
// always written; "dropped_entries" and "encodingFlags" are written only when
// data.DroppedEntries and encodeFlags, respectively, are non-empty. The first
// error returned by a nested encoder aborts the encoding and is returned.
func EncodeTailResult(data legacy.TailResponse, s *jsoniter.Stream, encodeFlags httpreq.EncodingFlags) error {
	s.WriteObjectStart()

	s.WriteObjectField("streams")
	if err := encodeStreams(data.Streams, s, encodeFlags); err != nil {
		return err
	}

	if dropped := data.DroppedEntries; len(dropped) != 0 {
		s.WriteMore()
		s.WriteObjectField("dropped_entries")
		if err := encodeDroppedEntries(dropped, s); err != nil {
			return err
		}
	}

	if len(encodeFlags) != 0 {
		s.WriteMore()
		s.WriteObjectField("encodingFlags")
		if err := encodeEncodingFlags(s, encodeFlags); err != nil {
			return err
		}
	}

	s.WriteObjectEnd()
	return nil
}
// encodeDroppedEntries writes entries to s as a JSON array. Each entry is
// converted with NewDroppedStream and its MarshalJSON output is written raw.
// The closing bracket is written by the deferred call, including when an
// error is returned.
func encodeDroppedEntries(entries []legacy.DroppedEntry, s *jsoniter.Stream) error {
	s.WriteArrayStart()
	defer s.WriteArrayEnd()

	for idx, dropped := range entries {
		// Separate every element from the previous one.
		if idx != 0 {
			s.WriteMore()
		}

		stream, err := NewDroppedStream(&dropped)
		if err != nil {
			return err
		}

		raw, err := stream.MarshalJSON()
		if err != nil {
			return err
		}
		s.WriteRaw(string(raw))
	}

	return nil
}
func encodeEncodingFlags(s *jsoniter.Stream, flags httpreq.EncodingFlags) error {
s.WriteArrayStart()
defer s.WriteArrayEnd()
@ -329,7 +384,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre
encodeLabels(logproto.FromLabelsToLabelAdapters(lbls), s)
s.WriteObjectEnd()
s.Flush()
s.WriteMore()
s.WriteObjectField("values")
@ -373,8 +427,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre
s.WriteObjectEnd()
}
s.WriteArrayEnd()
s.Flush()
}
s.WriteArrayEnd()

@ -5,32 +5,6 @@ import (
legacy "github.com/grafana/loki/pkg/loghttp/legacy"
)
// NewTailResponse constructs a TailResponse from a legacy.TailResponse
func NewTailResponse(r legacy.TailResponse) (loghttp.TailResponse, error) {
var err error
ret := loghttp.TailResponse{
Streams: make([]loghttp.Stream, len(r.Streams)),
DroppedStreams: make([]loghttp.DroppedStream, len(r.DroppedEntries)),
}
for i, s := range r.Streams {
ret.Streams[i], err = NewStream(s)
if err != nil {
return loghttp.TailResponse{}, err
}
}
for i, d := range r.DroppedEntries {
ret.DroppedStreams[i], err = NewDroppedStream(&d)
if err != nil {
return loghttp.TailResponse{}, err
}
}
return ret, nil
}
// NewDroppedStream constructs a DroppedStream from a legacy.DroppedEntry
func NewDroppedStream(s *legacy.DroppedEntry) (loghttp.DroppedStream, error) {
l, err := NewLabelSet(s.Labels)

@ -224,6 +224,7 @@ func (ws *websocket) ReadMessage() (int, []byte, error) {
func Test_ReadTailResponse(t *testing.T) {
ws := &websocket{}
wsJSON := marshal.NewWebsocketJSONWriter(ws)
require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{
Streams: []logproto.Stream{
{Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}},
@ -231,7 +232,7 @@ func Test_ReadTailResponse(t *testing.T) {
DroppedEntries: []legacy_loghttp.DroppedEntry{
{Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`},
},
}, ws))
}, wsJSON, nil))
res := &loghttp.TailResponse{}
require.NoError(t, ReadTailResponseJSON(res, ws))

Loading…
Cancel
Save