From 545fb1597c73b137b095db5eccdedcb90cc8afd1 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 4 Jun 2025 16:39:25 +1000 Subject: [PATCH] refactor(stringlabels): Support stringlabels in `logql/log` package (#17838) This is another step towards supporting Prometheus' stringlabels implementation in Loki. It adds support in the `logql/log` package. Part of https://github.com/grafana/loki/issues/17122 The tests should now compile and pass with `-tags stringlabels`. --- Co-authored-by: Karsten Jeschkies --- pkg/chunkenc/memchunk.go | 4 +- pkg/chunkenc/memchunk_test.go | 2 +- pkg/chunkenc/symbols_test.go | 58 ++--- pkg/chunkenc/unordered.go | 2 +- pkg/chunkenc/unordered_test.go | 24 ++- pkg/chunkenc/variants.go | 2 +- pkg/compactor/retention/retention_test.go | 1 + pkg/dataobj/querier/iter.go | 2 +- pkg/dataobj/querier/store_test.go | 13 ++ pkg/ingester/checkpoint_test.go | 24 ++- pkg/ingester/flush_test.go | 13 ++ pkg/ingester/instance_test.go | 8 +- pkg/ingester/recovery_test.go | 6 +- pkg/ingester/tailer_test.go | 7 +- pkg/logproto/compat.go | 4 + .../log/consolidated_variant_extractor.go | 21 +- .../consolidated_variant_extractor_test.go | 20 +- pkg/logql/log/fmt_test.go | 50 ++--- pkg/logql/log/label_filter_test.go | 8 +- pkg/logql/log/labels.go | 202 +++++++----------- pkg/logql/log/labels_slicelabels.go | 47 ++++ pkg/logql/log/labels_stringlabels.go | 43 ++++ pkg/logql/log/labels_test.go | 110 +++++----- pkg/logql/log/metrics_extraction.go | 24 +-- pkg/logql/log/metrics_extraction_test.go | 72 +++---- pkg/logql/log/parser_hints_test.go | 2 +- pkg/logql/log/parser_test.go | 33 ++- pkg/logql/log/pipeline_test.go | 64 +++--- pkg/logql/syntax/extractor_test.go | 6 +- pkg/logql/syntax/parser.go | 6 +- pkg/logql/syntax/parser_test.go | 18 +- pkg/logql/test_utils.go | 2 +- pkg/storage/lazy_chunk_test.go | 12 +- pkg/storage/store_test.go | 14 +- 34 files changed, 512 insertions(+), 412 deletions(-) create mode 100644 pkg/logql/log/labels_slicelabels.go create mode 100644 pkg/logql/log/labels_stringlabels.go diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index c1fa1fa452..eba2b67f9d 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1317,7 +1317,7 @@ func (hb *headBlock) SampleIterator( for _, e := range hb.entries { for _, extractor := range extractors { stats.AddHeadChunkBytes(int64(len(e.s))) - samples, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...) + samples, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata) if !ok || len(samples) == 0 { continue } @@ -1769,7 +1769,7 @@ func (e *sampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { e.stats.AddPostFilterLines(1) - samples, ok := e.extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...) + samples, ok := e.extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata) if !ok || len(samples) == 0 { continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 8a06559c70..8f0ecdec97 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -527,7 +527,7 @@ func TestSerialization(t *testing.T) { require.Equal(t, labels.FromStrings("foo", strconv.Itoa(i)), logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)) } else { require.Equal(t, labels.EmptyLabels().String(), it.Labels()) - require.Nil(t, e.StructuredMetadata) + require.Empty(t, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)) } } require.NoError(t, it.Err()) diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go index 77e47da250..31bfbecca5 100644 --- a/pkg/chunkenc/symbols_test.go +++ b/pkg/chunkenc/symbols_test.go @@ -161,32 +161,32 @@ func TestSymbolizerLabelNormalization(t *testing.T) { { name: "basic label normalization", labelsToAdd: []labels.Labels{ - { - {Name: "foo-bar", Value: "value1"}, - {Name: "fizz_buzz", Value: "value2"}, - }, + labels.FromStrings( + "foo-bar", "value1", + "fizz_buzz", "value2", + ), }, expectedLabels: []labels.Labels{ - { - {Name: "foo_bar", Value: "value1"}, - {Name: "fizz_buzz", Value: "value2"}, - }, + labels.FromStrings( + "foo_bar", "value1", + "fizz_buzz", "value2", + ), }, description: "hyphens should be converted to underscores in label names", }, { name: "same string as name and value", labelsToAdd: []labels.Labels{ - { - {Name: "foo-bar", Value: "foo-bar"}, - {Name: "fizz-buzz", Value: "fizz-buzz"}, - }, + labels.FromStrings( + "foo-bar", "foo-bar", + "fizz-buzz", "fizz-buzz", + ), }, expectedLabels: []labels.Labels{ - { - {Name: "foo_bar", Value: "foo-bar"}, - {Name: "fizz_buzz", Value: "fizz-buzz"}, - }, + labels.FromStrings( + "foo_bar", "foo-bar", + "fizz_buzz", "fizz-buzz", + ), }, description: "only normalize when string is used as a name, not as a value", }, @@ -267,10 +267,10 @@ func TestSymbolizerLabelNormalizationAfterCheckpointing(t *testing.T) { s := newSymbolizer() // Add some labels and serialize them - originalLabels := labels.Labels{ - {Name: "foo-bar", Value: "value1"}, - {Name: "fizz-buzz", Value: "value2"}, - } + originalLabels := labels.FromStrings( + "foo-bar", "value1", + "fizz-buzz", "value2", + ) _, err := s.Add(originalLabels) require.NoError(t, err) @@ -282,19 +282,21 @@ func TestSymbolizerLabelNormalizationAfterCheckpointing(t *testing.T) { loaded := symbolizerFromCheckpoint(buf.Bytes()) // Add new labels with the same names but different values - newLabels := labels.Labels{ - {Name: "foo-bar", Value: "new-value1"}, - {Name: "fizz-buzz", Value: "new-value2"}, - } + newLabels := labels.FromStrings( + "foo-bar", "new-value1", + "fizz-buzz", "new-value2", + ) symbols, err := loaded.Add(newLabels) require.NoError(t, err) // Check that the normalization is consistent result := loaded.Lookup(symbols, nil) - require.Equal(t, "foo_bar", result[0].Name, "first label should be normalized") - require.Equal(t, "new-value1", result[0].Value, "first value should be unchanged") - require.Equal(t, "fizz_buzz", result[1].Name, "second label should be normalized") - require.Equal(t, "new-value2", result[1].Value, "second value should be unchanged") + expected := map[string]string{ + "foo_bar": "new-value1", + "fizz_buzz": "new-value2", + } + + require.Equal(t, expected, result.Map(), "label names should be normalized") } func TestSymbolizerLabelNormalizationSameNameValue(t *testing.T) { diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index b982ea3d1f..fa818409ad 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -333,7 +333,7 @@ func (hb *unorderedHeadBlock) SampleIterator( structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, labelsBuilder) for _, extractor := range extractor { - samples, ok := extractor.ProcessString(ts, line, structuredMetadata...) + samples, ok := extractor.ProcessString(ts, line, structuredMetadata) if !ok || len(samples) == 0 { return nil } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 3c2fa35c46..b318f40e4e 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -26,9 +26,7 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { Timestamp: time.Unix(0, exp[i].t), Line: exp[i].s, StructuredMetadata: logproto.FromLabelsToLabelAdapters(exp[i].structuredMetadata), - } - if exp[i].structuredMetadata.IsEmpty() { - expected.StructuredMetadata = nil + Parsed: logproto.EmptyLabelAdapters(), } require.Equal(t, expected, got.At()) require.Equal(t, exp[i].structuredMetadata.String(), got.Labels()) @@ -729,20 +727,24 @@ func TestReorderAcrossBlocks(t *testing.T) { exp := []entry{ { - t: time.Unix(1, 0).UnixNano(), - s: "1", + t: time.Unix(1, 0).UnixNano(), + s: "1", + structuredMetadata: labels.EmptyLabels(), }, { - t: time.Unix(3, 0).UnixNano(), - s: "3", + t: time.Unix(3, 0).UnixNano(), + s: "3", + structuredMetadata: labels.EmptyLabels(), }, { - t: time.Unix(5, 0).UnixNano(), - s: "5", + t: time.Unix(5, 0).UnixNano(), + s: "5", + structuredMetadata: labels.EmptyLabels(), }, { - t: time.Unix(7, 0).UnixNano(), - s: "7", + t: time.Unix(7, 0).UnixNano(), + s: "7", + structuredMetadata: labels.EmptyLabels(), }, } iterEq(t, exp, itr) diff --git a/pkg/chunkenc/variants.go b/pkg/chunkenc/variants.go index 9be50fecad..613488d247 100644 --- a/pkg/chunkenc/variants.go +++ b/pkg/chunkenc/variants.go @@ -58,7 +58,7 @@ func (e *multiExtractorSampleBufferedIterator) Next() bool { e.stats.AddPostFilterLines(1) for _, extractor := range e.extractors { - samples, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...) + samples, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata) if !ok || len(samples) == 0 { continue } diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index fce7bb6dd8..90c58e7b73 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -631,6 +631,7 @@ func TestChunkRewriter(t *testing.T) { Timestamp: curr.Time(), Line: curr.String(), StructuredMetadata: logproto.FromLabelsToLabelAdapters(expectedStructuredMetadata), + Parsed: logproto.EmptyLabelAdapters(), }, newChunkItr.At()) require.Equal(t, expectedStructuredMetadata.String(), newChunkItr.Labels()) } diff --git a/pkg/dataobj/querier/iter.go b/pkg/dataobj/querier/iter.go index cd4a62ebf4..855a882c0b 100644 --- a/pkg/dataobj/querier/iter.go +++ b/pkg/dataobj/querier/iter.go @@ -252,7 +252,7 @@ func newSampleIterator(ctx context.Context, timestamp := record.Timestamp.UnixNano() statistics.AddDecompressedLines(1) - samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...) + samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata) if !ok { continue } diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go index e9c29d528b..a951d2792d 100644 --- a/pkg/dataobj/querier/store_test.go +++ b/pkg/dataobj/querier/store_test.go @@ -377,8 +377,21 @@ func TestStore_SelectLogs(t *testing.T) { }, } + emptyLabelAdapters := logproto.EmptyLabelAdapters() + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // Make sure all empty label sets use an empty slice, rather than nil, to make assertions below easier. + for i := range tt.want { + if len(tt.want[i].Entry.Parsed) == 0 { + tt.want[i].Entry.Parsed = emptyLabelAdapters + } + + if len(tt.want[i].Entry.StructuredMetadata) == 0 { + tt.want[i].Entry.StructuredMetadata = emptyLabelAdapters + } + } + it, err := store.SelectLogs(ctx, logql.SelectLogParams{ QueryRequest: &logproto.QueryRequest{ Start: tt.start, diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index e7f72153f1..ccffe4f34a 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -424,12 +424,16 @@ var ( Labels: labels.FromStrings("stream", "1").String(), Entries: []logproto.Entry{ { - Timestamp: time.Unix(0, 1), - Line: "1", + Timestamp: time.Unix(0, 1), + Line: "1", + StructuredMetadata: logproto.EmptyLabelAdapters(), + Parsed: logproto.EmptyLabelAdapters(), }, { - Timestamp: time.Unix(0, 2), - Line: "2", + Timestamp: time.Unix(0, 2), + Line: "2", + StructuredMetadata: logproto.EmptyLabelAdapters(), + Parsed: logproto.EmptyLabelAdapters(), }, }, } @@ -437,12 +441,16 @@ var ( Labels: labels.FromStrings("stream", "2").String(), Entries: []logproto.Entry{ { - Timestamp: time.Unix(0, 1), - Line: "3", + Timestamp: time.Unix(0, 1), + Line: "3", + StructuredMetadata: logproto.EmptyLabelAdapters(), + Parsed: logproto.EmptyLabelAdapters(), }, { - Timestamp: time.Unix(0, 2), - Line: "4", + Timestamp: time.Unix(0, 2), + Line: "4", + StructuredMetadata: logproto.EmptyLabelAdapters(), + Parsed: logproto.EmptyLabelAdapters(), }, }, } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 5ad4cc91b3..f9536b8df7 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -568,6 +568,19 @@ func buildTestStreams(offset int) []logproto.Stream { // check that the store is holding data equivalent to what we expect func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) { for userID, expected := range testData { + // Ensure all empty label sets use an empty set of adapters, rather than a nil slice, to make the assertion below easier. + for _, stream := range expected { + for i := range stream.Entries { + if len(stream.Entries[i].Parsed) == 0 { + stream.Entries[i].Parsed = logproto.EmptyLabelAdapters() + } + + if len(stream.Entries[i].StructuredMetadata) == 0 { + stream.Entries[i].StructuredMetadata = logproto.EmptyLabelAdapters() + } + } + } + streams := s.getStreamsForUser(t, userID) require.Equal(t, expected, streams) } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index ee6348c941..e83ba64792 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1035,14 +1035,14 @@ func (p *mockStreamExtractor) BaseLabels() log.LabelsResult { return p.wrappedSP.BaseLabels() } -func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) ([]log.ExtractedSample, bool) { +func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs labels.Labels) ([]log.ExtractedSample, bool) { p.called++ - return p.wrappedSP.Process(ts, line, lbs...) + return p.wrappedSP.Process(ts, line, lbs) } -func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) ([]log.ExtractedSample, bool) { +func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs labels.Labels) ([]log.ExtractedSample, bool) { p.called++ - return p.wrappedSP.ProcessString(ts, line, lbs...) + return p.wrappedSP.ProcessString(ts, line, lbs) } func Test_QueryWithDelete(t *testing.T) { diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index ad6daf44a7..99b1c0b4a6 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -294,8 +294,10 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { Labels: lbls.String(), Entries: []logproto.Entry{ { - Timestamp: time.Unix(1, 0), - Line: "line 1", + Timestamp: time.Unix(1, 0), + Line: "line 1", + StructuredMetadata: logproto.EmptyLabelAdapters(), + Parsed: logproto.EmptyLabelAdapters(), }, }, Hash: lbls.Hash(), diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 1c5eb62652..c3d7b083d7 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -315,9 +315,10 @@ func Test_StructuredMetadata(t *testing.T) { Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(), Entries: []logproto.Entry{ { - Timestamp: time.Unix(0, 1), - Line: "foo=1", - Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), + Timestamp: time.Unix(0, 1), + Line: "foo=1", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), + StructuredMetadata: logproto.EmptyLabelAdapters(), }, }, }, diff --git a/pkg/logproto/compat.go b/pkg/logproto/compat.go index 812f54cbee..85d6faf15b 100644 --- a/pkg/logproto/compat.go +++ b/pkg/logproto/compat.go @@ -61,6 +61,10 @@ func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter { return *(*[]LabelAdapter)(unsafe.Pointer(&ls)) // #nosec G103 -- we know the string is not mutated } +func EmptyLabelAdapters() []LabelAdapter { + return FromLabelsToLabelAdapters(labels.EmptyLabels()) +} + // FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric. // Don't do this on any performance sensitive paths. func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric { diff --git a/pkg/logql/log/consolidated_variant_extractor.go b/pkg/logql/log/consolidated_variant_extractor.go index ec81f818c9..0c3e1cd15a 100644 --- a/pkg/logql/log/consolidated_variant_extractor.go +++ b/pkg/logql/log/consolidated_variant_extractor.go @@ -37,7 +37,7 @@ func (c *consolidatedMultiVariantStreamExtractor) BaseLabels() LabelsResult { return c.commonPipeline.BaseLabels() } -func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { // Process the line through the common pipeline processedLine, commonLabels, ok := c.commonPipeline.Process(ts, line, structuredMetadata) if !ok { @@ -57,7 +57,7 @@ func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, lbls := commonLabels.Labels() for i, variant := range c.variants { streamVariantExtractor := variant.ForStream(lbls) - samples, ok := streamVariantExtractor.Process(ts, processedLine, commonStructuredMetadata...) + samples, ok := streamVariantExtractor.Process(ts, processedLine, commonStructuredMetadata) if ok { for _, sample := range samples { sample.Labels = appendVariantLabel(sample.Labels, i) @@ -74,14 +74,15 @@ func (c *consolidatedMultiVariantStreamExtractor) Process(ts int64, line []byte, } func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult { - streamLbls := lbls.Stream() + newLblsBuilder := labels.NewScratchBuilder(lbls.Stream().Len() + 1) - newLbls := make(labels.Labels, 0, len(streamLbls)+1) - newLbls = append(newLbls, labels.Label{ - Name: constants.VariantLabel, - Value: strconv.Itoa(variantIndex), + lbls.Stream().Range(func(l labels.Label) { + newLblsBuilder.Add(l.Name, l.Value) }) - newLbls = append(newLbls, streamLbls...) + + newLblsBuilder.Add(constants.VariantLabel, strconv.Itoa(variantIndex)) + newLblsBuilder.Sort() + newLbls := newLblsBuilder.Labels() builder := NewBaseLabelsBuilder().ForLabels(newLbls, newLbls.Hash()) builder.Add(StructuredMetadataLabel, lbls.StructuredMetadata()) @@ -89,8 +90,8 @@ func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult { return builder.LabelsResult() } -func (c *consolidatedMultiVariantStreamExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { - return c.Process(ts, unsafeGetBytes(line), structuredMetadata...) +func (c *consolidatedMultiVariantStreamExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { + return c.Process(ts, unsafeGetBytes(line), structuredMetadata) } func (c *consolidatedMultiVariantStreamExtractor) ReferencedStructuredMetadata() bool { diff --git a/pkg/logql/log/consolidated_variant_extractor_test.go b/pkg/logql/log/consolidated_variant_extractor_test.go index b49415ab1a..eebe9ad53d 100644 --- a/pkg/logql/log/consolidated_variant_extractor_test.go +++ b/pkg/logql/log/consolidated_variant_extractor_test.go @@ -34,12 +34,10 @@ func Test_ConsolidatedMultiVariantExtractor(t *testing.T) { // Attempt to create a consolidated multi-variant extractor // This will fail because the function doesn't exist yet extractor := NewConsolidatedMultiVariantExtractor(commonPipeline, variants) - streamExtractor := extractor.ForStream(labels.Labels{ - {Name: "foo", Value: "bar"}, - }) + streamExtractor := extractor.ForStream(labels.FromStrings("foo", "bar")) // Process a log line - samples, ok := streamExtractor.ProcessString(1000, "test log line") + samples, ok := streamExtractor.ProcessString(1000, "test log line", labels.EmptyLabels()) require.True(t, ok) require.Len(t, samples, 2) @@ -80,8 +78,8 @@ func (m *MockCommonPipeline) Reset() { panic("not implemented") // TODO: Implement } -func (m *MockCommonPipeline) ForStream(labels labels.Labels) StreamPipeline { - lblsResult := NewLabelsResult(labels.String(), labels.Hash(), labels, nil, nil) +func (m *MockCommonPipeline) ForStream(lbls labels.Labels) StreamPipeline { + lblsResult := NewLabelsResult(lbls.String(), lbls.Hash(), lbls, labels.EmptyLabels(), labels.EmptyLabels()) return &MockCommonStreamPipeline{ counter: m.counter, shouldPass: m.shouldPass, @@ -122,8 +120,8 @@ type MockVariantSpecificExtractor struct { shouldExtract bool } -func (m *MockVariantSpecificExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { - lblsResult := NewLabelsResult(labels.String(), labels.Hash(), labels, nil, nil) +func (m *MockVariantSpecificExtractor) ForStream(lbls labels.Labels) StreamSampleExtractor { + lblsResult := NewLabelsResult(lbls.String(), lbls.Hash(), lbls, labels.EmptyLabels(), labels.EmptyLabels()) return &mockVariantSpecificStreamExtractor{ valueToExtract: m.valueToExtract, shouldExtract: m.shouldExtract, @@ -142,7 +140,7 @@ func (m *mockVariantSpecificStreamExtractor) BaseLabels() LabelsResult { panic("not implemented") // TODO: Implement } -func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ ...labels.Label) ([]ExtractedSample, bool) { +func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ labels.Labels) ([]ExtractedSample, bool) { result := []ExtractedSample{ { Value: m.valueToExtract, @@ -153,8 +151,8 @@ func (m *mockVariantSpecificStreamExtractor) Process(_ int64, _ []byte, _ ...lab return result, m.shouldExtract } -func (m *mockVariantSpecificStreamExtractor) ProcessString(ts int64, line string, lbls ...labels.Label) ([]ExtractedSample, bool) { - return m.Process(ts, []byte(line), lbls...) +func (m *mockVariantSpecificStreamExtractor) ProcessString(ts int64, line string, lbls labels.Labels) ([]ExtractedSample, bool) { + return m.Process(ts, []byte(line), lbls) } func (m *mockVariantSpecificStreamExtractor) ReferencedStructuredMetadata() bool { diff --git a/pkg/logql/log/fmt_test.go b/pkg/logql/log/fmt_test.go index decc25f0fe..4bbc12cb2d 100644 --- a/pkg/logql/log/fmt_test.go +++ b/pkg/logql/log/fmt_test.go @@ -711,47 +711,47 @@ func Test_labelsFormatter_Format(t *testing.T) { { "unixToTime days", mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}), - labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "19503"}}, - labels.Labels{ - {Name: "bar", Value: "19503"}, - {Name: "foo", Value: epochDay19503.String()}, - }, + labels.FromStrings("foo", "", "bar", "19503"), + labels.FromStrings( + "bar", "19503", + "foo", epochDay19503.String(), + ), }, { "unixToTime seconds", mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}), - labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1679577215"}}, - labels.Labels{ - {Name: "bar", Value: "1679577215"}, - {Name: "foo", Value: epochSeconds1679577215.String()}, - }, + labels.FromStrings("foo", "", "bar", "1679577215"), + labels.FromStrings( + "bar", "1679577215", + "foo", epochSeconds1679577215.String(), + ), }, { "unixToTime milliseconds", mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}), - labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1257894000000"}}, - labels.Labels{ - {Name: "bar", Value: "1257894000000"}, - {Name: "foo", Value: epochMilliseconds1257894000000.String()}, - }, + labels.FromStrings("foo", "", "bar", "1257894000000"), + labels.FromStrings( + "bar", "1257894000000", + "foo", epochMilliseconds1257894000000.String(), + ), }, { "unixToTime microseconds", mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}), - labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1673798889902000"}}, - labels.Labels{ - {Name: "bar", Value: "1673798889902000"}, - {Name: "foo", Value: epochMicroseconds1673798889902000.String()}, - }, + labels.FromStrings("foo", "", "bar", "1673798889902000"), + labels.FromStrings( + "bar", "1673798889902000", + "foo", epochMicroseconds1673798889902000.String(), + ), }, { "unixToTime nanoseconds", mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", `{{ .bar | unixToTime }}`)}), - labels.Labels{{Name: "foo", Value: ""}, {Name: "bar", Value: "1000000000000000000"}}, - labels.Labels{ - {Name: "bar", Value: "1000000000000000000"}, - {Name: "foo", Value: epochNanoseconds1000000000000000000.String()}, - }, + labels.FromStrings("foo", "", "bar", "1000000000000000000"), + labels.FromStrings( + "bar", "1000000000000000000", + "foo", epochNanoseconds1000000000000000000.String(), + ), }, } diff --git a/pkg/logql/log/label_filter_test.go b/pkg/logql/log/label_filter_test.go index b6364dc0c3..be52df1afb 100644 --- a/pkg/logql/log/label_filter_test.go +++ b/pkg/logql/log/label_filter_test.go @@ -385,25 +385,25 @@ func TestStringLabelFilter(t *testing.T) { { name: `logfmt|msg=~"(?i)hello" (with label)`, filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)hello")), - labels: labels.Labels{{Name: "msg", Value: "HELLO"}, {Name: "subqueries", Value: ""}}, // label `msg` contains HELLO + labels: labels.FromStrings("msg", "HELLO", "subqueries", ""), // label `msg` contains HELLO shouldMatch: true, }, { name: `logfmt|msg=~"(?i)hello" (with label)`, filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)hello")), - labels: labels.Labels{{Name: "msg", Value: "hello"}, {Name: "subqueries", Value: ""}}, // label `msg` contains hello + labels: labels.FromStrings("msg", "hello", "subqueries", ""), // label `msg` contains hello shouldMatch: true, }, { name: `logfmt|msg=~"(?i)HELLO" (with label)`, filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)HELLO")), - labels: labels.Labels{{Name: "msg", Value: "HELLO"}, {Name: "subqueries", Value: ""}}, // label `msg` contains HELLO + labels: labels.FromStrings("msg", "HELLO", "subqueries", ""), // label `msg` contains HELLO shouldMatch: true, }, { name: `logfmt|msg=~"(?i)HELLO" (with label)`, filter: NewStringLabelFilter(labels.MustNewMatcher(labels.MatchRegexp, "msg", "(?i)HELLO")), - labels: labels.Labels{{Name: "msg", Value: "hello"}, {Name: "subqueries", Value: ""}}, // label `msg` contains hello + labels: labels.FromStrings("msg", "hello", "subqueries", ""), // label `msg` contains hello shouldMatch: true, }, } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 684be91499..3585009021 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -2,9 +2,6 @@ package log import ( "fmt" - "slices" - "sort" - "strings" "sync" "github.com/prometheus/prometheus/model/labels" @@ -53,7 +50,21 @@ func (l labelsResult) String() string { } func (l labelsResult) Labels() labels.Labels { - return flattenLabels(nil, l.stream, l.structuredMetadata, l.parsed) + size := l.stream.Len() + l.structuredMetadata.Len() + l.parsed.Len() + b := labels.NewScratchBuilder(size) + + l.stream.Range(func(l labels.Label) { + b.Add(l.Name, l.Value) + }) + l.structuredMetadata.Range(func(l labels.Label) { + b.Add(l.Name, l.Value) + }) + l.parsed.Range(func(l labels.Label) { + b.Add(l.Name, l.Value) + }) + + b.Sort() + return b.Labels() } func (l labelsResult) Hash() uint64 { @@ -61,44 +72,17 @@ func (l labelsResult) Hash() uint64 { } func (l labelsResult) Stream() labels.Labels { - if len(l.stream) == 0 { - return nil - } return l.stream } func (l labelsResult) StructuredMetadata() labels.Labels { - if len(l.structuredMetadata) == 0 { - return nil - } return l.structuredMetadata } func (l labelsResult) Parsed() labels.Labels { - if len(l.parsed) == 0 { - return nil - } return l.parsed } -type hasher struct { - buf []byte // buffer for computing hash without bytes slice allocation. -} - -// newHasher allow to compute hashes for labels by reusing the same buffer. -func newHasher() *hasher { - return &hasher{ - buf: make([]byte, 0, 1024), - } -} - -// Hash hashes the labels -func (h *hasher) Hash(lbs labels.Labels) uint64 { - var hash uint64 - hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...) - return hash -} - type LabelCategory int const ( @@ -129,7 +113,7 @@ func categoriesContain(categories []LabelCategory, category LabelCategory) bool // Only one base builder is used and it contains cache for each LabelsBuilders. type BaseLabelsBuilder struct { del []string - add [numValidCategories]labels.Labels + add [numValidCategories][]labels.Label // nolint:structcheck // https://github.com/golangci/golangci-lint/issues/826 err string @@ -150,7 +134,7 @@ type BaseLabelsBuilder struct { // LabelsBuilder is the same as labels.Builder but tailored for this package. type LabelsBuilder struct { base labels.Labels - buf labels.Labels + buf []labels.Label currentResult LabelsResult groupedResult LabelsResult @@ -166,10 +150,10 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint const labelsCapacity = 16 return &BaseLabelsBuilder{ del: make([]string, 0, 5), - add: [numValidCategories]labels.Labels{ - StreamLabel: make(labels.Labels, 0, labelsCapacity), - StructuredMetadataLabel: make(labels.Labels, 0, labelsCapacity), - ParsedLabel: make(labels.Labels, 0, labelsCapacity), + add: [numValidCategories][]labels.Label{ + StreamLabel: make([]labels.Label, 0, labelsCapacity), + StructuredMetadataLabel: make([]labels.Label, 0, labelsCapacity), + ParsedLabel: make([]labels.Label, 0, labelsCapacity), }, resultCache: make(map[uint64]LabelsResult), hasher: newHasher(), @@ -315,11 +299,12 @@ func (b *LabelsBuilder) getWithCategory(key string) (string, LabelCategory, bool } } - for _, l := range b.base { - if l.Name == key { - return l.Value, StreamLabel, true - } + value := b.base.Get(key) + + if value != "" { + return value, StreamLabel, true } + return "", InvalidCategory, false } @@ -437,15 +422,7 @@ func (b *LabelsBuilder) GetJSONPath(labelName string) []string { return path } -// Labels returns the labels from the builder. If no modifications -// were made, the original labels are returned. -func (b *LabelsBuilder) labels(categories ...LabelCategory) labels.Labels { - b.buf = b.UnsortedLabels(b.buf, categories...) - sort.Sort(b.buf) - return b.buf -} - -func (b *LabelsBuilder) appendErrors(buf labels.Labels) labels.Labels { +func (b *LabelsBuilder) appendErrors(buf []labels.Label) []labels.Label { if b.err != "" { buf = append(buf, labels.Label{ Name: logqlmodel.ErrorLabel, @@ -461,18 +438,22 @@ func (b *LabelsBuilder) appendErrors(buf labels.Labels) labels.Labels { return buf } -func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCategory) labels.Labels { +func (b *LabelsBuilder) UnsortedLabels(buf []labels.Label, categories ...LabelCategory) []labels.Label { if categories == nil { categories = allCategories } if !b.hasDel() && !b.hasAdd() && categoriesContain(categories, StreamLabel) { if buf == nil { - buf = make(labels.Labels, 0, len(b.base)+1) // +1 for error label. + buf = make([]labels.Label, 0, b.base.Len()+1) // +1 for error label. } else { buf = buf[:0] } - buf = append(buf, b.base...) + + b.base.Range(func(l labels.Label) { + buf = append(buf, l) + }) + if categoriesContain(categories, ParsedLabel) { buf = b.appendErrors(buf) } @@ -483,39 +464,38 @@ func (b *LabelsBuilder) UnsortedLabels(buf labels.Labels, categories ...LabelCat // In the general case, labels are removed, modified or moved // rather than added. if buf == nil { - size := len(b.base) + b.sizeAdd() + 1 - buf = make(labels.Labels, 0, size) + size := b.base.Len() + b.sizeAdd() + 1 + buf = make([]labels.Label, 0, size) } else { buf = buf[:0] } if categoriesContain(categories, StreamLabel) { - Outer: - for _, l := range b.base { + b.base.Range(func(l labels.Label) { // Skip stream labels to be deleted for _, n := range b.del { if l.Name == n { - continue Outer + return } } // Skip stream labels which value will be replaced by structured metadata if labelsContain(b.add[StructuredMetadataLabel], l.Name) { - continue + return } // Skip stream labels which value will be replaced by parsed labels if labelsContain(b.add[ParsedLabel], l.Name) { - continue + return } // Take value from stream label if present - if labelsContain(b.add[StreamLabel], l.Name) { - buf = append(buf, labels.Label{Name: l.Name, Value: b.add[StreamLabel].Get(l.Name)}) + if value, found := findLabelValue(b.add[StreamLabel], l.Name); found { + buf = append(buf, labels.Label{Name: l.Name, Value: value}) } else { buf = append(buf, l) } - } + }) } if categoriesContain(categories, StructuredMetadataLabel) { @@ -611,9 +591,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Get all labels at once and sort them b.buf = b.UnsortedLabels(b.buf) - // sort.Sort(b.buf) - slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) - hash := b.hasher.Hash(b.buf) + lbls := labels.New(b.buf...) + hash := b.hasher.Hash(lbls) if cached, ok := b.resultCache[hash]; ok { return cached @@ -639,47 +618,38 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { } } - result := NewLabelsResult(b.buf.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...)) + result := NewLabelsResult(lbls.String(), hash, labels.New(stream...), labels.New(meta...), labels.New(parsed...)) b.resultCache[hash] = result return result } -func flattenLabels(buf labels.Labels, many ...labels.Labels) labels.Labels { - var size int - for _, lbls := range many { - size += len(lbls) - } - - if buf == nil || cap(buf) < size { - buf = make(labels.Labels, 0, size) - } else { - buf = buf[:0] - } - - for _, lbls := range many { - buf = append(buf, lbls...) +func labelsContain(labels []labels.Label, name string) bool { + for _, l := range labels { + if l.Name == name { + return true + } } - sort.Sort(buf) - return buf + return false } -func labelsContain(labels labels.Labels, name string) bool { +func findLabelValue(labels []labels.Label, name string) (string, bool) { for _, l := range labels { if l.Name == name { - return true + return l.Value, true } } - return false + return "", false } -func (b *BaseLabelsBuilder) toUncategorizedResult(buf labels.Labels) LabelsResult { - hash := b.hasher.Hash(buf) +func (b *BaseLabelsBuilder) toUncategorizedResult(buf []labels.Label) LabelsResult { + lbls := labels.New(buf...) + hash := b.hasher.Hash(lbls) if cached, ok := b.resultCache[hash]; ok { return cached } - res := NewLabelsResult(buf.String(), hash, buf.Copy(), nil, nil) + res := NewLabelsResult(lbls.String(), hash, lbls, labels.EmptyLabels(), labels.EmptyLabels()) b.resultCache[hash] = res return res } @@ -714,7 +684,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult { func (b *LabelsBuilder) withResult() LabelsResult { if b.buf == nil { - b.buf = make(labels.Labels, 0, len(b.groups)) + b.buf = make([]labels.Label, 0, len(b.groups)) } else { b.buf = b.buf[:0] } @@ -736,11 +706,10 @@ Outer: } } } - for _, l := range b.base { - if g == l.Name { - b.buf = append(b.buf, l) - continue Outer - } + + value := b.base.Get(g) + if value != "" { + b.buf = append(b.buf, labels.Label{Name: g, Value: value}) } } return b.toUncategorizedResult(b.buf) @@ -748,35 +717,35 @@ Outer: func (b *LabelsBuilder) withoutResult() LabelsResult { if b.buf == nil { - size := len(b.base) + b.sizeAdd() - len(b.del) - len(b.groups) + size := b.base.Len() + b.sizeAdd() - len(b.del) - len(b.groups) if size < 0 { size = 0 } - b.buf = make(labels.Labels, 0, size) + b.buf = make([]labels.Label, 0, size) } else { b.buf = b.buf[:0] } -Outer: - for _, l := range b.base { + + b.base.Range(func(l labels.Label) { for _, n := range b.del { if l.Name == n { - continue Outer + return } } for _, lbls := range b.add { for _, la := range lbls { if l.Name == la.Name { - continue Outer + return } } } for _, lg := range b.groups { if l.Name == lg { - continue Outer + return } } b.buf = append(b.buf, l) - } + }) for category, lbls := range b.add { OuterAdd: @@ -792,7 +761,7 @@ Outer: b.buf = append(b.buf, la) } } - sort.Sort(b.buf) + return b.toUncategorizedResult(b.buf) } @@ -806,7 +775,7 @@ func (b *LabelsBuilder) toBaseGroup() LabelsResult { } else { lbs = labels.NewBuilder(b.base).Keep(b.groups...).Labels() } - res := NewLabelsResult(lbs.String(), lbs.Hash(), lbs, nil, nil) + res := NewLabelsResult(lbs.String(), lbs.Hash(), lbs, labels.EmptyLabels(), labels.EmptyLabels()) b.groupedResult = res return res } @@ -831,26 +800,3 @@ func (i internedStringSet) Get(data []byte, createNew func() (string, bool)) (st }{s: newStr, ok: ok} return newStr, ok } - -// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in. -// It is used to avoid allocations when building labels. -type BufferedLabelsBuilder struct { - buf labels.Labels -} - -func NewBufferedLabelsBuilder(labels labels.Labels) *BufferedLabelsBuilder { - return &BufferedLabelsBuilder{buf: labels[:0]} -} - -func (b *BufferedLabelsBuilder) Reset() { - b.buf = b.buf[:0] -} - -func (b *BufferedLabelsBuilder) Add(label labels.Label) { - b.buf = append(b.buf, label) -} - -func (b *BufferedLabelsBuilder) Labels() labels.Labels { - //slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) - return b.buf -} diff --git a/pkg/logql/log/labels_slicelabels.go b/pkg/logql/log/labels_slicelabels.go new file mode 100644 index 0000000000..aff9ef8c8f --- /dev/null +++ b/pkg/logql/log/labels_slicelabels.go @@ -0,0 +1,47 @@ +//go:build !stringlabels && !dedupelabels + +package log + +import "github.com/prometheus/prometheus/model/labels" + +type hasher struct { + buf []byte // buffer for computing hash without bytes slice allocation. +} + +// newHasher returns a hasher that computes hashes for labels by reusing the same buffer. +func newHasher() *hasher { + return &hasher{ + buf: make([]byte, 0, 1024), + } +} + +// Hash computes a hash of lbs. +// It is not guaranteed to be stable across different Loki processes or versions. +func (h *hasher) Hash(lbs labels.Labels) uint64 { + var hash uint64 + hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...) + return hash +} + +// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in. +// It is used to avoid allocations when building labels. +type BufferedLabelsBuilder struct { + buf labels.Labels +} + +func NewBufferedLabelsBuilder(labels labels.Labels) *BufferedLabelsBuilder { + return &BufferedLabelsBuilder{buf: labels[:0]} +} + +func (b *BufferedLabelsBuilder) Reset() { + b.buf = b.buf[:0] +} + +func (b *BufferedLabelsBuilder) Add(label labels.Label) { + b.buf = append(b.buf, label) +} + +func (b *BufferedLabelsBuilder) Labels() labels.Labels { + //slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) + return b.buf +} diff --git a/pkg/logql/log/labels_stringlabels.go b/pkg/logql/log/labels_stringlabels.go new file mode 100644 index 0000000000..b165aa3779 --- /dev/null +++ b/pkg/logql/log/labels_stringlabels.go @@ -0,0 +1,43 @@ +//go:build stringlabels + +package log + +import "github.com/prometheus/prometheus/model/labels" + +type hasher struct{} + +// newHasher returns a hasher that computes hashes for labels. +func newHasher() *hasher { + return &hasher{} +} + +// Hash computes a hash of lbs. +// It is not guaranteed to be stable across different Loki processes or versions. +func (h *hasher) Hash(lbs labels.Labels) uint64 { + // We use Hash() here because there's no performance advantage to using HashWithoutLabels() with stringlabels. + // The results from Hash(l) and HashWithoutLabels(l, []string{}) are different with stringlabels, so using Hash + // here also simplifies our tests. + return lbs.Hash() +} + +// BufferedLabelsBuilder is a simple builder that uses a label buffer passed in. +// It is used to avoid allocations when building labels. +type BufferedLabelsBuilder struct { + builder *labels.Builder +} + +func NewBufferedLabelsBuilder(l labels.Labels) *BufferedLabelsBuilder { + return &BufferedLabelsBuilder{builder: labels.NewBuilder(l)} +} + +func (b *BufferedLabelsBuilder) Reset() { + b.builder.Reset(labels.EmptyLabels()) +} + +func (b *BufferedLabelsBuilder) Add(label labels.Label) { + b.builder.Set(label.Name, label.Value) +} + +func (b *BufferedLabelsBuilder) Labels() labels.Labels { + return b.builder.Labels() +} diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index c1a2fcdeec..bd74324ad3 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -2,7 +2,8 @@ package log import ( "fmt" - "sort" + "slices" + "strings" "testing" "time" @@ -64,7 +65,7 @@ func TestLabelsBuilder_LabelsError(t *testing.T) { require.Equal(t, expectedLbs.String(), lbsWithErr.String()) require.Equal(t, expectedLbs.Hash(), lbsWithErr.Hash()) require.Equal(t, labels.FromStrings("already", "in"), lbsWithErr.Stream()) - require.Nil(t, lbsWithErr.StructuredMetadata()) + require.Equal(t, labels.EmptyLabels(), lbsWithErr.StructuredMetadata()) require.Equal(t, labels.FromStrings(logqlmodel.ErrorLabel, "err"), lbsWithErr.Parsed()) // make sure the original labels is unchanged. @@ -89,7 +90,7 @@ func TestLabelsBuilder_LabelsErrorFromAdd(t *testing.T) { require.Equal(t, expectedLbs.String(), lbsWithErr.String()) require.Equal(t, expectedLbs.Hash(), lbsWithErr.Hash()) require.Equal(t, labels.FromStrings("already", "in"), lbsWithErr.Stream()) - require.Nil(t, lbsWithErr.StructuredMetadata()) + require.Equal(t, labels.EmptyLabels(), lbsWithErr.StructuredMetadata()) require.Equal(t, labels.FromStrings(logqlmodel.ErrorLabel, "test error", logqlmodel.ErrorDetailsLabel, "test details"), lbsWithErr.Parsed()) // make sure the original labels is unchanged. @@ -187,11 +188,8 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { "buzz", "fuzz", "ToReplace", "other", ) - expected := make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + + expected := mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) assertLabelResult(t, expected, b.LabelsResult()) // cached. @@ -220,11 +218,7 @@ func TestLabelsBuilder_LabelsResult(t *testing.T) { "ToReplace", "other", ) - expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) assertLabelResult(t, expected, b.LabelsResult()) // cached. assertLabelResult(t, expected, b.LabelsResult()) @@ -250,11 +244,7 @@ func TestLabelsBuilder_Set(t *testing.T) { expectedStucturedMetadataLbls := labels.FromStrings("stzz", "stvzz") expectedParsedLbls := labels.FromStrings("toreplace", "buzz") - expected := make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected := mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) actual := b.LabelsResult() assertLabelResult(t, expected, actual) @@ -272,11 +262,7 @@ func TestLabelsBuilder_Set(t *testing.T) { expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzz") expectedParsedLbls = labels.FromStrings("toreplace", "buzz") - expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) actual = b.LabelsResult() assertLabelResult(t, expected, actual) @@ -293,11 +279,7 @@ func TestLabelsBuilder_Set(t *testing.T) { expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzz") - expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) actual = b.LabelsResult() assertLabelResult(t, expected, actual) @@ -315,11 +297,7 @@ func TestLabelsBuilder_Set(t *testing.T) { expectedStucturedMetadataLbls = labels.FromStrings("stzz", "stvzzz") expectedParsedLbls = labels.FromStrings("toreplace", "puzz") - expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) actual = b.LabelsResult() assertLabelResult(t, expected, actual) @@ -336,11 +314,7 @@ func TestLabelsBuilder_Set(t *testing.T) { expectedStucturedMetadataLbls = labels.FromStrings("toreplace", "muzz") expectedParsedLbls = labels.FromStrings("stzz", "stvzzz") - expected = make(labels.Labels, 0, len(expectedStreamLbls)+len(expectedStucturedMetadataLbls)+len(expectedParsedLbls)) - expected = append(expected, expectedStreamLbls...) - expected = append(expected, expectedStucturedMetadataLbls...) - expected = append(expected, expectedParsedLbls...) - expected = labels.New(expected...) + expected = mergeLabels(expectedStreamLbls, expectedStucturedMetadataLbls, expectedParsedLbls) actual = b.LabelsResult() assertLabelResult(t, expected, actual) @@ -357,31 +331,37 @@ func TestLabelsBuilder_UnsortedLabels(t *testing.T) { } lbs := labels.FromStrings(strs...) b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) - b.add[StructuredMetadataLabel] = labels.FromStrings("toreplace", "buzz", "fzz", "bzz") - b.add[ParsedLabel] = labels.FromStrings("pzz", "pvzz") - expected := labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz") + b.add[StructuredMetadataLabel] = []labels.Label{{"toreplace", "buzz"}, {"fzz", "bzz"}} + b.add[ParsedLabel] = []labels.Label{{"pzz", "pvzz"}} + expected := []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "buzz"}, {"pzz", "pvzz"}} actual := b.UnsortedLabels(nil) require.ElementsMatch(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz") - b.add[ParsedLabel] = labels.FromStrings("toreplace", "buzz", "pzz", "pvzz") - expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "buzz", "pzz", "pvzz") + b.add[StructuredMetadataLabel] = []labels.Label{{"fzz", "bzz"}} + b.add[ParsedLabel] = []labels.Label{{"toreplace", "buzz"}, {"pzz", "pvzz"}} + expected = []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "buzz"}, {"pzz", "pvzz"}} actual = b.UnsortedLabels(nil) - sort.Sort(expected) - sort.Sort(actual) + sortLabelSlice(expected) + sortLabelSlice(actual) assert.Equal(t, expected, actual) b.Reset() - b.add[StructuredMetadataLabel] = labels.FromStrings("fzz", "bzz", "toreplacezz", "test") - b.add[ParsedLabel] = labels.FromStrings("toreplacezz", "buzz", "pzz", "pvzz") - expected = labels.FromStrings("cluster", "us-central1", "namespace", "loki", "fzz", "bzz", "toreplace", "fuzz", "pzz", "pvzz", "toreplacezz", "buzz") + b.add[StructuredMetadataLabel] = []labels.Label{{"fzz", "bzz"}, {"toreplacezz", "test"}} + b.add[ParsedLabel] = []labels.Label{{"toreplacezz", "buzz"}, {"pzz", "pvzz"}} + expected = []labels.Label{{"cluster", "us-central1"}, {"namespace", "loki"}, {"fzz", "bzz"}, {"toreplace", "fuzz"}, {"pzz", "pvzz"}, {"toreplacezz", "buzz"}} actual = b.UnsortedLabels(nil) - sort.Sort(expected) - sort.Sort(actual) + sortLabelSlice(expected) + sortLabelSlice(actual) assert.Equal(t, expected, actual) } +func sortLabelSlice(l []labels.Label) { + slices.SortFunc(l, func(a, b labels.Label) int { + return strings.Compare(a.Name, b.Name) + }) +} + func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { strs := []string{"namespace", "loki", "job", "us-central1/loki", @@ -466,6 +446,20 @@ func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) { ) } +func mergeLabels(streamLabels, structuredMetadataLabels, parsedLabels labels.Labels) labels.Labels { + builder := labels.NewBuilder(streamLabels) + + structuredMetadataLabels.Range(func(l labels.Label) { + builder.Set(l.Name, l.Value) + }) + + parsedLabels.Range(func(l labels.Label) { + builder.Set(l.Name, l.Value) + }) + + return builder.Labels() +} + // benchmark streamLineSampleExtractor.Process method func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) { // Setup some test data @@ -477,12 +471,12 @@ func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) { "stream", "stdout", ) - structuredMeta := []labels.Label{ - {Name: "level", Value: "info"}, - {Name: "caller", Value: "http.go:42"}, - {Name: "user", Value: "john"}, - {Name: "trace_id", Value: "abc123"}, - } + structuredMeta := labels.FromStrings( + "level", "info", + "caller", "http.go:42", + "user", "john", + "trace_id", "abc123", + ) testLine := []byte(`{"timestamp":"2024-01-01T00:00:00Z","level":"info","message":"test message","duration_ms":150}`) @@ -500,7 +494,7 @@ func BenchmarkStreamLineSampleExtractor_Process(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = streamEx.Process(time.Now().UnixNano(), testLine, structuredMeta...) + _, _ = streamEx.Process(time.Now().UnixNano(), testLine, structuredMeta) } } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 69b25b3bcd..e675aef05b 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -35,8 +35,8 @@ type SampleExtractor interface { // A StreamSampleExtractor never mutates the received line. type StreamSampleExtractor interface { BaseLabels() LabelsResult - Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) - ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) + Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) + ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) ReferencedStructuredMetadata() bool } @@ -99,7 +99,7 @@ func (l *streamLineSampleExtractor) ReferencedStructuredMetadata() bool { return l.builder.referencedStructuredMetadata } -func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { l.builder.Reset() l.builder.Add(StructuredMetadataLabel, structuredMetadata) @@ -120,9 +120,9 @@ func (l *streamLineSampleExtractor) Process(ts int64, line []byte, structuredMet return []ExtractedSample{{Value: value, Labels: labels}}, true } -func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (l *streamLineSampleExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(ts, unsafeGetBytes(line), structuredMetadata...) + return l.Process(ts, unsafeGetBytes(line), structuredMetadata) } func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -199,7 +199,7 @@ func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtra return res } -func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { // Apply the pipeline first. l.builder.Reset() l.builder.Add(StructuredMetadataLabel, structuredMetadata) @@ -230,9 +230,9 @@ func (l *streamLabelSampleExtractor) Process(ts int64, line []byte, structuredMe return []ExtractedSample{{Value: v, Labels: l.builder.GroupedLabels()}}, true } -func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (l *streamLabelSampleExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { // unsafe get bytes since we have the guarantee that the line won't be mutated. - return l.Process(ts, unsafeGetBytes(line), structuredMetadata...) + return l.Process(ts, unsafeGetBytes(line), structuredMetadata) } func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } @@ -283,7 +283,7 @@ func (sp *filteringStreamExtractor) BaseLabels() LabelsResult { return sp.extractor.BaseLabels() } -func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue @@ -295,10 +295,10 @@ func (sp *filteringStreamExtractor) Process(ts int64, line []byte, structuredMet } } - return sp.extractor.Process(ts, line, structuredMetadata...) + return sp.extractor.Process(ts, line, structuredMetadata) } -func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structuredMetadata ...labels.Label) ([]ExtractedSample, bool) { +func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structuredMetadata labels.Labels) ([]ExtractedSample, bool) { for _, filter := range sp.filters { if ts < filter.start || ts > filter.end { continue @@ -310,7 +310,7 @@ func (sp *filteringStreamExtractor) ProcessString(ts int64, line string, structu } } - return sp.extractor.ProcessString(ts, line, labels.EmptyLabels()...) + return sp.extractor.ProcessString(ts, line, structuredMetadata) } func convertFloat(v string) (float64, error) { diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index 6fca411169..bd8df89e58 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -251,7 +251,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - samples, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line), tt.structuredMetadata...) + samples, ok := tt.ex.ForStream(tt.in).Process(0, []byte(tt.line), tt.structuredMetadata) require.Equal(t, tt.wantOk, ok) if ok { require.Len(t, samples, 1, "Expected exactly one sample") @@ -259,7 +259,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { require.Equal(t, tt.wantLbs, samples[0].Labels.Labels()) } - samples, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line, tt.structuredMetadata...) + samples, ok = tt.ex.ForStream(tt.in).ProcessString(0, tt.line, tt.structuredMetadata) require.Equal(t, tt.wantOk, ok) if ok { require.Len(t, samples, 1, "Expected exactly one sample") @@ -273,7 +273,7 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { func Test_Extract_ExpectedLabels(t *testing.T) { ex := mustSampleExtractor(LabelExtractorWithStages("duration", ConvertDuration, []string{"foo"}, false, false, []Stage{NewJSONParser(false)}, NoopStage)) - samples, ok := ex.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, `{"duration":"20ms","foo":"json"}`) + samples, ok := ex.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, `{"duration":"20ms","foo":"json"}`, labels.EmptyLabels()) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, (20 * time.Millisecond).Seconds(), samples[0].Value) @@ -325,7 +325,7 @@ func TestLabelExtractorWithStages(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { for _, line := range tc.checkLines { - samples, ok := tc.extractor.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, line.logLine) + samples, ok := tc.extractor.ForStream(labels.FromStrings("bar", "foo")).ProcessString(0, line.logLine, labels.EmptyLabels()) skipped := !ok assert.Equal(t, line.skip, skipped, "line", line.logLine) if !skipped { @@ -360,13 +360,13 @@ func TestNewLineSampleExtractor(t *testing.T) { ) sse := se.ForStream(lbs) - samples, ok := sse.Process(0, []byte(`foo`)) + samples, ok := sse.Process(0, []byte(`foo`), labels.EmptyLabels()) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) assertLabelResult(t, lbs, samples[0].Labels) - samples, ok = sse.ProcessString(0, `foo`) + samples, ok = sse.ProcessString(0, `foo`, labels.EmptyLabels()) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) @@ -377,21 +377,21 @@ func TestNewLineSampleExtractor(t *testing.T) { require.NoError(t, err) sse = se.ForStream(lbs) - samples, ok = sse.Process(0, []byte(`foo`)) + samples, ok = sse.Process(0, []byte(`foo`), labels.EmptyLabels()) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 3., samples[0].Value) assertLabelResult(t, labels.FromStrings("namespace", "dev"), samples[0].Labels) sse = se.ForStream(lbs) - _, ok = sse.Process(0, []byte(`nope`)) + _, ok = sse.Process(0, []byte(`nope`), labels.EmptyLabels()) require.False(t, ok) } func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) { lbs := labels.FromStrings("foo", "bar") structuredMetadata := labels.FromStrings("user", "bob") - expectedLabelsResults := append(lbs, structuredMetadata...) + expectedLabelsResults := appendLabels(lbs, structuredMetadata) se, err := NewLineSampleExtractor(CountExtractor, []Stage{ NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")), @@ -399,34 +399,28 @@ func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) { require.NoError(t, err) sse := se.ForStream(lbs) - samples, ok := sse.Process(0, []byte(`foo`), structuredMetadata...) + samples, ok := sse.Process(0, []byte(`foo`), structuredMetadata) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) assertLabelResult(t, expectedLabelsResults, samples[0].Labels) - samples, ok = sse.ProcessString(0, `foo`, structuredMetadata...) + samples, ok = sse.ProcessString(0, `foo`, structuredMetadata) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) assertLabelResult(t, expectedLabelsResults, samples[0].Labels) // test duplicated structured metadata with stream labels - expectedLabelsResults = append(lbs, labels.Label{ - Name: "foo_extracted", Value: "baz", - }) - expectedLabelsResults = append(expectedLabelsResults, structuredMetadata...) - samples, ok = sse.Process(0, []byte(`foo`), append(structuredMetadata, labels.Label{ - Name: "foo", Value: "baz", - })...) + expectedLabelsResults = appendLabel(lbs, "foo_extracted", "baz") + expectedLabelsResults = appendLabels(expectedLabelsResults, structuredMetadata) + samples, ok = sse.Process(0, []byte(`foo`), appendLabel(structuredMetadata, "foo", "baz")) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) assertLabelResult(t, expectedLabelsResults, samples[0].Labels) - samples, ok = sse.ProcessString(0, `foo`, append(structuredMetadata, labels.Label{ - Name: "foo", Value: "baz", - })...) + samples, ok = sse.ProcessString(0, `foo`, appendLabel(structuredMetadata, "foo", "baz")) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 1., samples[0].Value) @@ -440,21 +434,27 @@ func TestNewLineSampleExtractorWithStructuredMetadata(t *testing.T) { require.NoError(t, err) sse = se.ForStream(lbs) - samples, ok = sse.Process(0, []byte(`foo`), structuredMetadata...) + samples, ok = sse.Process(0, []byte(`foo`), structuredMetadata) require.True(t, ok) require.Len(t, samples, 1, "Expected exactly one sample") require.Equal(t, 3., samples[0].Value) assertLabelResult(t, labels.FromStrings("foo", "bar"), samples[0].Labels) sse = se.ForStream(lbs) - _, ok = sse.Process(0, []byte(`nope`)) + _, ok = sse.Process(0, []byte(`nope`), labels.EmptyLabels()) require.False(t, ok) } +func appendLabel(l labels.Labels, name, value string) labels.Labels { + b := labels.NewBuilder(l) + b.Set(name, value) + return b.Labels() +} + func TestFilteringSampleExtractor(t *testing.T) { se := NewFilteringSampleExtractor([]PipelineFilter{ - newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"), - newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"), + newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), "e"), + newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), labels.EmptyLabels(), "e"), newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"), }, newStubExtractor()) @@ -466,23 +466,23 @@ func TestFilteringSampleExtractor(t *testing.T) { structuredMetadata labels.Labels ok bool }{ - {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true}, - {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true}, - {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true}, - {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true}, - {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true}, - {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false}, + {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), labels.EmptyLabels(), true}, + {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), labels.EmptyLabels(), true}, + {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), false}, {"it doesn't match all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true}, {"it matches all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false}, - {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false}, + {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), false}, } for _, test := range tt { t.Run(test.name, func(t *testing.T) { - _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line), test.structuredMetadata...) + _, ok := se.ForStream(test.labels).Process(test.ts, []byte(test.line), test.structuredMetadata) require.Equal(t, test.ok, ok) - _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line, test.structuredMetadata...) + _, ok = se.ForStream(test.labels).ProcessString(test.ts, test.line, test.structuredMetadata) require.Equal(t, test.ok, ok) }) } @@ -514,7 +514,7 @@ func (p *stubStreamExtractor) BaseLabels() LabelsResult { func (p *stubStreamExtractor) Process( _ int64, _ []byte, - structuredMetadata ...labels.Label, + structuredMetadata labels.Labels, ) ([]ExtractedSample, bool) { builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0) builder.Add(StructuredMetadataLabel, structuredMetadata) @@ -527,7 +527,7 @@ func (p *stubStreamExtractor) Process( func (p *stubStreamExtractor) ProcessString( _ int64, _ string, - structuredMetadata ...labels.Label, + structuredMetadata labels.Labels, ) ([]ExtractedSample, bool) { builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0) builder.Add(StructuredMetadataLabel, structuredMetadata) diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index 6da8516091..d55eafd1c3 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -234,7 +234,7 @@ func Test_ParserHints(t *testing.T) { require.NoError(t, err) for i, ex := range exs { - res, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...)) + res, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...), labels.EmptyLabels()) require.Equal(t, tt.expectOk, ok) for _, sample := range res { diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 3f6eb449bf..2a855389aa 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -2,7 +2,6 @@ package log import ( "fmt" - "sort" "testing" "github.com/grafana/loki/v3/pkg/logqlmodel" @@ -238,7 +237,7 @@ func TestKeyShortCircuit(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, result = tt.p.Process(0, tt.line, lbs) - require.Len(t, lbs.labels(), 1) + require.Equal(t, 1, lbs.LabelsResult().Labels().Len()) require.False(t, result) }) } @@ -278,7 +277,7 @@ func TestLabelShortCircuit(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, result = tt.p.Process(0, tt.line, lbs) - require.Len(t, lbs.labels(), 1) + require.Equal(t, 1, lbs.LabelsResult().Labels().Len()) name, category, ok := lbs.GetWithCategory("name") require.True(t, ok) require.Equal(t, ParsedLabel, category) @@ -1075,7 +1074,7 @@ func TestLogfmtParser_parse(t *testing.T) { []byte(`buzz=foo bar=�f`), labels.EmptyLabels(), labels.FromStrings("bar", " f", "buzz", "foo"), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1084,7 +1083,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar", "bar", "foo"), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1094,7 +1093,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar", "foobar", "foo bar", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1104,7 +1103,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("a", "b", "foobar", "foo\nbar\tbaz", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1114,7 +1113,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("a", "b", "foobar", "foo\nbar\tbaz", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1124,7 +1123,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("a", "b", "foobar", `foo ba\r baz`, ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1134,7 +1133,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("a", "b", "foobar", "foo bar\nb\\az", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1145,7 +1144,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo bar", "latency", "10ms", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1155,7 +1154,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar", "foobar", "foo bar", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1166,7 +1165,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foo_extracted", "foo bar", "foobar", "10ms", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1178,7 +1177,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foo_bar", "10ms", "test_dash", "foo", ), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1186,7 +1185,7 @@ func TestLogfmtParser_parse(t *testing.T) { nil, labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar"), - nil, + labels.EmptyLabels(), NoParserHints(), }, { @@ -1268,10 +1267,9 @@ func TestLogfmtParser_parse(t *testing.T) { _, _ = p.Process(0, tt.line, b) want := tt.want - if tt.wantStrict != nil { + if !tt.wantStrict.IsEmpty() { want = tt.wantStrict } - sort.Sort(want) require.Equal(t, want, b.LabelsResult().Labels()) }) } @@ -1338,7 +1336,6 @@ func TestLogfmtParser_keepEmpty(t *testing.T) { _, _ = p.Process(0, tt.line, b) want := tt.want - sort.Sort(want) require.Equal(t, want, b.LabelsResult().Labels()) }) } diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index a40dacfbda..1b078a902a 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -29,7 +29,7 @@ func TestNoopPipeline(t *testing.T) { require.Equal(t, true, matches) structuredMetadata := labels.FromStrings("y", "1", "z", "2") - expectedLabelsResults := append(lbs, structuredMetadata...) + expectedLabelsResults := appendLabels(lbs, structuredMetadata) l, lbr, matches = pipeline.ForStream(lbs).Process(0, []byte(""), structuredMetadata) require.Equal(t, []byte(""), l) require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, structuredMetadata, labels.EmptyLabels()), lbr) @@ -91,8 +91,8 @@ func TestPipeline(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories]labels.Labels{ - ParsedLabel: labels.FromStrings("baz", "blip"), + p.baseBuilder.add = [numValidCategories][]labels.Label{ + ParsedLabel: {{"baz", "blip"}}, } p.Reset() @@ -106,7 +106,7 @@ func TestPipeline(t *testing.T) { func TestPipelineWithStructuredMetadata(t *testing.T) { lbs := labels.FromStrings("foo", "bar") structuredMetadata := labels.FromStrings("user", "bob") - expectedLabelsResults := append(lbs, structuredMetadata...) + expectedLabelsResults := appendLabels(lbs, structuredMetadata) p := NewPipeline([]Stage{ NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")), NewStringLabelFilter(labels.MustNewMatcher(labels.MatchEqual, "user", "bob")), @@ -130,7 +130,7 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { // test duplicated structured metadata with stream labels expectedNonIndexedLabels := labels.FromStrings("user", "bob", "foo_extracted", "baz") expectedLabelsResults = labels.FromStrings("foo", "bar", "foo_extracted", "baz") - expectedLabelsResults = append(expectedLabelsResults, structuredMetadata...) + expectedLabelsResults = appendLabels(expectedLabelsResults, structuredMetadata) l, lbr, matches = p.ForStream(lbs).Process(0, []byte("line"), labels.NewBuilder(structuredMetadata).Set("foo", "baz").Labels()) require.Equal(t, []byte("lbs bar bob"), l) require.Equal(t, NewLabelsResult(expectedLabelsResults.String(), expectedLabelsResults.Hash(), lbs, expectedNonIndexedLabels, labels.EmptyLabels()), lbr) @@ -167,8 +167,8 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { // Reset caches p.baseBuilder.del = []string{"foo", "bar"} - p.baseBuilder.add = [numValidCategories]labels.Labels{ - ParsedLabel: labels.FromStrings("baz", "blip"), + p.baseBuilder.add = [numValidCategories][]labels.Label{ + ParsedLabel: {{"baz", "blip"}}, } p.Reset() @@ -179,6 +179,16 @@ func TestPipelineWithStructuredMetadata(t *testing.T) { } } +func appendLabels(base labels.Labels, l labels.Labels) labels.Labels { + b := labels.NewBuilder(base) + + l.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + + return b.Labels() +} + func TestFilteringPipeline(t *testing.T) { tt := []struct { name string @@ -188,22 +198,22 @@ func TestFilteringPipeline(t *testing.T) { structuredMetadata labels.Labels ok bool }{ - {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), nil, true}, - {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), nil, true}, - {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), nil, true}, - {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), nil, true}, - {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), nil, true}, - {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), nil, false}, + {"it is before the timerange", 1, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it is after the timerange", 6, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it doesn't match the filter", 3, "all good", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), true}, + {"it doesn't match all the selectors", 3, "line", labels.FromStrings("foo", "bar"), labels.EmptyLabels(), true}, + {"it doesn't match any selectors", 3, "line", labels.FromStrings("beep", "boop"), labels.EmptyLabels(), true}, + {"it matches all selectors", 3, "line", labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), false}, {"it doesn't match all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "alice"), true}, {"it matches all structured metadata", 3, "line", labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), false}, - {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), nil, false}, + {"it tries all the filters", 5, "line", labels.FromStrings("baz", "foo"), labels.EmptyLabels(), false}, } for _, test := range tt { downstream := newStubPipeline() p := NewFilteringPipeline([]PipelineFilter{ - newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), nil, "e"), - newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), nil, "e"), + newPipelineFilter(2, 4, labels.FromStrings("foo", "bar", "bar", "baz"), labels.EmptyLabels(), "e"), + newPipelineFilter(3, 5, labels.FromStrings("baz", "foo"), labels.EmptyLabels(), "e"), newPipelineFilter(3, 5, labels.FromStrings("foo", "baz"), labels.FromStrings("user", "bob"), "e"), }, downstream) @@ -380,8 +390,8 @@ func TestDropLabelsPipeline(t *testing.T) { for i, line := range tt.lines { _, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels()) require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) - require.Nil(t, finalLbs.Stream()) - require.Nil(t, finalLbs.StructuredMetadata()) + require.Equal(t, labels.EmptyLabels(), finalLbs.Stream()) + require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata()) require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) require.Equal(t, tt.wantLabels[i].Hash(), finalLbs.Hash()) } @@ -502,13 +512,9 @@ func TestKeepLabelsPipeline(t *testing.T) { finalLine, finalLbs, _ := sp.Process(0, line, labels.EmptyLabels()) require.Equal(t, tt.wantLine[i], finalLine) require.Equal(t, tt.wantLabels[i], finalLbs.Labels()) - require.Nil(t, finalLbs.Stream()) - require.Nil(t, finalLbs.StructuredMetadata()) - if len(tt.wantLabels[i]) > 0 { - require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) - } else { - require.Nil(t, finalLbs.Parsed()) - } + require.Equal(t, labels.EmptyLabels(), finalLbs.Stream()) + require.Equal(t, labels.EmptyLabels(), finalLbs.StructuredMetadata()) + require.Equal(t, tt.wantLabels[i], finalLbs.Parsed()) require.Equal(t, tt.wantLabels[i].Hash(), finalLbs.Hash()) require.Equal(t, tt.wantLabels[i].String(), finalLbs.String()) } @@ -615,7 +621,7 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("line extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - samples, ok := ex.Process(0, line) + samples, ok := ex.Process(0, line, labels.EmptyLabels()) if ok && len(samples) > 0 { resSample = samples[0].Value resLbs = samples[0].Labels @@ -628,7 +634,7 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("line extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - samples, ok := ex.ProcessString(0, lineString) + samples, ok := ex.ProcessString(0, lineString, labels.EmptyLabels()) if ok && len(samples) > 0 { resSample = samples[0].Value resLbs = samples[0].Labels @@ -646,7 +652,7 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("label extractor bytes", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - samples, ok := ex.Process(0, line) + samples, ok := ex.Process(0, line, labels.EmptyLabels()) if ok && len(samples) > 0 { resSample = samples[0].Value resLbs = samples[0].Labels @@ -659,7 +665,7 @@ func Benchmark_Pipeline(b *testing.B) { b.Run("label extractor string", func(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - samples, ok := ex.ProcessString(0, lineString) + samples, ok := ex.ProcessString(0, lineString, labels.EmptyLabels()) if ok && len(samples) > 0 { resSample = samples[0].Value resLbs = samples[0].Labels diff --git a/pkg/logql/syntax/extractor_test.go b/pkg/logql/syntax/extractor_test.go index 0e74f4edd8..8d514a0e45 100644 --- a/pkg/logql/syntax/extractor_test.go +++ b/pkg/logql/syntax/extractor_test.go @@ -157,7 +157,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) { streamExtractor := extractors[0].ForStream(lbls) require.NotNil(t, streamExtractor, "stream extractor should not be nil") - samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine)) + samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels()) require.True(t, ok) seen := make(map[string]float64, len(samples)) @@ -176,7 +176,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) { streamExtractor = extractors[0].ForStream(lbls) require.NotNil(t, streamExtractor, "multi-variant stream extractor should not be nil") - mvSamples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine)) + mvSamples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels()) require.True(t, ok) // remove variant label @@ -412,7 +412,7 @@ func Test_MultiVariantExpr_Extractors(t *testing.T) { streamExtractor := extractors[0].ForStream(lbls) require.NotNil(t, streamExtractor, "stream extractor should not be nil") - samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine)) + samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels()) require.True(t, ok) expectedSamples := make(map[string]float64, len(tc.expected)) diff --git a/pkg/logql/syntax/parser.go b/pkg/logql/syntax/parser.go index 2a257c3662..28d199cec7 100644 --- a/pkg/logql/syntax/parser.go +++ b/pkg/logql/syntax/parser.go @@ -3,7 +3,6 @@ package syntax import ( "errors" "fmt" - "sort" "strings" "sync" @@ -272,11 +271,8 @@ func ParseLogSelector(input string, validate bool) (LogSelectorExpr, error) { func ParseLabels(lbs string) (labels.Labels, error) { ls, err := promql_parser.ParseMetric(lbs) if err != nil { - return nil, err + return labels.EmptyLabels(), err } - // Sort labels to ensure functionally equivalent - // inputs map to the same output - sort.Sort(ls) // Use the label builder to trim empty label values. // Empty label values are equivalent to absent labels diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index 44d1b15608..fd8298b214 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3506,7 +3506,7 @@ func Benchmark_MetricPipelineCombined(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - samples, matches = sp.Process(0, in) + samples, matches = sp.Process(0, in, labels.EmptyLabels()) } v = samples[0].Value @@ -3678,6 +3678,22 @@ func TestParseLabels(t *testing.T) { input: `{job="foo"}`, output: labels.FromStrings("job", "foo"), }, + { + desc: "multiple labels, already sorted", + input: `{env="a", job="foo"}`, + output: labels.FromStrings( + "env", "a", + "job", "foo", + ), + }, + { + desc: "multiple labels, not sorted", + input: `{job="foo", env="a"}`, + output: labels.FromStrings( + "env", "a", + "job", "foo", + ), + }, { desc: "strip empty label value", input: `{job="foo", bar=""}`, diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index e1aca81f67..49387692de 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -148,7 +148,7 @@ func processSeries(in []logproto.Stream, ex []log.SampleExtractor) ([]logproto.S exs := extractor.ForStream(mustParseLabels(stream.Labels)) for _, e := range stream.Entries { - if samples, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok { + if samples, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line), labels.EmptyLabels()); ok { for _, sample := range samples { lbs := sample.Labels f := sample.Value diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 1bf9d05b2c..2c2b7abae0 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -52,8 +52,10 @@ func TestLazyChunkIterator(t *testing.T) { Hash: fooLabelsWithName.Hash(), Entries: []logproto.Entry{ { - Timestamp: from, - Line: "1", + Timestamp: from, + Line: "1", + Parsed: logproto.EmptyLabelAdapters(), + StructuredMetadata: logproto.EmptyLabelAdapters(), }, }, }), @@ -63,8 +65,10 @@ func TestLazyChunkIterator(t *testing.T) { Hash: fooLabels.Hash(), Entries: []logproto.Entry{ { - Timestamp: from, - Line: "1", + Timestamp: from, + Line: "1", + Parsed: logproto.EmptyLabelAdapters(), + StructuredMetadata: logproto.EmptyLabelAdapters(), }, }, }, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 91df335e2b..f9172589f2 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1133,14 +1133,14 @@ func (p *mockStreamExtractor) BaseLabels() lokilog.LabelsResult { return p.wrappedSP.BaseLabels() } -func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs ...labels.Label) ([]lokilog.ExtractedSample, bool) { +func (p *mockStreamExtractor) Process(ts int64, line []byte, lbs labels.Labels) ([]lokilog.ExtractedSample, bool) { p.called++ - return p.wrappedSP.Process(ts, line, lbs...) + return p.wrappedSP.Process(ts, line, lbs) } -func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs ...labels.Label) ([]lokilog.ExtractedSample, bool) { +func (p *mockStreamExtractor) ProcessString(ts int64, line string, lbs labels.Labels) ([]lokilog.ExtractedSample, bool) { p.called++ - return p.wrappedSP.ProcessString(ts, line, lbs...) + return p.wrappedSP.ProcessString(ts, line, lbs) } func Test_store_GetSeries(t *testing.T) { @@ -2041,6 +2041,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { entry := logproto.Entry{ Timestamp: ts, Line: fmt.Sprintf("ts=%d level=info", ts.Unix()), + Parsed: logproto.EmptyLabelAdapters(), } if withStructuredMetadata { @@ -2054,6 +2055,8 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { Value: "1", }, } + } else { + entry.StructuredMetadata = logproto.EmptyLabelAdapters() } dup, err := chunkEnc.Append(&entry) require.False(t, dup) @@ -2086,6 +2089,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { expectedEntry := logproto.Entry{ Timestamp: ts.Truncate(0), Line: fmt.Sprintf("ts=%d level=info", ts.Unix()), + Parsed: logproto.EmptyLabelAdapters(), } if withStructuredMetadata { @@ -2099,6 +2103,8 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { Value: "1", }, } + } else { + expectedEntry.StructuredMetadata = logproto.EmptyLabelAdapters() } require.Equal(t, expectedEntry, it.At()) }