Labels computation LogQLv2 (#2875)

* Adding a benchmark to compare before and after.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improves labels management in logql v2.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Cache grouped result when no changes have occurred.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Removes unused methods.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Adds docs and tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Uncomment tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
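
The shape of the change, before diving into the diff: logql.Pipeline no longer receives the stream labels on every Process call. A pipeline is bound to a stream once via ForStream, and the per-line call returns a cached LabelsResult. A minimal sketch of the two shapes (PipelineV1 is a stand-in name for the old interface; the new names match pkg/logql/log below):

package sketch

import "github.com/prometheus/prometheus/pkg/labels"

// Old shape: stream labels were passed on every call, so label work was
// redone for every single line.
type PipelineV1 interface {
	Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)
}

// New shape (see pkg/logql/log/pipeline.go below): bind once per stream,
// then per-line calls return a cached LabelsResult.
type Pipeline interface {
	ForStream(lbs labels.Labels) StreamPipeline
}

type StreamPipeline interface {
	Process(line []byte) ([]byte, LabelsResult, bool)
}

// LabelsResult carries a labels set together with its precomputed string
// and hash forms (see pkg/logql/log/labels.go below).
type LabelsResult interface {
	String() string
	Labels() labels.Labels
	Hash() uint64
}
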
31 changed files (lines changed per file):

pkg/chunkenc/dumb_chunk.go               |   8
pkg/chunkenc/interface.go                |  12
pkg/chunkenc/memchunk.go                 |  56
pkg/chunkenc/memchunk_test.go            | 148
pkg/ingester/chunk_test.go               |   4
pkg/ingester/flush_test.go               |   2
pkg/ingester/instance.go                 |   4
pkg/ingester/stream.go                   |  10
pkg/ingester/stream_test.go              |   4
pkg/ingester/tailer.go                   |   8
pkg/ingester/transfer_test.go            |   2
pkg/logentry/stages/match.go             |   5
pkg/logql/ast.go                         |  10
pkg/logql/ast_test.go                    |  12
pkg/logql/functions.go                   |   4
pkg/logql/log/fmt_test.go                |  16
pkg/logql/log/label_filter_test.go       |   8
pkg/logql/log/labels.go                  | 251
pkg/logql/log/labels_test.go             | 109
pkg/logql/log/metrics_extraction.go      | 179
pkg/logql/log/metrics_extraction_test.go |  35
pkg/logql/log/parser.go                  |  29
pkg/logql/log/parser_test.go             |  12
pkg/logql/log/pipeline.go                |  89
pkg/logql/log/pipeline_test.go           |   5
pkg/logql/parser_test.go                 |   6
pkg/logql/test_utils.go                  |   6
pkg/storage/batch.go                     |  50
pkg/storage/batch_test.go                |  49
pkg/storage/lazy_chunk.go                |  15
pkg/storage/lazy_chunk_test.go           |   7

@@ -5,11 +5,9 @@ import (
"sort"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)
const (
@@ -72,7 +70,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone }
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
@@ -97,7 +95,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi
}, nil
}
func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ labels.Labels, _ logql.SampleExtractor) iter.SampleIterator {
func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator {
return nil
}

@@ -7,11 +7,9 @@ import (
"strings"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)
// Errors returned by the chunk interface.
@@ -100,8 +98,8 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
Size() int
@@ -126,7 +124,7 @@ type Block interface {
// Entries is the amount of entries in the block.
Entries() int
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
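
A hypothetical caller-side sketch (not part of this diff) of the new Chunk contract: the StreamPipeline is created once per stream and reused for every chunk of that stream, which is what pkg/ingester/stream.go below does:

package sketch

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
)

// iterateStream binds the pipeline to the stream once, then reuses the
// resulting StreamPipeline for every chunk of that stream.
func iterateStream(ctx context.Context, chunks []chunkenc.Chunk, streamLabels labels.Labels,
	p log.Pipeline, from, through time.Time) ([]iter.EntryIterator, error) {
	sp := p.ForStream(streamLabels) // label hashing and caching happen once, here
	its := make([]iter.EntryIterator, 0, len(chunks))
	for _, c := range chunks {
		it, err := c.Iterator(ctx, from, through, logproto.FORWARD, sp)
		if err != nil {
			return nil, err
		}
		its = append(its, it)
	}
	return its, nil
}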

@@ -16,11 +16,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)
@@ -471,7 +470,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}
// Iterator implements Chunk.
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)
@@ -479,11 +478,11 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, lbs, pipeline))
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}
if !c.head.isEmpty() {
its = append(its, c.head.iterator(ctx, direction, mint, maxt, lbs, pipeline))
its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline))
}
if direction == logproto.FORWARD {
@@ -513,7 +512,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}
// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)
@@ -521,11 +520,11 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, lbs, extractor))
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}
if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, lbs, extractor))
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
}
return iter.NewTimeRangedSampleIterator(
@@ -557,18 +556,18 @@ type encBlock struct {
block
}
func (b encBlock) Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, lbs, pipeline)
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline)
}
func (b encBlock) SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, lbs, extractor)
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor)
}
func (b block) Offset() int {
@@ -585,7 +584,7 @@ func (b block) MaxTime() int64 {
return b.maxt
}
func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
@@ -601,7 +600,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
newLine, parsedLbs, ok := pipeline.Process(line, lbs)
newLine, parsedLbs, ok := pipeline.Process(line)
if !ok {
continue
}
@@ -630,7 +629,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}
func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
@@ -640,7 +639,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
value, parsedLabels, ok := extractor.Process(line, lbs)
value, parsedLabels, ok := extractor.Process(line)
if !ok {
continue
}
@@ -687,11 +686,9 @@ type bufferedIterator struct {
currTs int64
closed bool
baseLbs labels.Labels
}
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator {
chunkStats := stats.GetChunkData(ctx)
chunkStats.CompressedBytes += int64(len(b))
return &bufferedIterator{
@@ -701,7 +698,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs lab
bufReader: nil, // will be initialized later
pool: pool,
decBuf: make([]byte, binary.MaxVarintLen64),
baseLbs: lbs,
}
}
@@ -806,19 +802,19 @@ func (si *bufferedIterator) close() {
si.decBuf = nil
}
func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator {
return &entryBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
pipeline: pipeline,
}
}
type entryBufferedIterator struct {
*bufferedIterator
pipeline logql.Pipeline
pipeline log.StreamPipeline
cur logproto.Entry
currLabels labels.Labels
currLabels log.LabelsResult
}
func (e *entryBufferedIterator) Entry() logproto.Entry {
@@ -829,7 +825,7 @@ func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() }
func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currLine, e.baseLbs)
newLine, lbs, ok := e.pipeline.Process(e.currLine)
if !ok {
continue
}
@@ -841,9 +837,9 @@ func (e *entryBufferedIterator) Next() bool {
return false
}
func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator {
it := &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
extractor: extractor,
}
return it
@@ -852,15 +848,15 @@ func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs label
type sampleBufferedIterator struct {
*bufferedIterator
extractor logql.SampleExtractor
extractor log.StreamSampleExtractor
cur logproto.Sample
currLabels labels.Labels
currLabels log.LabelsResult
}
func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currLine, e.baseLbs)
val, labels, ok := e.extractor.Process(e.currLine)
if !ok {
continue
}
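
The iterator hunks above encode the filtering contract: when StreamPipeline.Process returns ok == false the entry is dropped entirely, and the returned LabelsResult is what the iterator's Labels() reports. A toy pipeline making that concrete (same sketch package as above, with "bytes" imported; containsPipeline is hypothetical):

// containsPipeline keeps only lines containing a substring and always
// returns the stream's precomputed label result.
type containsPipeline struct {
	substr []byte
	res    LabelsResult
}

func (c containsPipeline) Process(line []byte) ([]byte, LabelsResult, bool) {
	if !bytes.Contains(line, c.substr) {
		return nil, nil, false // entryBufferedIterator.Next skips this entry
	}
	return line, c.res, true
}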

@@ -13,6 +13,7 @@ import (
"time"
"github.com/dustin/go-humanize"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -35,8 +36,16 @@ var testEncoding = []Encoding{
}
var (
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
testBlockSize = 256 * 1024
testTargetSize = 1500 * 1024
noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
countExtractor = func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
panic(err)
}
return ex.ForStream(labels.Labels{})
}()
)
func TestBlocksInclusive(t *testing.T) {
@@ -114,7 +123,7 @@ func TestBlock(t *testing.T) {
}
}
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
idx := 0
@@ -129,7 +138,7 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
idx = 0
for sampleIt.Next() {
s := sampleIt.Sample()
@@ -144,7 +153,7 @@ func TestBlock(t *testing.T) {
require.Equal(t, len(cases), idx)
t.Run("bounded-iteration", func(t *testing.T) {
it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
idx := 2
@@ -177,7 +186,7 @@ func TestReadFormatV1(t *testing.T) {
t.Fatal(err)
}
it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
@@ -204,7 +213,7 @@ func TestRoundtripV2(t *testing.T) {
assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
@@ -266,7 +275,7 @@ func TestSerialization(t *testing.T) {
bc, err := NewByteChunk(byt, testBlockSize, testTargetSize)
require.NoError(t, err)
it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
@@ -277,7 +286,7 @@ func TestSerialization(t *testing.T) {
}
require.NoError(t, it.Error())
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false))
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)
@@ -320,7 +329,7 @@ func TestChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
i = 0
for it.Next() {
@@ -463,7 +472,7 @@ func TestChunkStats(t *testing.T) {
expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64)
ctx := stats.NewContext(context.Background())
it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline)
it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
@@ -492,7 +501,7 @@ func TestChunkStats(t *testing.T) {
t.Fatal(err)
}
ctx = stats.NewContext(context.Background())
it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline)
it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
@@ -540,7 +549,7 @@ func TestIteratorClose(t *testing.T) {
} {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil, logql.NoopPipeline)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
@@ -591,7 +600,7 @@ func BenchmarkRead(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline)
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
@@ -616,7 +625,7 @@ func BenchmarkBackwardIterator(b *testing.B) {
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil, logql.NoopPipeline)
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
@@ -637,7 +646,7 @@ func TestGenerateDataSize(t *testing.T) {
bytesRead := uint64(0)
for _, c := range chunks {
// use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline)
iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
@@ -671,7 +680,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, nil, logql.NoopPipeline)
iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline)
for iter.Next() {
_ = iter.Entry()
@@ -730,7 +739,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) {
c := createChunk()
// testing headchunk
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline)
it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
@@ -739,7 +748,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) {
// testing chunk blocks
require.NoError(t, c.cut())
it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline)
it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline)
require.NoError(t, err)
for i := range tt.expect {
require.Equal(t, tt.expect[i], it.Next())
@@ -758,7 +767,7 @@ func TestMemchunkLongLine(t *testing.T) {
for i := 1; i <= 10; i++ {
require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)}))
}
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline)
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
for i := 1; i <= 10; i++ {
require.True(t, it.Next())
@@ -777,3 +786,102 @@ func TestBytesWith(t *testing.T) {
require.Equal(t, exp, out)
}
var streams = []logproto.Stream{}
var series = []logproto.Series{}
func BenchmarkBufferedIteratorLabels(b *testing.B) {
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
_ = fillChunk(c)
labelsSet := []labels.Labels{
{
{Name: "cluster", Value: "us-central1"},
{Name: "stream", Value: "stdout"},
{Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/query-frontend"},
{Name: "container", Value: "query-frontend"},
{Name: "pod", Value: "query-frontend-6894f97b98-89q2n"},
},
{
{Name: "cluster", Value: "us-central2"},
{Name: "stream", Value: "stderr"},
{Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/querier"},
{Name: "container", Value: "querier"},
{Name: "pod", Value: "querier-6894f97b98-89q2n"},
},
}
for _, test := range []string{
`{app="foo"}`,
`{app="foo"} != "foo"`,
`{app="foo"} != "foo" | logfmt `,
`{app="foo"} != "foo" | logfmt | duration > 10ms`,
`{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseLogSelector(test)
if err != nil {
b.Fatal(err)
}
p, err := expr.Pipeline()
if err != nil {
b.Fatal(err)
}
var iters []iter.EntryIterator
for _, lbs := range labelsSet {
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs))
if err != nil {
b.Fatal(err)
}
iters = append(iters, it)
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}})
}
}
}
streams = streams[:0]
})
}
for _, test := range []string{
`rate({app="foo"}[1m])`,
`sum by (cluster) (rate({app="foo"}[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" [10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseSampleExpr(test)
if err != nil {
b.Fatal(err)
}
ex, err := expr.Extractor()
if err != nil {
b.Fatal(err)
}
var iters []iter.SampleIterator
for _, lbs := range labelsSet {
iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs)))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}})
}
}
}
series = series[:0]
})
}
}

@@ -64,7 +64,7 @@ func TestIterator(t *testing.T) {
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline)
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{}))
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
_ = iter.Close()
@@ -73,7 +73,7 @@ func TestIterator(t *testing.T) {
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, labels.Labels{}, logql.NoopPipeline)
iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(labels.Labels{}))
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))
_ = iter.Close()

@@ -320,7 +320,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
}
func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream {
it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline)
it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{}))
require.NoError(t, err)
stream := logproto.Stream{

@@ -211,7 +211,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter
expr.Matchers(),
func(stream *stream) error {
ingStats.TotalChunksMatched += int64(len(stream.chunks))
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline)
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels))
if err != nil {
return err
}
@@ -242,7 +242,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams
expr.Selector().Matchers(),
func(stream *stream) error {
ingStats.TotalChunksMatched += int64(len(stream.chunks))
iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor)
iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor.ForStream(stream.labels))
if err != nil {
return err
}

@@ -18,7 +18,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)
var (
@@ -258,10 +258,10 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t
}
// Returns an iterator.
func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline logql.Pipeline) (iter.EntryIterator, error) {
func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
itr, err := c.chunk.Iterator(ctx, from, through, direction, s.labels, pipeline)
itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline)
if err != nil {
return nil, err
}
@@ -280,10 +280,10 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio
}
// Returns an SampleIterator.
func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor logql.SampleExtractor) (iter.SampleIterator, error) {
func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) {
iterators := make([]iter.SampleIterator, 0, len(s.chunks))
for _, c := range s.chunks {
if itr := c.chunk.SampleIterator(ctx, from, through, s.labels, extractor); itr != nil {
if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil {
iterators = append(iterators, itr)
}
}

@@ -120,7 +120,7 @@ func TestStreamIterator(t *testing.T) {
for i := 0; i < 100; i++ {
from := rand.Intn(chunks*entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline)
iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(s.labels))
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
@@ -130,7 +130,7 @@ func TestStreamIterator(t *testing.T) {
for i := 0; i < 100; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline)
iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(s.labels))
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))

@@ -145,18 +145,18 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
if err != nil {
return nil, err
}
sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := t.pipeline.Process([]byte(e.Line), lbs)
newLine, parsedLbs, ok := sp.Process([]byte(e.Line))
if !ok {
continue
}
var stream *logproto.Stream
lhash := parsedLbs.Hash()
if stream, ok = streams[lhash]; !ok {
if stream, ok = streams[parsedLbs.Hash()]; !ok {
stream = &logproto.Stream{
Labels: parsedLbs.String(),
}
streams[lhash] = stream
streams[parsedLbs.Hash()] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: e.Timestamp,
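
In the tailer hunk above, the old code cached parsedLbs.Hash() in a local lhash; with the new LabelsResult the hash is precomputed, so calling Hash() twice is a plain field read. The regrouping pattern, as a self-contained sketch (regroup is a hypothetical helper):

package sketch

import (
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
)

// regroup buckets a processed entry into an output stream keyed by the
// processed labels' hash; Hash() and String() are both precomputed.
func regroup(streams map[uint64]*logproto.Stream, lbs log.LabelsResult, e logproto.Entry) {
	s, ok := streams[lbs.Hash()]
	if !ok {
		s = &logproto.Stream{Labels: lbs.String()}
		streams[lbs.Hash()] = s
	}
	s.Entries = append(s.Entries, e)
}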

@@ -95,7 +95,7 @@ func TestTransferOut(t *testing.T) {
time.Unix(0, 0),
time.Unix(10, 0),
logproto.FORWARD,
logql.NoopPipeline,
logql.NoopPipeline.ForStream(stream.labels),
)
if !assert.NoError(t, err) {
continue

@@ -132,7 +132,8 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac
}
}
if newLine, newLabels, ok := m.pipeline.Process([]byte(*entry), labels.FromMap(util.ModelLabelSetToMap(lbs))); ok {
sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs)))
if newLine, newLabels, ok := sp.Process([]byte(*entry)); ok {
switch m.action {
case MatchActionDrop:
// Adds the drop label to not be sent by the api.EntryHandler
@@ -142,7 +143,7 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac
for k := range lbs {
delete(lbs, k)
}
for _, l := range newLabels {
for _, l := range newLabels.Labels() {
lbs[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
m.stage.Process(lbs, extracted, t, entry)

@@ -85,7 +85,7 @@ type Pipeline = log.Pipeline
type SampleExtractor = log.SampleExtractor
var (
NoopPipeline = log.NoopPipeline
NoopPipeline = log.NewNoopPipeline()
)
// PipelineExpr is an expression defining a log pipeline.
@@ -109,7 +109,7 @@ func (m MultiStageExpr) Pipeline() (log.Pipeline, error) {
return nil, err
}
if len(stages) == 0 {
return log.NoopPipeline, nil
return NoopPipeline, nil
}
return log.NewPipeline(stages), nil
}
@@ -169,7 +169,7 @@ func (e *matchersExpr) String() string {
}
func (e *matchersExpr) Pipeline() (log.Pipeline, error) {
return log.NoopPipeline, nil
return NoopPipeline, nil
}
func (e *matchersExpr) HasFilter() bool {
@@ -719,7 +719,7 @@ func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) {
// inject in the range vector extractor the outer groups to improve performance.
// This is only possible if the operation is a sum. Anything else needs all labels.
if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum {
return r.extractor(e.grouping, true)
return r.extractor(e.grouping, len(e.grouping.groups) == 0)
}
return e.left.Extractor()
}
@@ -860,7 +860,7 @@ func (e *literalExpr) String() string {
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) HasFilter() bool { return false }
func (e *literalExpr) Operations() []string { return nil }
func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NoopPipeline, nil }
func (e *literalExpr) Pipeline() (log.Pipeline, error) { return NoopPipeline, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }
func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
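
One behavioral detail worth calling out in the vectorAggregationExpr hunk above: the flag passed to r.extractor feeds the extractor's noLabels fast path (see functions.go and metrics_extraction.go below). It used to be hard-coded to true; it is now set only when the outer sum has no grouping, because the new GroupedLabels checks noLabels before applying groups. Illustrated with queries from the benchmark above (the explanation is an inference from this diff):

// sum(rate({app="foo"}[1m]))
//   groups are empty, so noLabels=true: the extractor returns the empty
//   label set for every line (a single output vector, no label work).
// sum by (cluster) (rate({app="foo"}[1m]))
//   groups=["cluster"], noLabels=false: GroupedLabels trims each result
//   down to the `cluster` label.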

@@ -43,7 +43,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
if err != nil {
t.Fatalf("failed to get filter: %s", err)
}
require.Equal(t, tt.expectFilter, p != log.NoopPipeline)
require.Equal(t, tt.expectFilter, p != NoopPipeline)
if expr.String() != tt.selector {
t.Fatalf("error expected: %s got: %s", tt.selector, expr.String())
}
@@ -132,7 +132,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) {
p, err := expr.Pipeline()
require.Nil(t, err)
_, _, ok := p.Process([]byte("bleepbloop"), labelBar)
_, _, ok := p.ForStream(labelBar).Process([]byte("bleepbloop"))
require.True(t, ok)
})
@@ -216,10 +216,11 @@ func Test_FilterMatcher(t *testing.T) {
p, err := expr.Pipeline()
assert.Nil(t, err)
if tt.lines == nil {
assert.Equal(t, p, log.NoopPipeline)
assert.Equal(t, p, NoopPipeline)
} else {
sp := p.ForStream(labelBar)
for _, lc := range tt.lines {
_, _, ok := p.Process([]byte(lc.l), labelBar)
_, _, ok := sp.Process([]byte(lc.l))
assert.Equal(t, lc.e, ok)
}
}
@@ -281,8 +282,9 @@ func BenchmarkContainsFilter(b *testing.B) {
b.ResetTimer()
sp := p.ForStream(labelBar)
for i := 0; i < b.N; i++ {
if _, _, ok := p.Process(line, labelBar); !ok {
if _, _, ok := sp.Process(line); !ok {
b.Fatal("doesn't match")
}
}

@@ -68,9 +68,9 @@ func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtra
// otherwise we extract metrics from the log line.
switch r.operation {
case OpRangeTypeRate, OpRangeTypeCount:
return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all)
return log.NewLineSampleExtractor(log.CountExtractor, stages, groups, without, all)
case OpRangeTypeBytes, OpRangeTypeBytesRate:
return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all)
return log.NewLineSampleExtractor(log.BytesExtractor, stages, groups, without, all)
default:
return nil, fmt.Errorf(unsupportedErr, r.operation)
}

@@ -41,11 +41,11 @@ func Test_lineFormatter_Format(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(tt.lbs)
outLine, _ := tt.fmter.Process(nil, b)
builder := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
builder.Reset()
outLine, _ := tt.fmter.Process(nil, builder)
require.Equal(t, tt.want, outLine)
require.Equal(t, tt.wantLbs, b.Labels())
require.Equal(t, tt.wantLbs, builder.Labels())
})
}
}
@@ -94,11 +94,11 @@ func Test_labelsFormatter_Format(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(tt.in)
_, _ = tt.fmter.Process(nil, b)
builder := NewBaseLabelsBuilder().ForLabels(tt.in, tt.in.Hash())
builder.Reset()
_, _ = tt.fmter.Process(nil, builder)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
require.Equal(t, tt.want, builder.Labels())
})
}
}

@@ -151,8 +151,8 @@ func TestBinary_Filter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.f.String(), func(t *testing.T) {
sort.Sort(tt.lbs)
b := NewLabelsBuilder()
b.Reset(tt.lbs)
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, got := tt.f.Process(nil, b)
require.Equal(t, tt.want, got)
sort.Sort(tt.wantLbs)
@@ -231,8 +231,8 @@ func TestErrorFiltering(t *testing.T) {
for _, tt := range tests {
t.Run(tt.f.String(), func(t *testing.T) {
sort.Sort(tt.lbs)
b := NewLabelsBuilder()
b.Reset(tt.lbs)
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
b.SetErr(tt.err)
_, got := tt.f.Process(nil, b)
require.Equal(t, tt.want, got)

@@ -6,26 +6,124 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
)
var (
emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash())
)
// LabelsResult is a computed labels result that contains the labels set with associated string and hash.
// It is mainly used for caching and returning label computations out of pipelines and stages.
type LabelsResult interface {
String() string
Labels() labels.Labels
Hash() uint64
}
// NewLabelsResult creates a new LabelsResult from a labels set and a hash.
func NewLabelsResult(lbs labels.Labels, hash uint64) LabelsResult {
return &labelsResult{lbs: lbs, s: lbs.String(), h: hash}
}
type labelsResult struct {
lbs labels.Labels
s string
h uint64
}
func (l labelsResult) String() string {
return l.s
}
func (l labelsResult) Labels() labels.Labels {
return l.lbs
}
func (l labelsResult) Hash() uint64 {
return l.h
}
type hasher struct {
buf []byte // buffer for computing hash without bytes slice allocation.
}
// newHasher allows computing hashes for labels while reusing the same buffer.
func newHasher() *hasher {
return &hasher{
buf: make([]byte, 0, 1024),
}
}
// Hash hashes the labels
func (h *hasher) Hash(lbs labels.Labels) uint64 {
var hash uint64
hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...)
return hash
}
// BaseLabelsBuilder is a label builder used by pipelines and stages.
// Only one base builder is used per pipeline or extractor; it holds the cache shared by all LabelsBuilders created from it.
type BaseLabelsBuilder struct {
del []string
add []labels.Label
// nolint(structcheck) https://github.com/golangci/golangci-lint/issues/826
err string
groups []string
without, noLabels bool
resultCache map[uint64]LabelsResult
*hasher
}
// LabelsBuilder is the same as labels.Builder but tailored for this package.
type LabelsBuilder struct {
base labels.Labels
del []string
add []labels.Label
base labels.Labels
currentResult LabelsResult
groupedResult LabelsResult
err string
*BaseLabelsBuilder
}
// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder {
return &BaseLabelsBuilder{
del: make([]string, 0, 5),
add: make([]labels.Label, 0, 16),
resultCache: make(map[uint64]LabelsResult),
hasher: newHasher(),
groups: groups,
noLabels: noLabels,
without: without,
}
}
// NewLabelsBuilder creates a new labels builder.
func NewLabelsBuilder() *LabelsBuilder {
return &LabelsBuilder{
del: make([]string, 0, 5),
add: make([]labels.Label, 0, 5),
// NewBaseLabelsBuilder creates a new base labels builder.
func NewBaseLabelsBuilder() *BaseLabelsBuilder {
return NewBaseLabelsBuilderWithGrouping(nil, false, false)
}
// ForLabels creates a labels builder for a given labels set as base.
// The labels cache is shared across all created LabelsBuilders.
func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBuilder {
if labelResult, ok := b.resultCache[hash]; ok {
res := &LabelsBuilder{
base: lbs,
currentResult: labelResult,
BaseLabelsBuilder: b,
}
return res
}
labelResult := NewLabelsResult(lbs, hash)
b.resultCache[hash] = labelResult
res := &LabelsBuilder{
base: lbs,
currentResult: labelResult,
BaseLabelsBuilder: b,
}
return res
}
// Reset clears all current state for the builder.
func (b *LabelsBuilder) Reset(base labels.Labels) {
b.base = base
func (b *LabelsBuilder) Reset() {
b.del = b.del[:0]
b.add = b.add[:0]
b.err = ""
@@ -47,11 +145,12 @@ func (b *LabelsBuilder) HasErr() bool {
return b.err != ""
}
// Base returns the base labels unmodified
func (b *LabelsBuilder) Base() labels.Labels {
return b.base
// BaseHas returns whether the base labels contain the given key.
func (b *LabelsBuilder) BaseHas(key string) bool {
return b.base.Has(key)
}
// Get returns the value of a labels key if it exists.
func (b *LabelsBuilder) Get(key string) (string, bool) {
for _, a := range b.add {
if a.Name == key {
@@ -136,12 +235,124 @@ Outer:
return res
}
func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithoutLabels(names...)
// LabelsResult returns the LabelsResult from the builder.
// No grouping is applied and the cache is used when possible.
func (b *LabelsBuilder) LabelsResult() LabelsResult {
// unchanged path.
if len(b.del) == 0 && len(b.add) == 0 && b.err == "" {
return b.currentResult
}
return b.toResult(b.Labels())
}
func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult {
hash := b.hasher.Hash(lbs)
if cached, ok := b.resultCache[hash]; ok {
return cached
}
res := NewLabelsResult(lbs, hash)
b.resultCache[hash] = res
return res
}
func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels {
// naive implementation for now.
return b.Labels().WithLabels(names...)
// GroupedLabels returns the LabelsResult from the builder.
// Groups are applied and the cache is used when possible.
func (b *LabelsBuilder) GroupedLabels() LabelsResult {
if b.err != "" {
// We need to return now before applying grouping otherwise the error might get lost.
return b.LabelsResult()
}
if b.noLabels {
return emptyLabelsResult
}
// unchanged path.
if len(b.del) == 0 && len(b.add) == 0 {
if len(b.groups) == 0 {
return b.currentResult
}
return b.toBaseGroup()
}
if b.without {
return b.withoutResult()
}
return b.withResult()
}
func (b *LabelsBuilder) withResult() LabelsResult {
res := make(labels.Labels, 0, len(b.groups))
Outer:
for _, g := range b.groups {
for _, n := range b.del {
if g == n {
continue Outer
}
}
for _, la := range b.add {
if g == la.Name {
res = append(res, la)
continue Outer
}
}
for _, l := range b.base {
if g == l.Name {
res = append(res, l)
continue Outer
}
}
}
return b.toResult(res)
}
func (b *LabelsBuilder) withoutResult() LabelsResult {
size := len(b.base) + len(b.add) - len(b.del) - len(b.groups)
if size < 0 {
size = 0
}
res := make(labels.Labels, 0, size)
Outer:
for _, l := range b.base {
for _, n := range b.del {
if l.Name == n {
continue Outer
}
}
for _, la := range b.add {
if l.Name == la.Name {
continue Outer
}
}
for _, lg := range b.groups {
if l.Name == lg {
continue Outer
}
}
res = append(res, l)
}
OuterAdd:
for _, la := range b.add {
for _, lg := range b.groups {
if la.Name == lg {
continue OuterAdd
}
}
res = append(res, la)
}
sort.Sort(res)
return b.toResult(res)
}
func (b *LabelsBuilder) toBaseGroup() LabelsResult {
if b.groupedResult != nil {
return b.groupedResult
}
var lbs labels.Labels
if b.without {
lbs = b.base.WithoutLabels(b.groups...)
} else {
lbs = b.base.WithLabels(b.groups...)
}
res := NewLabelsResult(lbs, lbs.Hash())
b.groupedResult = res
return res
}
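
The heart of the change is in the labels.go hunk above: each distinct label set is materialized into an immutable LabelsResult exactly once, keyed by a hash computed into a reusable buffer and cached on the shared BaseLabelsBuilder. A condensed, self-contained sketch of that scheme (reusing the LabelsResult interface from the first sketch; the real builder also tracks add/del/err state and grouping):

package sketch

import "github.com/prometheus/prometheus/pkg/labels"

// labelsResult mirrors the concrete type in labels.go: labels plus their
// precomputed string and hash forms, computed exactly once.
type labelsResult struct {
	lbs labels.Labels
	s   string
	h   uint64
}

func (l labelsResult) String() string        { return l.s }
func (l labelsResult) Labels() labels.Labels { return l.lbs }
func (l labelsResult) Hash() uint64          { return l.h }

// cache mirrors BaseLabelsBuilder's resultCache plus the buffer-reusing
// hasher: hashing allocates nothing once the buffer has grown.
type cache struct {
	buf     []byte
	results map[uint64]LabelsResult
}

func (c *cache) result(lbs labels.Labels) LabelsResult {
	var h uint64
	h, c.buf = lbs.HashWithoutLabels(c.buf) // same call the diff's hasher makes
	if r, ok := c.results[h]; ok {
		return r // String() and Hash() were computed on first sight only
	}
	r := labelsResult{lbs: lbs, s: lbs.String(), h: h}
	c.results[h] = r
	return r
}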

@@ -1,6 +1,7 @@
package log
import (
"sort"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
@@ -8,8 +9,9 @@ import (
)
func TestLabelsBuilder_Get(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(labels.Labels{labels.Label{Name: "already", Value: "in"}})
lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}}
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.Reset()
b.Set("foo", "bar")
b.Set("bar", "buzz")
b.Del("foo")
@@ -31,8 +33,8 @@ func TestLabelsBuilder_Get(t *testing.T) {
func TestLabelsBuilder_LabelsError(t *testing.T) {
lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}}
b := NewLabelsBuilder()
b.Reset(lbs)
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.Reset()
b.SetErr("err")
lbsWithErr := b.Labels()
require.Equal(
@@ -46,3 +48,102 @@ func TestLabelsBuilder_LabelsError(t *testing.T) {
// make sure the original labels is unchanged.
require.Equal(t, labels.Labels{labels.Label{Name: "already", Value: "in"}}, lbs)
}
func TestLabelsBuilder_LabelsResult(t *testing.T) {
lbs := labels.Labels{
labels.Label{Name: "namespace", Value: "loki"},
labels.Label{Name: "job", Value: "us-central1/loki"},
labels.Label{Name: "cluster", Value: "us-central1"},
}
sort.Sort(lbs)
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.Reset()
assertLabelResult(t, lbs, b.LabelsResult())
b.SetErr("err")
withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"})
sort.Sort(withErr)
assertLabelResult(t, withErr, b.LabelsResult())
b.Set("foo", "bar")
b.Set("namespace", "tempo")
b.Set("buzz", "fuzz")
b.Del("job")
expected := labels.Labels{
labels.Label{Name: ErrorLabel, Value: "err"},
labels.Label{Name: "namespace", Value: "tempo"},
labels.Label{Name: "cluster", Value: "us-central1"},
labels.Label{Name: "foo", Value: "bar"},
labels.Label{Name: "buzz", Value: "fuzz"},
}
sort.Sort(expected)
assertLabelResult(t, expected, b.LabelsResult())
// cached.
assertLabelResult(t, expected, b.LabelsResult())
}
func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) {
lbs := labels.Labels{
labels.Label{Name: "namespace", Value: "loki"},
labels.Label{Name: "job", Value: "us-central1/loki"},
labels.Label{Name: "cluster", Value: "us-central1"},
}
sort.Sort(lbs)
b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash())
b.Reset()
assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels())
b.SetErr("err")
withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"})
sort.Sort(withErr)
assertLabelResult(t, withErr, b.GroupedLabels())
b.Reset()
b.Set("foo", "bar")
b.Set("namespace", "tempo")
b.Set("buzz", "fuzz")
b.Del("job")
expected := labels.Labels{
labels.Label{Name: "namespace", Value: "tempo"},
}
sort.Sort(expected)
assertLabelResult(t, expected, b.GroupedLabels())
// cached.
assertLabelResult(t, expected, b.GroupedLabels())
b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash())
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
b.Del("job")
assertLabelResult(t, labels.Labels{}, b.GroupedLabels())
b.Reset()
b.Set("namespace", "tempo")
assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash())
b.Del("job")
b.Set("foo", "bar")
b.Set("job", "something")
expected = labels.Labels{
labels.Label{Name: "namespace", Value: "loki"},
labels.Label{Name: "cluster", Value: "us-central1"},
labels.Label{Name: "foo", Value: "bar"},
}
sort.Sort(expected)
assertLabelResult(t, expected, b.GroupedLabels())
}
func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) {
t.Helper()
require.Equal(t,
lbs,
res.Labels(),
)
require.Equal(t,
lbs.Hash(),
res.Hash(),
)
require.Equal(t,
lbs.String(),
res.String(),
)
}

@@ -17,96 +17,87 @@ const (
ConvertFloat = "float"
)
// SampleExtractor extracts sample for a log line.
type SampleExtractor interface {
Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool)
}
type SampleExtractorFunc func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool)
func (fn SampleExtractorFunc) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
return fn(line, lbs)
}
// LineExtractor extracts a float64 from a log line.
type LineExtractor func([]byte) float64
// ToSampleExtractor transform a LineExtractor into a SampleExtractor.
// Useful for metric conversion without log Pipeline.
func (l LineExtractor) ToSampleExtractor(groups []string, without bool, noLabels bool) SampleExtractor {
return SampleExtractorFunc(func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
// todo(cyriltovena) grouping should be done once per stream/chunk not for everyline.
// so for now we'll cover just vector without grouping. This requires changes to SampleExtractor interface.
// For another day !
if len(groups) == 0 && noLabels {
return l(line), labels.Labels{}, true
}
return l(line), lbs, true
})
}
var (
CountExtractor LineExtractor = func(line []byte) float64 { return 1. }
BytesExtractor LineExtractor = func(line []byte) float64 { return float64(len(line)) }
)
// SampleExtractor creates a StreamSampleExtractor that can extract samples for a given log stream.
type SampleExtractor interface {
ForStream(labels labels.Labels) StreamSampleExtractor
}
// StreamSampleExtractor extracts a sample from a log line.
type StreamSampleExtractor interface {
Process(line []byte) (float64, LabelsResult, bool)
}
type lineSampleExtractor struct {
Stage
LineExtractor
groups []string
without bool
noLabels bool
builder *LabelsBuilder
baseBuilder *BaseLabelsBuilder
streamExtractors map[uint64]StreamSampleExtractor
}
func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
l.builder.Reset(lbs)
line, ok := l.Stage.Process(line, l.builder)
if !ok {
return 0, nil, false
}
if len(l.groups) != 0 {
if l.without {
return l.LineExtractor(line), l.builder.WithoutLabels(l.groups...), true
}
return l.LineExtractor(line), l.builder.WithLabels(l.groups...), true
// NewLineSampleExtractor creates a SampleExtractor from a LineExtractor.
// Multiple log stages are run before converting the log line.
func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) {
return &lineSampleExtractor{
Stage: ReduceStages(stages),
LineExtractor: ex,
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
streamExtractors: make(map[uint64]StreamSampleExtractor),
}, nil
}
func (l *lineSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor {
hash := l.baseBuilder.Hash(labels)
if res, ok := l.streamExtractors[hash]; ok {
return res
}
if l.noLabels {
// no grouping but it was a vector operation so we return a single vector
return l.LineExtractor(line), labels.Labels{}, true
res := &streamLineSampleExtractor{
Stage: l.Stage,
LineExtractor: l.LineExtractor,
builder: l.baseBuilder.ForLabels(labels, hash),
}
return l.LineExtractor(line), l.builder.Labels(), true
l.streamExtractors[hash] = res
return res
}
// LineExtractorWithStages creates a SampleExtractor from a LineExtractor.
// Multiple log stages are run before converting the log line.
func LineExtractorWithStages(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) {
if len(stages) == 0 {
return ex.ToSampleExtractor(groups, without, noLabels), nil
type streamLineSampleExtractor struct {
Stage
LineExtractor
builder *LabelsBuilder
}
func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) {
// short circuit.
if l.Stage == NoopStage {
return l.LineExtractor(line), l.builder.GroupedLabels(), true
}
return lineSampleExtractor{
Stage: ReduceStages(stages),
LineExtractor: ex,
builder: NewLabelsBuilder(),
groups: groups,
without: without,
noLabels: noLabels,
}, nil
l.builder.Reset()
line, ok := l.Stage.Process(line, l.builder)
if !ok {
return 0, nil, false
}
return l.LineExtractor(line), l.builder.GroupedLabels(), true
}
type convertionFn func(value string) (float64, error)
type labelSampleExtractor struct {
preStage Stage
postFilter Stage
builder *LabelsBuilder
preStage Stage
postFilter Stage
labelName string
conversionFn convertionFn
groups []string
without bool
noLabels bool
baseBuilder *BaseLabelsBuilder
streamExtractors map[uint64]StreamSampleExtractor
}
// LabelExtractorWithStages creates a SampleExtractor that will extract metrics from a labels.
@@ -129,25 +120,43 @@ func LabelExtractorWithStages(
default:
return nil, errors.Errorf("unsupported conversion operation %s", conversion)
}
if len(groups) != 0 && without {
if len(groups) == 0 || without {
without = true
groups = append(groups, labelName)
sort.Strings(groups)
}
return &labelSampleExtractor{
preStage: ReduceStages(preStages),
conversionFn: convFn,
groups: groups,
labelName: labelName,
postFilter: postFilter,
without: without,
builder: NewLabelsBuilder(),
noLabels: noLabels,
preStage: ReduceStages(preStages),
conversionFn: convFn,
labelName: labelName,
postFilter: postFilter,
baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels),
streamExtractors: make(map[uint64]StreamSampleExtractor),
}, nil
}
func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) {
type streamLabelSampleExtractor struct {
*labelSampleExtractor
builder *LabelsBuilder
}
func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor {
hash := l.baseBuilder.Hash(labels)
if res, ok := l.streamExtractors[hash]; ok {
return res
}
res := &streamLabelSampleExtractor{
labelSampleExtractor: l,
builder: l.baseBuilder.ForLabels(labels, hash),
}
l.streamExtractors[hash] = res
return res
}
func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) {
// Apply the pipeline first.
l.builder.Reset(lbs)
l.builder.Reset()
line, ok := l.preStage.Process(line, l.builder)
if !ok {
return 0, nil, false
@@ -168,25 +177,7 @@ func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64,
if _, ok = l.postFilter.Process(line, l.builder); !ok {
return 0, nil, false
}
if l.builder.HasErr() {
// we still have an error after post filtering.
// We need to return now before applying grouping otherwise the error might get lost.
return v, l.builder.Labels(), true
}
return v, l.groupLabels(l.builder), true
}
func (l *labelSampleExtractor) groupLabels(lbs *LabelsBuilder) labels.Labels {
if len(l.groups) != 0 {
if l.without {
return lbs.WithoutLabels(l.groups...)
}
return lbs.WithLabels(l.groups...)
}
if l.noLabels {
return labels.Labels{}
}
return lbs.WithoutLabels(l.labelName)
return v, l.builder.GroupedLabels(), true
}
func convertFloat(v string) (float64, error) {
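
End-to-end, the new extractor API is driven as below, mirroring TestNewLineSampleExtractor in the test hunk that follows (countLines is a hypothetical helper):

package sketch

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/grafana/loki/pkg/logql/log"
)

func countLines(streamLabels labels.Labels, lines [][]byte) error {
	ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
	if err != nil {
		return err
	}
	sse := ex.ForStream(streamLabels) // memoized per label-set hash
	for _, line := range lines {
		if v, res, ok := sse.Process(line); ok {
			fmt.Println(v, res.String()) // v is 1. per line for CountExtractor
		}
	}
	return nil
}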

@@ -112,10 +112,11 @@ func Test_labelSampleExtractor_Extract(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sort.Sort(tt.in)
outval, outlbs, ok := tt.ex.Process([]byte(""), tt.in)
outval, outlbs, ok := tt.ex.ForStream(tt.in).Process([]byte(""))
require.Equal(t, tt.wantOk, ok)
require.Equal(t, tt.want, outval)
require.Equal(t, tt.wantLbs, outlbs)
require.Equal(t, tt.wantLbs, outlbs.Labels())
})
}
}
@@ -126,3 +127,33 @@ func mustSampleExtractor(ex SampleExtractor, err error) SampleExtractor {
}
return ex
}
func TestNewLineSampleExtractor(t *testing.T) {
se, err := NewLineSampleExtractor(CountExtractor, nil, nil, false, false)
require.NoError(t, err)
lbs := labels.Labels{
{Name: "namespace", Value: "dev"},
{Name: "cluster", Value: "us-central1"},
}
sort.Sort(lbs)
sse := se.ForStream(lbs)
f, l, ok := sse.Process([]byte(`foo`))
require.True(t, ok)
require.Equal(t, 1., f)
assertLabelResult(t, lbs, l)
filter, err := NewFilter("foo", labels.MatchEqual)
require.NoError(t, err)
se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false)
require.NoError(t, err)
sse = se.ForStream(lbs)
f, l, ok = sse.Process([]byte(`foo`))
require.True(t, ok)
require.Equal(t, 3., f)
assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l)
sse = se.ForStream(lbs)
_, _, ok = sse.Process([]byte(`nope`))
require.False(t, ok)
}

@@ -26,14 +26,12 @@ var (
errMissingCapture = errors.New("at least one named capture must be supplied")
)
func addLabel(lbs *LabelsBuilder) func(key, value string) {
return func(key, value string) {
key = sanitizeKey(key)
if lbs.Base().Has(key) {
key = fmt.Sprintf("%s%s", key, duplicateSuffix)
}
lbs.Set(key, value)
func addLabel(lbs *LabelsBuilder, key, value string) {
key = sanitizeKey(key)
if lbs.BaseHas(key) {
key = fmt.Sprintf("%s%s", key, duplicateSuffix)
}
lbs.Set(key, value)
}
func sanitizeKey(key string) string {
@@ -66,20 +64,20 @@ func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
lbs.SetErr(errJSON)
return line, true
}
parseMap("", data, addLabel(lbs))
parseMap("", data, lbs)
return line, true
}
func parseMap(prefix string, data map[string]interface{}, add func(key, value string)) {
func parseMap(prefix string, data map[string]interface{}, lbs *LabelsBuilder) {
for key, val := range data {
switch concrete := val.(type) {
case map[string]interface{}:
parseMap(jsonKey(prefix, key), concrete, add)
parseMap(jsonKey(prefix, key), concrete, lbs)
case string:
add(jsonKey(prefix, key), concrete)
addLabel(lbs, jsonKey(prefix, key), concrete)
case float64:
f := strconv.FormatFloat(concrete, 'f', -1, 64)
add(jsonKey(prefix, key), f)
addLabel(lbs, jsonKey(prefix, key), f)
}
}
}
@@ -138,10 +136,9 @@ func mustNewRegexParser(re string) *RegexpParser {
}
func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
add := addLabel(lbs)
for i, value := range r.regex.FindSubmatch(line) {
if name, ok := r.nameIndex[i]; ok {
add(name, string(value))
addLabel(lbs, name, string(value))
}
}
return line, true
@@ -161,11 +158,11 @@ func NewLogfmtParser() *LogfmtParser {
func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
l.dec.Reset(line)
add := addLabel(lbs)
for l.dec.ScanKeyval() {
key := string(l.dec.Key())
val := string(l.dec.Value())
add(key, val)
addLabel(lbs, key, val)
}
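// Worked sketch (input invented): every scanned key/value pair goes through
// addLabel, so `duration=1.5s status=200` yields duration="1.5s" and
// status="200", subject to the same duplicate-suffix rule as the other parsers.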
if l.dec.Err() != nil {
lbs.SetErr(errLogfmt)

@ -77,8 +77,8 @@ func Test_jsonParser_Parse(t *testing.T) {
for _, tt := range tests {
j := NewJSONParser()
t.Run(tt.name, func(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(tt.lbs)
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = j.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
@ -169,8 +169,8 @@ func Test_regexpParser_Parse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(tt.lbs)
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = tt.parser.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())
@ -281,8 +281,8 @@ func Test_logfmtParser_Parse(t *testing.T) {
p := NewLogfmtParser()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewLabelsBuilder()
b.Reset(tt.lbs)
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = p.Process(tt.line, b)
sort.Sort(tt.want)
require.Equal(t, tt.want, b.Labels())

@ -4,9 +4,19 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
)
// Pipeline transforms and filters log lines and labels.
var (
// NoopStage is a stage that doesn't process a log line.
NoopStage Stage = &noopStage{}
)
// Pipeline can create pipelines for each log stream.
type Pipeline interface {
Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool)
ForStream(labels labels.Labels) StreamPipeline
}
// StreamPipeline transforms and filters log lines and labels.
type StreamPipeline interface {
Process(line []byte) ([]byte, LabelsResult, bool)
}
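// Usage sketch (hypothetical caller; p, streamLabels and lines are assumed
// inputs, not part of this change): ForStream is resolved once per stream so
// the per-line loop only pays for Process.
//
//	sp := p.ForStream(streamLabels)
//	for _, line := range lines {
//		if out, res, ok := sp.Process(line); ok {
//			fmt.Println(string(out), res.String())
//		}
//	}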
// Stage is a single step of a Pipeline.
@ -14,15 +24,33 @@ type Stage interface {
Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
}
var (
NoopPipeline Pipeline = &noopPipeline{}
NoopStage Stage = &noopStage{}
)
// NewNoopPipeline creates a pipeline that does not process anything and returns log streams as-is.
func NewNoopPipeline() Pipeline {
return &noopPipeline{
cache: map[uint64]*noopStreamPipeline{},
}
}
type noopPipeline struct {
cache map[uint64]*noopStreamPipeline
}
type noopPipeline struct{}
type noopStreamPipeline struct {
LabelsResult
}
func (noopPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
return line, lbs, true
func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) {
return line, n.LabelsResult, true
}
func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline {
h := labels.Hash()
if cached, ok := n.cache[h]; ok {
return cached
}
sp := &noopStreamPipeline{LabelsResult: NewLabelsResult(labels, h)}
n.cache[h] = sp
return sp
}
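// Even the noop pipeline memoizes one LabelsResult per label hash, so repeated
// ForStream calls for the same stream hand back the identical pre-hashed result
// instead of re-wrapping the labels every time.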
type noopStage struct{}
@ -40,30 +68,53 @@ func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
// pipeline is a combination of multiple stages.
// It can also be reduced into a single stage for convenience.
type pipeline struct {
stages []Stage
builder *LabelsBuilder
stages []Stage
baseBuilder *BaseLabelsBuilder
streamPipelines map[uint64]StreamPipeline
}
// NewPipeline creates a new pipeline for a given set of stages.
func NewPipeline(stages []Stage) Pipeline {
if len(stages) == 0 {
return NewNoopPipeline()
}
return &pipeline{
stages: stages,
builder: NewLabelsBuilder(),
stages: stages,
baseBuilder: NewBaseLabelsBuilder(),
streamPipelines: make(map[uint64]StreamPipeline),
}
}
func (p *pipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) {
var ok bool
if len(p.stages) == 0 {
return line, lbs, true
type streamPipeline struct {
stages []Stage
builder *LabelsBuilder
}
func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline {
hash := p.baseBuilder.Hash(labels)
if res, ok := p.streamPipelines[hash]; ok {
return res
}
res := &streamPipeline{
stages: p.stages,
builder: p.baseBuilder.ForLabels(labels, hash),
}
p.builder.Reset(lbs)
p.streamPipelines[hash] = res
return res
}
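// Design note: stream pipelines are memoized by the base builder's label hash,
// so a stream seen twice reuses its LabelsBuilder and any cached label results;
// only the per-line stage work in Process stays on the hot path.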
func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) {
var ok bool
p.builder.Reset()
for _, s := range p.stages {
line, ok = s.Process(line, p.builder)
if !ok {
return nil, nil, false
}
}
return line, p.builder.Labels(), true
return line, p.builder.LabelsResult(), true
}
// ReduceStages reduces multiple stages into one.

@ -10,7 +10,7 @@ import (
var (
resOK bool
resLine []byte
resLbs labels.Labels
resLbs LabelsResult
)
func Benchmark_Pipeline(b *testing.B) {
@ -39,8 +39,9 @@ func Benchmark_Pipeline(b *testing.B) {
{Name: "pod_template_hash", Value: "5896759c79"},
}
b.ResetTimer()
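// ForStream is hoisted above the loop so the benchmark measures only the
// per-line Process cost, not the one-off stream-pipeline construction.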
sp := p.ForStream(lbs)
for n := 0; n < b.N; n++ {
resLine, resLbs, resOK = p.Process(line, lbs)
resLine, resLbs, resOK = sp.Process(line)
}
}

@ -1990,13 +1990,13 @@ func Test_PipelineCombined(t *testing.T) {
p, err := expr.Pipeline()
require.Nil(t, err)
line, lbs, ok := p.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.Labels{})
sp := p.ForStream(labels.Labels{})
line, lbs, ok := sp.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`))
require.True(t, ok)
require.Equal(
t,
labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}},
lbs,
lbs.Labels(),
)
require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
}

@ -97,7 +97,8 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea
for _, stream := range in {
for _, e := range stream.Entries {
if l, out, ok := pipeline.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok {
sp := pipeline.ForStream(mustParseLabels(stream.Labels))
if l, out, ok := sp.Process([]byte(e.Line)); ok {
var s *logproto.Stream
var found bool
s, found = resByStream[out.String()]
@ -124,7 +125,8 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri
for _, stream := range in {
for _, e := range stream.Entries {
if f, lbs, ok := ex.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok {
exs := ex.ForStream(mustParseLabels(stream.Labels))
if f, lbs, ok := exs.Process([]byte(e.Line)); ok {
var s *logproto.Series
var found bool
s, found = resBySeries[lbs.String()]

@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)
@ -381,19 +382,21 @@ func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator
func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
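// Chunks are grouped by fingerprint, so every chunk in this group shares one
// label set: build the stream pipeline once from the first chunk's metric
// (minus __name__) and reuse it for the whole group.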
streamPipeline := it.pipeline.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName))
iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
if err != nil {
return nil, err
}
iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk)
if err != nil {
return nil, err
result = append(result, iterator)
}
result = append(result, iterator)
}
return result, nil
}
func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.EntryIterator, error) {
func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
for i := range chks {
@ -402,7 +405,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, it.pipeline, nextChunk)
iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, streamPipeline, nextChunk)
if err != nil {
return nil, err
}
@ -514,17 +517,21 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter
func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) {
result := make([]iter.SampleIterator, 0, len(chks))
for _, chunks := range chks {
iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk)
if err != nil {
return nil, err
if len(chunks) != 0 && len(chunks[0]) != 0 {
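// Same once-per-stream setup as the log path: one StreamSampleExtractor per
// fingerprint group instead of one per chunk.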
streamExtractor := it.extractor.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName))
iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk)
if err != nil {
return nil, err
}
result = append(result, iterator)
}
result = append(result, iterator)
}
return result, nil
}
func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.SampleIterator, error) {
func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) {
result := make([]iter.SampleIterator, 0, len(chks))
for i := range chks {
@ -533,7 +540,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, it.extractor, nextChunk)
iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, streamExtractor, nextChunk)
if err != nil {
return nil, err
}
@ -755,20 +762,3 @@ outer:
return css
}
// dropLabels returns a new label set with certain labels dropped
func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) {
toDel := make(map[string]struct{})
for _, r := range removals {
toDel[r] = struct{}{}
}
for _, l := range ls {
_, remove := toDel[l.Name]
if !remove {
dst = append(dst, l)
}
}
return dst
}

@ -1231,7 +1231,10 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.CountExtractor.ToSampleExtractor(nil, false, false), tt.start, tt.end)
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
require.NoError(t, err)
it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end)
require.NoError(t, err)
series, _, err := iter.ReadSampleBatch(it, 1000)
_ = it.Close()
@ -1441,7 +1444,7 @@ func TestBuildHeapIterator(t *testing.T) {
ctx: ctx,
pipeline: logql.NoopPipeline,
}
it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), nil)
it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
return
@ -1457,48 +1460,6 @@ func TestBuildHeapIterator(t *testing.T) {
}
}
func TestDropLabels(t *testing.T) {
for i, tc := range []struct {
ls labels.Labels
drop []string
expected labels.Labels
}{
{
ls: labels.Labels{
labels.Label{
Name: "a",
Value: "1",
},
labels.Label{
Name: "b",
Value: "2",
},
labels.Label{
Name: "c",
Value: "3",
},
},
drop: []string{"b"},
expected: labels.Labels{
labels.Label{
Name: "a",
Value: "1",
},
labels.Label{
Name: "c",
Value: "3",
},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
dropped := dropLabels(tc.ls, tc.drop...)
require.Equal(t, tc.expected, dropped)
})
}
}
func Test_IsInvalidChunkError(t *testing.T) {
tests := []struct {
name string

@ -6,12 +6,11 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)
// LazyChunk loads the chunk when it is accessed.
@ -33,7 +32,7 @@ func (c *LazyChunk) Iterator(
ctx context.Context,
from, through time.Time,
direction logproto.Direction,
pipeline logql.Pipeline,
pipeline log.StreamPipeline,
nextChunk *LazyChunk,
) (iter.EntryIterator, error) {
@ -59,7 +58,7 @@ func (c *LazyChunk) Iterator(
// if the block overlaps the next chunk, cache it within the next chunk's boundaries.
if nextChunk != nil && IsBlockOverlapping(b, nextChunk, direction) {
// todo(cyriltovena) we can avoid dropping the metric name for each chunk since many chunks share the same metric/labelset.
it := iter.NewCachedIterator(b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline), b.Entries())
it := iter.NewCachedIterator(b.Iterator(ctx, pipeline), b.Entries())
its = append(its, it)
if c.overlappingBlocks == nil {
c.overlappingBlocks = make(map[int]iter.CacheEntryIterator)
@ -71,7 +70,7 @@ func (c *LazyChunk) Iterator(
delete(c.overlappingBlocks, b.Offset())
}
// blocks that do not overlap the next chunk are not cached.
its = append(its, b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline))
its = append(its, b.Iterator(ctx, pipeline))
}
if direction == logproto.FORWARD {
@ -106,7 +105,7 @@ func (c *LazyChunk) Iterator(
func (c *LazyChunk) SampleIterator(
ctx context.Context,
from, through time.Time,
extractor logql.SampleExtractor,
extractor log.StreamSampleExtractor,
nextChunk *LazyChunk,
) (iter.SampleIterator, error) {
@ -132,7 +131,7 @@ func (c *LazyChunk) SampleIterator(
// if the block overlaps the next chunk, cache it within the next chunk's boundaries.
if nextChunk != nil && IsBlockOverlapping(b, nextChunk, logproto.FORWARD) {
// todo(cyriltovena) we can avoid dropping the metric name for each chunk since many chunks share the same metric/labelset.
it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor), b.Entries())
it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, extractor), b.Entries())
its = append(its, it)
if c.overlappingSampleBlocks == nil {
c.overlappingSampleBlocks = make(map[int]iter.CacheSampleIterator)
@ -144,7 +143,7 @@ func (c *LazyChunk) SampleIterator(
delete(c.overlappingSampleBlocks, b.Offset())
}
// blocks that do not overlap the next chunk are not cached.
its = append(its, b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor))
its = append(its, b.SampleIterator(ctx, extractor))
}
// build the final iterator bound to the requested time range.

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util"
)
@ -46,7 +47,7 @@ func TestLazyChunkIterator(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline, nil)
it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
require.Nil(t, err)
streams, _, err := iter.ReadBatch(it, 1000)
require.Nil(t, err)
@ -174,10 +175,10 @@ func (fakeBlock) Entries() int { return 0 }
func (fakeBlock) Offset() int { return 0 }
func (f fakeBlock) MinTime() int64 { return f.mint }
func (f fakeBlock) MaxTime() int64 { return f.maxt }
func (fakeBlock) Iterator(context.Context, labels.Labels, logql.Pipeline) iter.EntryIterator {
func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterator {
return nil
}
func (fakeBlock) SampleIterator(context.Context, labels.Labels, logql.SampleExtractor) iter.SampleIterator {
func (fakeBlock) SampleIterator(context.Context, log.StreamSampleExtractor) iter.SampleIterator {
return nil
}
