refactor: allow sample expressions to have multiple extractors (#16224)

Branch: pull/15953/head
Author: Trevor Whitney (committed by GitHub, 1 year ago)
Parent: 651d410eea
Commit: 5b16c0be7d
31 changed files (lines changed in parentheses):

  1. docs/sources/shared/configuration.md (6)
  2. pkg/chunkenc/dumb_chunk.go (2)
  3. pkg/chunkenc/interface.go (4)
  4. pkg/chunkenc/memchunk.go (121)
  5. pkg/chunkenc/memchunk_test.go (187)
  6. pkg/chunkenc/unordered.go (61)
  7. pkg/chunkenc/unordered_test.go (54)
  8. pkg/chunkenc/variants.go (117)
  9. pkg/dataobj/querier/iter.go (66)
  10. pkg/dataobj/querier/store.go (6)
  11. pkg/ingester/instance.go (36)
  12. pkg/ingester/instance_test.go (90)
  13. pkg/ingester/stream.go (4)
  14. pkg/logql/engine.go (33)
  15. pkg/logql/engine_test.go (21)
  16. pkg/logql/evaluator.go (31)
  17. pkg/logql/log/metrics_extraction.go (57)
  18. pkg/logql/log/metrics_extraction_test.go (96)
  19. pkg/logql/log/parser_hints_test.go (125)
  20. pkg/logql/syntax/ast.go (95)
  21. pkg/logql/syntax/extractor.go (8)
  22. pkg/logql/syntax/extractor_test.go (3)
  23. pkg/logql/syntax/parser_test.go (62)
  24. pkg/logql/test_utils.go (41)
  25. pkg/querier/queryrange/roundtrip.go (50)
  26. pkg/storage/batch.go (56)
  27. pkg/storage/batch_test.go (13)
  28. pkg/storage/lazy_chunk.go (6)
  29. pkg/storage/lazy_chunk_test.go (2)
  30. pkg/storage/store.go (36)
  31. pkg/util/deletion/deletion.go (6)

@@ -4052,6 +4052,12 @@ engine:
# CLI flag: -querier.engine.max-count-min-sketch-heap-size
[max_count_min_sketch_heap_size: <int> | default = 10000]
# Enable experimental support for running multiple query variants over the
# same underlying data. For example, running both a rate() and
# count_over_time() query over the same range selector.
# CLI flag: -querier.engine.enable-multi-variant-queries
[enable_multi_variant_queries: <boolean> | default = false]
# The maximum number of queries that can be simultaneously processed by the
# querier.
# CLI flag: -querier.max-concurrent
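
For reference, a minimal sketch of what opting in looks like in the engine block (illustrative values only; every other setting keeps its default):

```yaml
engine:
  # Experimental: evaluate several query variants over one range selector.
  enable_multi_variant_queries: true
```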

@@ -99,7 +99,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi
}, nil
}
func (c *dumbChunk) SampleIterator(_ context.Context, _, _ time.Time, _ log.StreamSampleExtractor) iter.SampleIterator {
func (c *dumbChunk) SampleIterator(_ context.Context, _, _ time.Time, _ ...log.StreamSampleExtractor) iter.SampleIterator {
return nil
}

@@ -55,7 +55,7 @@ type Chunk interface {
// Append returns true if the entry appended was a duplicate
Append(*logproto.Entry) (bool, error)
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
SampleIterator(ctx context.Context, from, through time.Time, extractor ...log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
// Size returns the number of entries in a chunk
@@ -85,5 +85,5 @@ type Block interface {
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
SampleIterator(ctx context.Context, extractor ...log.StreamSampleExtractor) iter.SampleIterator
}
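
The variadic signature keeps existing single-extractor call sites source-compatible while letting new callers fan several extractors over one decode pass. A minimal sketch of a call site, assuming a populated Chunk; the helper name is hypothetical, but the extractor constructors match the tests further down:

```go
package sketch

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/logql/log"
)

// iterateBoth runs a count and a bytes extractor over one chunk in a single
// pass via the new variadic SampleIterator.
func iterateBoth(chk chunkenc.Chunk) error {
	countEx, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
	if err != nil {
		return err
	}
	bytesEx, err := log.NewLineSampleExtractor(log.BytesExtractor, nil, nil, false, false)
	if err != nil {
		return err
	}
	it := chk.SampleIterator(
		context.Background(),
		time.Unix(0, 0), time.Unix(0, math.MaxInt64),
		countEx.ForStream(labels.Labels{}),
		bytesEx.ForStream(labels.Labels{}),
	)
	defer it.Close()
	for it.Next() {
		s := it.At() // one sample per matching extractor per log line
		fmt.Println(it.Labels(), s.Timestamp, s.Value)
	}
	return it.Err()
}
```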

@@ -1065,7 +1065,14 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}
// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
func (c *MemChunk) SampleIterator(
ctx context.Context,
from, through time.Time,
extractors ...log.StreamSampleExtractor,
) iter.SampleIterator {
if len(extractors) == 0 {
return iter.NoopSampleIterator
}
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)
@@ -1089,7 +1096,10 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
ordered = false
}
lastMax = b.maxt
its = append(its, encBlock{c.encoding, c.format, c.symbolizer, b}.SampleIterator(ctx, extractor))
its = append(
its,
encBlock{c.encoding, c.format, c.symbolizer, b}.SampleIterator(ctx, extractors...),
)
}
if !c.head.IsEmpty() {
@@ -1097,7 +1107,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
if from < lastMax {
ordered = false
}
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractors...))
}
var it iter.SampleIterator
@@ -1186,11 +1196,21 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite
return newEntryIterator(ctx, compression.GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
}
func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
func (b encBlock) SampleIterator(
ctx context.Context,
extractors ...log.StreamSampleExtractor,
) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopSampleIterator
}
return newSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
return newSampleIterator(
ctx,
compression.GetReaderPool(b.enc),
b.b,
b.format,
b.symbolizer,
extractors...,
)
}
func (b block) Offset() int {
@@ -1275,45 +1295,61 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(streamsResult, direction)
}
func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
func unsafeGetBytes(s string) []byte {
return unsafe.Slice(unsafe.StringData(s), len(s)) // #nosec G103 -- we know the string is not mutated
}
func (hb *headBlock) SampleIterator(
ctx context.Context,
mint, maxt int64,
extractors ...log.StreamSampleExtractor,
) iter.SampleIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopSampleIterator
}
stats := stats.FromContext(ctx)
stats.AddHeadChunkLines(int64(len(hb.entries)))
series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()
setQueryReferencedStructuredMetadata := false
for _, e := range hb.entries {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, parsedLabels, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...)
if !ok {
continue
}
stats.AddPostFilterLines(1)
var (
found bool
s *logproto.Series
)
for _, extractor := range extractors {
stats.AddHeadChunkBytes(int64(len(e.s)))
value, lbls, ok := extractor.ProcessString(e.t, e.s, e.structuredMetadata...)
if !ok {
continue
}
var (
found bool
s *logproto.Series
)
lblStr := lbls.String()
baseHash := extractor.BaseLabels().Hash()
if s, found = series[lblStr]; !found {
s = &logproto.Series{
Labels: lblStr,
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
StreamHash: baseHash,
}
series[lblStr] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.t,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(e.s)),
})
lbs := parsedLabels.String()
if s, found = series[lbs]; !found {
s = &logproto.Series{
Labels: lbs,
Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0],
StreamHash: baseHash,
if extractor.ReferencedStructuredMetadata() {
setQueryReferencedStructuredMetadata = true
}
series[lbs] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.t,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(e.s)),
})
stats.AddPostFilterLines(1)
}
if extractor.ReferencedStructuredMetadata() {
if setQueryReferencedStructuredMetadata {
stats.SetQueryReferencedStructuredMetadata()
}
if len(series) == 0 {
@@ -1331,10 +1367,6 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra
})
}
func unsafeGetBytes(s string) []byte {
return unsafe.Slice(unsafe.StringData(s), len(s)) // #nosec G103 -- we know the string is not mutated
}
type bufferedIterator struct {
origBytes []byte
stats *stats.Context
@@ -1672,10 +1704,25 @@ func (e *entryBufferedIterator) Close() error {
return e.bufferedIterator.Close()
}
func newSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
func newSampleIterator(
ctx context.Context,
pool compression.ReaderPool,
b []byte,
format byte,
symbolizer *symbolizer,
extractors ...log.StreamSampleExtractor,
) iter.SampleIterator {
if len(extractors) == 0 {
return iter.NoopSampleIterator
}
if len(extractors) > 1 {
return newMultiExtractorSampleIterator(ctx, pool, b, format, extractors, symbolizer)
}
return &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
extractor: extractor,
extractor: extractors[0],
stats: stats.FromContext(ctx),
}
}

@@ -55,6 +55,13 @@ var (
}
return ex.ForStream(labels.Labels{})
}()
bytesExtractor = func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.BytesExtractor, nil, nil, false, false)
if err != nil {
panic(err)
}
return ex.ForStream(labels.Labels{})
}()
allPossibleFormats = []struct {
headBlockFmt HeadBlockFmt
chunkFormat byte
@@ -110,66 +117,79 @@ func TestBlock(t *testing.T) {
t.Parallel()
chk := newMemChunkWithFormat(chunkFormat, enc, headBlockFmt, testBlockSize, testTargetSize)
cases := []struct {
ts int64
str string
lbs []logproto.LabelAdapter
cut bool
ts int64
str string
bytes float64
lbs []logproto.LabelAdapter
cut bool
}{
{
ts: 1,
str: "hello, world!",
ts: 1,
str: "hello, world!",
bytes: float64(len("hello, world!")),
},
{
ts: 2,
str: "hello, world2!",
ts: 2,
str: "hello, world2!",
bytes: float64(len("hello, world2!")),
lbs: []logproto.LabelAdapter{
{Name: "app", Value: "myapp"},
},
},
{
ts: 3,
str: "hello, world3!",
ts: 3,
str: "hello, world3!",
bytes: float64(len("hello, world3!")),
lbs: []logproto.LabelAdapter{
{Name: "a", Value: "a"},
{Name: "b", Value: "b"},
},
},
{
ts: 4,
str: "hello, world4!",
ts: 4,
str: "hello, world4!",
bytes: float64(len("hello, world4!")),
},
{
ts: 5,
str: "hello, world5!",
ts: 5,
str: "hello, world5!",
bytes: float64(len("hello, world5!")),
},
{
ts: 6,
str: "hello, world6!",
cut: true,
ts: 6,
str: "hello, world6!",
bytes: float64(len("hello, world6!")),
cut: true,
},
{
ts: 7,
str: "hello, world7!",
ts: 7,
str: "hello, world7!",
bytes: float64(len("hello, world7!")),
},
{
ts: 8,
str: "hello, worl\nd8!",
ts: 8,
str: "hello, worl\nd8!",
bytes: float64(len("hello, worl\nd8!")),
},
{
ts: 8,
str: "hello, world 8, 2!",
ts: 8,
str: "hello, world 8, 2!",
bytes: float64(len("hello, world 8, 2!")),
},
{
ts: 8,
str: "hello, world 8, 3!",
ts: 8,
str: "hello, world 8, 3!",
bytes: float64(len("hello, world 8, 3!")),
},
{
ts: 9,
str: "",
ts: 9,
str: "",
bytes: float64(len("")),
},
{
ts: 10,
str: "hello, world10!",
ts: 10,
str: "hello, world10!",
bytes: float64(len("hello, world10!")),
lbs: []logproto.LabelAdapter{
{Name: "a", Value: "a2"},
{Name: "b", Value: "b"},
@@ -214,14 +234,6 @@ func TestBlock(t *testing.T) {
require.NoError(t, it.Close())
require.Equal(t, len(cases), idx)
countExtractor := func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
panic(err)
}
return ex.ForStream(labels.Labels{})
}()
sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
idx = 0
for sampleIt.Next() {
@@ -236,6 +248,29 @@ func TestBlock(t *testing.T) {
require.NoError(t, sampleIt.Close())
require.Equal(t, len(cases), idx)
extractors := []log.StreamSampleExtractor{countExtractor, bytesExtractor}
sampleIt = chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractors...)
idx = 0
// 2 extractors, expect 2 samples per original timestamp
for sampleIt.Next() {
s := sampleIt.At()
require.Equal(t, cases[idx].ts, s.Timestamp)
require.Equal(t, 1., s.Value)
require.NotEmpty(t, s.Hash)
require.True(t, sampleIt.Next())
s = sampleIt.At()
require.Equal(t, cases[idx].ts, s.Timestamp)
require.Equal(t, cases[idx].bytes, s.Value)
require.NotEmpty(t, s.Hash)
idx++
}
require.NoError(t, sampleIt.Err())
require.NoError(t, sampleIt.Close())
require.Equal(t, len(cases), idx)
t.Run("bounded-iteration", func(t *testing.T) {
it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
@@ -468,15 +503,16 @@ func TestSerialization(t *testing.T) {
}
require.NoError(t, it.Err())
extractor := func() log.StreamSampleExtractor {
countExtractor := func() log.StreamSampleExtractor {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
panic(err)
}
return ex.ForStream(labels.Labels{})
}()
extractors := []log.StreamSampleExtractor{countExtractor, countExtractor}
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor)
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractors...)
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)
@@ -488,6 +524,12 @@
} else {
require.Equal(t, labels.EmptyLabels().String(), sampleIt.Labels())
}
// check that the second extractor is returning samples as well
require.True(t, sampleIt.Next())
s = sampleIt.At()
require.Equal(t, int64(i), s.Timestamp)
require.Equal(t, 1., s.Value)
}
require.NoError(t, sampleIt.Err())
@@ -1084,6 +1126,38 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
}
}
func BenchmarkHeadBlockMultiExtractorSampleIterator(b *testing.B) {
for _, j := range []int{20000, 10000, 8000, 5000} {
for _, withStructuredMetadata := range []bool{false, true} {
b.Run(fmt.Sprintf("size=%d structuredMetadata=%v", j, withStructuredMetadata), func(b *testing.B) {
h := headBlock{}
var structuredMetadata labels.Labels
if withStructuredMetadata {
structuredMetadata = labels.Labels{{Name: "foo", Value: "foo"}}
}
for i := 0; i < j; i++ {
if _, err := h.Append(int64(i), "this is the append string", structuredMetadata); err != nil {
b.Fatal(err)
}
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
iter := h.SampleIterator(context.Background(), 0, math.MaxInt64, countExtractor, bytesExtractor)
for iter.Next() {
_ = iter.At()
}
iter.Close()
}
})
}
}
}
func TestMemChunk_IteratorBounds(t *testing.T) {
createChunk := func() *MemChunk {
t.Helper()
@@ -1345,13 +1419,24 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
if err != nil {
b.Fatal(err)
}
ex, err := expr.Extractor()
ex, err := expr.Extractors()
if err != nil {
b.Fatal(err)
}
var iters []iter.SampleIterator
for _, lbs := range labelsSet {
iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs)))
streamExtractors := make([]log.StreamSampleExtractor, 0, len(ex))
for _, extractor := range ex {
streamExtractors = append(streamExtractors, extractor.ForStream(lbs))
}
iters = append(
iters,
c.SampleIterator(
context.Background(),
time.Unix(0, 0),
time.Now(),
streamExtractors...),
)
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -2018,14 +2103,30 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) {
expr, err := syntax.ParseSampleExpr(query)
require.NoError(t, err)
extractor, err := expr.Extractor()
extractors, err := expr.Extractors()
require.NoError(t, err)
// We will run the test twice so the iterator will be created twice.
// This is to ensure that the iterator is correctly closed.
for i := 0; i < 2; i++ {
sts, ctx := stats.NewContext(context.Background())
it := chk.SampleIterator(ctx, time.Unix(0, 0), time.Unix(0, math.MaxInt64), extractor.ForStream(streamLabels))
streamExtractors := make(
[]log.StreamSampleExtractor,
0,
len(extractors),
)
for _, extractor := range extractors {
streamExtractors = append(
streamExtractors,
extractor.ForStream(streamLabels),
)
}
it := chk.SampleIterator(
ctx,
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
streamExtractors...)
var sumValues int
var streams []string

@@ -47,7 +47,7 @@ type HeadBlock interface {
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
extractor ...log.StreamSampleExtractor,
) iter.SampleIterator
Format() HeadBlockFmt
}
@@ -305,15 +305,14 @@ func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.D
})
}
// nolint:unused
func (hb *unorderedHeadBlock) SampleIterator(
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
extractor ...log.StreamSampleExtractor,
) iter.SampleIterator {
series := map[string]*logproto.Series{}
baseHash := extractor.BaseLabels().Hash()
setQueryReferencedStructuredMetadata := false
var structuredMetadata labels.Labels
_ = hb.forEntries(
ctx,
@@ -322,35 +321,43 @@ func (hb *unorderedHeadBlock) SampleIterator(
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
structuredMetadata = hb.symbolizer.Lookup(structuredMetadataSymbols, structuredMetadata)
value, parsedLabels, ok := extractor.ProcessString(ts, line, structuredMetadata...)
if !ok {
return nil
}
statsCtx.AddPostFilterLines(1)
var (
found bool
s *logproto.Series
)
lbs := parsedLabels.String()
s, found = series[lbs]
if !found {
s = &logproto.Series{
Labels: lbs,
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
StreamHash: baseHash,
for _, extractor := range extractor {
value, lbls, ok := extractor.ProcessString(ts, line, structuredMetadata...)
if !ok {
return nil
}
var (
found bool
s *logproto.Series
)
lblStr := lbls.String()
s, found = series[lblStr]
if !found {
baseHash := extractor.BaseLabels().Hash()
s = &logproto.Series{
Labels: lblStr,
Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
StreamHash: baseHash,
}
series[lblStr] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: ts,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(line)),
})
if extractor.ReferencedStructuredMetadata() {
setQueryReferencedStructuredMetadata = true
}
series[lbs] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: ts,
Value: value,
Hash: xxhash.Sum64(unsafeGetBytes(line)),
})
statsCtx.AddPostFilterLines(1)
return nil
},
)
if extractor.ReferencedStructuredMetadata() {
if setQueryReferencedStructuredMetadata {
stats.FromContext(ctx).SetQueryReferencedStructuredMetadata()
}
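
Note that the base-label hash is now computed per extractor inside the loop rather than hoisted above it: each extractor can report different BaseLabels for the same stream, which is exactly what the variant wrapper in pkg/logql/log (further down in this diff) produces. A small sketch of the distinction; variantHashesDiffer is a hypothetical helper, not part of this commit:

```go
package sketch

import "github.com/grafana/loki/v3/pkg/logql/log"

// variantHashesDiffer: one stream extractor wrapped as two variants yields
// two different base-label hashes, so a hash computed once before the loop
// would conflate their series.
func variantHashesDiffer(streamEx log.StreamSampleExtractor) bool {
	ex0 := log.NewVariantsStreamSampleExtractorWrapper(0, streamEx)
	ex1 := log.NewVariantsStreamSampleExtractorWrapper(1, streamEx)
	return ex0.BaseLabels().Hash() != ex1.BaseLabels().Hash()
}
```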

@@ -545,7 +545,12 @@ func BenchmarkUnorderedRead(b *testing.B) {
for _, tc := range tcs {
b.Run(tc.desc, func(b *testing.B) {
for n := 0; n < b.N; n++ {
iterator := tc.c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
iterator := tc.c.SampleIterator(
context.Background(),
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
countExtractor,
)
for iterator.Next() {
_ = iterator.At()
}
@@ -581,7 +586,12 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
ct = 0
i = 0
smpl := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
smpl := c.SampleIterator(
context.Background(),
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
countExtractor,
)
for smpl.Next() {
next := smpl.At().Timestamp
require.GreaterOrEqual(t, next, i)
@@ -722,17 +732,17 @@ func TestReorderAcrossBlocks(t *testing.T) {
func Test_HeadIteratorHash(t *testing.T) {
lbs := labels.Labels{labels.Label{Name: "foo", Value: "bar"}}
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
if err != nil {
panic(err)
}
countEx, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
require.NoError(t, err)
bytesEx, err := log.NewLineSampleExtractor(log.BytesExtractor, nil, nil, false, false)
require.NoError(t, err)
for name, b := range map[string]HeadBlock{
"unordered": newUnorderedHeadBlock(UnorderedHeadBlockFmt, nil),
"unordered with structured metadata": newUnorderedHeadBlock(UnorderedWithStructuredMetadataHeadBlockFmt, newSymbolizer()),
"ordered": &headBlock{},
} {
t.Run(name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s SampleIterator", name), func(t *testing.T) {
dup, err := b.Append(1, "foo", labels.Labels{{Name: "foo", Value: "bar"}})
require.False(t, dup)
require.NoError(t, err)
@@ -742,7 +752,35 @@ func Test_HeadIteratorHash(t *testing.T) {
require.Equal(t, lbs.Hash(), eit.StreamHash())
}
sit := b.SampleIterator(context.TODO(), 0, 2, ex.ForStream(lbs))
sit := b.SampleIterator(context.TODO(), 0, 2, countEx.ForStream(lbs))
for sit.Next() {
require.Equal(t, lbs.Hash(), sit.StreamHash())
}
})
t.Run(fmt.Sprintf("%s SampleIterator with multiple extractors", name), func(t *testing.T) {
dup, err := b.Append(1, "bar", labels.Labels{{Name: "bar", Value: "foo"}})
require.False(t, dup)
require.NoError(t, err)
eit := b.Iterator(
context.Background(),
logproto.BACKWARD,
0,
2,
log.NewNoopPipeline().ForStream(lbs),
)
for eit.Next() {
require.Equal(t, lbs.Hash(), eit.StreamHash())
}
sit := b.SampleIterator(
context.TODO(),
0,
2,
countEx.ForStream(lbs),
bytesEx.ForStream(lbs),
)
for sit.Next() {
require.Equal(t, lbs.Hash(), sit.StreamHash())
}

@@ -0,0 +1,117 @@
package chunkenc
import (
"context"
"sort"
"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
)
func newMultiExtractorSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractors []log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
return &multiExtractorSampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
extractors: extractors,
stats: stats.FromContext(ctx),
}
}
// TODO(twhitney): Once multi-variant queries have been validated,
// we should merge this into the regular sampleBufferedIterator.
type multiExtractorSampleBufferedIterator struct {
*bufferedIterator
extractors []log.StreamSampleExtractor
stats *stats.Context
cur []logproto.Sample
currLabels []log.LabelsResult
currBaseLabels []log.LabelsResult
}
func (e *multiExtractorSampleBufferedIterator) Next() bool {
if len(e.cur) > 1 {
e.cur = e.cur[1:]
e.currLabels = e.currLabels[1:]
e.currBaseLabels = e.currBaseLabels[1:]
return true
}
if len(e.cur) == 1 {
e.cur = e.cur[:0]
e.currLabels = e.currLabels[:0]
e.currBaseLabels = e.currBaseLabels[:0]
}
for e.bufferedIterator.Next() {
e.stats.AddPostFilterLines(1)
for _, extractor := range e.extractors {
val, lbls, ok := extractor.Process(e.currTs, e.currLine, e.currStructuredMetadata...)
if !ok {
continue
}
e.currLabels = append(e.currLabels, lbls)
e.currBaseLabels = append(e.currBaseLabels, extractor.BaseLabels())
e.cur = append(e.cur, logproto.Sample{
Value: val,
Hash: xxhash.Sum64(e.currLine),
Timestamp: e.currTs,
})
}
// catch the case where no extractors were ok
if len(e.cur) <= 1 {
continue
}
return true
}
return false
}
func flattenLabels(buf labels.Labels, many ...labels.Labels) labels.Labels {
var size int
for _, lbls := range many {
size += len(lbls)
}
if buf == nil || cap(buf) < size {
buf = make(labels.Labels, 0, size)
} else {
buf = buf[:0]
}
for _, lbls := range many {
buf = append(buf, lbls...)
}
sort.Sort(buf)
return buf
}
func (e *multiExtractorSampleBufferedIterator) Close() error {
for _, extractor := range e.extractors {
if extractor.ReferencedStructuredMetadata() {
e.stats.SetQueryReferencedStructuredMetadata()
}
}
return e.bufferedIterator.Close()
}
func (e *multiExtractorSampleBufferedIterator) Labels() string { return e.currLabels[0].String() }
func (e *multiExtractorSampleBufferedIterator) StreamHash() uint64 { return e.currBaseLabels[0].Hash() }
func (e *multiExtractorSampleBufferedIterator) At() logproto.Sample {
return e.cur[0]
}
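
Next buffers every extractor's result for the current line, then drains that buffer one call at a time before decoding the next line, so consumers still see a flat stream of samples. A reduced, standalone sketch of the drain-then-refill pattern (hypothetical types, not the Loki source):

```go
// shiftBuffer mimics the iterator's buffering: fill yields all per-line
// values at once, Next exposes them one at a time.
type shiftBuffer struct {
	cur  []float64
	fill func() ([]float64, bool) // false once the underlying data is exhausted
}

func (b *shiftBuffer) Next() bool {
	if len(b.cur) > 1 {
		b.cur = b.cur[1:] // drain values buffered from the previous line
		return true
	}
	for {
		vals, ok := b.fill()
		if !ok {
			return false
		}
		if len(vals) == 0 {
			continue // no extractor matched this line; decode the next one
		}
		b.cur = vals
		return true
	}
}

func (b *shiftBuffer) At() float64 { return b.cur[0] }
```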

@@ -278,7 +278,7 @@ func (s *sliceIterator) Close() error {
func newSampleIterator(ctx context.Context,
streams map[int64]dataobj.Stream,
extractor syntax.SampleExtractor,
extractors []syntax.SampleExtractor,
reader *dataobj.LogsReader,
) (iter.SampleIterator, error) {
bufPtr := recordsPool.Get().(*[]dataobj.Record)
@@ -312,36 +312,42 @@ func newSampleIterator(ctx context.Context,
continue
}
// Handle stream transition
if prevStreamID != record.StreamID {
iterators = appendIteratorFromSeries(iterators, series)
clear(series)
streamExtractor = extractor.ForStream(stream.Labels)
streamHash = streamExtractor.BaseLabels().Hash()
prevStreamID = record.StreamID
}
// Process the record
timestamp := record.Timestamp.UnixNano()
value, parsedLabels, ok := streamExtractor.ProcessString(timestamp, record.Line, record.Metadata...)
if !ok {
continue
}
// Get or create series for the parsed labels
labelString := parsedLabels.String()
s, exists := series[labelString]
if !exists {
s = createNewSeries(labelString, streamHash)
series[labelString] = s
for _, extractor := range extractors {
// Handle stream transition
if prevStreamID != record.StreamID {
iterators = appendIteratorFromSeries(iterators, series)
clear(series)
streamExtractor = extractor.ForStream(stream.Labels)
streamHash = streamExtractor.BaseLabels().Hash()
prevStreamID = record.StreamID
}
// Process the record
timestamp := record.Timestamp.UnixNano()
// TODO(twhitney): when iterating over multiple extractors, we need a way to pre-process as much of the line as possible
// In the case of multi-variant expressions, the only difference between the multiple extractors should be the final value, with all
// other filters and processing already done.
value, parsedLabels, ok := streamExtractor.ProcessString(timestamp, record.Line, record.Metadata...)
if !ok {
continue
}
// Get or create series for the parsed labels
labelString := parsedLabels.String()
s, exists := series[labelString]
if !exists {
s = createNewSeries(labelString, streamHash)
series[labelString] = s
}
// Add sample to the series
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: timestamp,
Value: value,
Hash: 0,
})
}
// Add sample to the series
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: timestamp,
Value: value,
Hash: 0,
})
}
}

@@ -489,12 +489,12 @@ func (s *shardedObject) selectSamples(ctx context.Context, streamsPredicate data
sp.LogKV("msg", "starting selectSamples in section", "index", i)
defer sp.LogKV("msg", "selectSamples section done", "index", i)
}
// extractor is not thread safe, so we need to create a new one for each object
extractor, err := expr.Extractor()
// extractors are not thread safe, so we need to create new ones for each object
extractors, err := expr.Extractors()
if err != nil {
return err
}
iter, err := newSampleIterator(ctx, s.streams, extractor, reader)
iter, err := newSampleIterator(ctx, s.streams, extractors, reader)
if err != nil {
return err
}
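
Since extractors carry per-call parsing state, the store builds a fresh set from the expression inside each sharded read instead of sharing one across goroutines. A minimal sketch of that discipline; object and processObject are hypothetical stand-ins for the store's sharded readers:

```go
package sketch

import (
	"context"

	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

func selectAll(ctx context.Context, expr syntax.SampleExpr, objects []object) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, obj := range objects {
		obj := obj // capture for the closure (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			extractors, err := expr.Extractors() // fresh, goroutine-local set
			if err != nil {
				return err
			}
			return processObject(ctx, obj, extractors)
		})
	}
	return g.Wait()
}
```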

@@ -539,23 +539,28 @@ func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams
return nil, err
}
extractor, err := expr.Extractor()
extractors, err := expr.Extractors()
if err != nil {
return nil, err
}
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
if i.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
userID, err := tenant.TenantID(ctx)
for j, extractor := range extractors {
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
extractor = i.extractorWrapper.Wrap(ctx, extractor, req.Plan.String(), userID)
if i.extractorWrapper != nil &&
httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
extractor = i.extractorWrapper.Wrap(ctx, extractor, req.Plan.String(), userID)
}
extractors[j] = extractor
}
stats := stats.FromContext(ctx)
@@ -575,7 +580,18 @@ func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams
selector.Matchers(),
shard,
func(stream *stream) error {
iter, err := stream.SampleIterator(ctx, stats, req.Start, req.End, extractor.ForStream(stream.labels))
streamExtractors := make([]log.StreamSampleExtractor, 0, len(extractors))
for _, extractor := range extractors {
streamExtractors = append(streamExtractors, extractor.ForStream(stream.labels))
}
iter, err := stream.SampleIterator(
ctx,
stats,
req.Start,
req.End,
streamExtractors...,
)
if err != nil {
return err
}

@@ -846,30 +846,36 @@ func Test_ExtractorWrapper(t *testing.T) {
}
instance.extractorWrapper = wrapper
ctx := user.InjectOrgID(context.Background(), "test-user")
it, err := instance.QuerySample(ctx,
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
t.Run("single extractor samples", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test-user")
it, err := instance.QuerySample(ctx,
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
},
},
},
},
)
require.NoError(t, err)
defer it.Close()
)
require.NoError(t, err)
defer it.Close()
for it.Next() {
// Consume the iterator
require.NoError(t, it.Err())
}
for it.Next() {
// Consume the iterator
require.NoError(t, it.Err())
}
require.Equal(t, `sum(count_over_time({job="3"}[1m]))`, wrapper.query)
require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
require.Equal(t, `sum(count_over_time({job="3"}[1m]))`, wrapper.query)
require.Equal(
t,
10,
wrapper.extractor.sp.called,
) // we've passed every log line through the wrapper
})
}
func Test_ExtractorWrapper_disabled(t *testing.T) {
@@ -880,31 +886,33 @@ func Test_ExtractorWrapper_disabled(t *testing.T) {
}
instance.extractorWrapper = wrapper
ctx := user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
it, err := instance.QuerySample(ctx,
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
t.Run("single extractor samples", func(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test-user")
ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true")
it, err := instance.QuerySample(ctx,
logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: `sum(count_over_time({job="3"}[1m]))`,
Start: time.Unix(0, 0),
End: time.Unix(0, 100000000),
Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()},
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`),
},
},
},
},
)
require.NoError(t, err)
defer it.Close()
)
require.NoError(t, err)
defer it.Close()
for it.Next() {
// Consume the iterator
require.NoError(t, it.Err())
}
for it.Next() {
// Consume the iterator
require.NoError(t, it.Err())
}
require.Equal(t, ``, wrapper.query)
require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
require.Equal(t, ``, wrapper.query)
require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper
})
}
type testExtractorWrapper struct {

@@ -611,7 +611,7 @@ func (s *stream) Iterator(ctx context.Context, statsCtx *stats.Context, from, th
}
// Returns a SampleIterator.
func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) {
func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, from, through time.Time, extractors ...log.StreamSampleExtractor) (iter.SampleIterator, error) {
s.chunkMtx.RLock()
defer s.chunkMtx.RUnlock()
iterators := make([]iter.SampleIterator, 0, len(s.chunks))
@@ -632,7 +632,7 @@ func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, fr
}
lastMax = maxt
if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil {
if itr := c.chunk.SampleIterator(ctx, from, through, extractors...); itr != nil {
iterators = append(iterators, itr)
}
}

@@ -64,6 +64,7 @@ type QueryParams interface {
GetStart() time.Time
GetEnd() time.Time
GetShards() []string
GetDeletes() []*logproto.Delete
}
// SelectParams specifies parameters passed to data selections.
@@ -150,11 +151,31 @@ type EngineOpts struct {
// MaxCountMinSketchHeapSize is the maximum number of labels the heap for a topk query using a count min sketch
// can track. This impacts the memory usage and accuracy of a sharded probabilistic topk query.
MaxCountMinSketchHeapSize int `yaml:"max_count_min_sketch_heap_size"`
// EnableMultiVariantQueries enables support for running multiple query variants over the same underlying data.
// For example, running both a rate() and count_over_time() query over the same range selector.
EnableMultiVariantQueries bool `yaml:"enable_multi_variant_queries"`
}
func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.")
f.IntVar(&opts.MaxCountMinSketchHeapSize, prefix+".engine.max-count-min-sketch-heap-size", 10_000, "The maximum number of labels the heap of a topk query using a count min sketch can track.")
f.DurationVar(
&opts.MaxLookBackPeriod,
prefix+".engine.max-lookback-period",
30*time.Second,
"The maximum amount of time to look back for log lines. Used only for instant log queries.",
)
f.IntVar(
&opts.MaxCountMinSketchHeapSize,
prefix+".engine.max-count-min-sketch-heap-size",
10_000,
"The maximum number of labels the heap of a topk query using a count min sketch can track.",
)
f.BoolVar(
&opts.EnableMultiVariantQueries,
prefix+".engine.enable-multi-variant-queries",
false,
"Enable experimental support for running multiple query variants over the same underlying data. For example, running both a rate() and count_over_time() query over the same range selector.",
)
// Log executing query by default
opts.LogExecutingQuery = true
}
@@ -196,6 +217,7 @@ func (ng *Engine) Query(params Params) Query {
record: true,
logExecQuery: ng.opts.LogExecutingQuery,
limits: ng.limits,
multiVariant: ng.opts.EnableMultiVariantQueries,
}
}
@@ -212,6 +234,7 @@ type query struct {
evaluator EvaluatorFactory
record bool
logExecQuery bool
multiVariant bool
}
func (q *query) resultLength(res promql_parser.Value) int {
@@ -324,7 +347,11 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
streams, err := readStreams(itr, q.params.Limit(), q.params.Direction(), q.params.Interval())
return streams, err
case syntax.VariantsExpr:
return nil, logqlmodel.ErrVariantsDisabled
if !q.multiVariant {
return nil, logqlmodel.ErrVariantsDisabled
}
return nil, errors.New("variants not yet implemented")
default:
return nil, fmt.Errorf("unexpected type (%T): cannot evaluate", e)
}
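
Variant evaluation stays double-gated: with the flag unset a VariantsExpr still returns ErrVariantsDisabled, and with it set the engine currently reaches the "not yet implemented" stub. A minimal sketch of opting in programmatically; the querier and limits arguments are assumed to exist on the caller's side:

```go
package sketch

import (
	kitlog "github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/logql"
)

func newVariantCapableEngine(q logql.Querier, l logql.Limits) *logql.Engine {
	opts := logql.EngineOpts{
		EnableMultiVariantQueries: true, // experimental, defaults to false
	}
	return logql.NewEngine(opts, q, l, kitlog.NewNopLogger())
}
```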

@@ -73,7 +73,6 @@ func TestEngine_checkIntervalLimit(t *testing.T) {
}
})
}
}
}
@@ -2352,7 +2351,10 @@ func (metaQuerier) SelectLogs(ctx context.Context, _ SelectLogParams) (iter.Entr
return iter.NoopEntryIterator, nil
}
func (metaQuerier) SelectSamples(ctx context.Context, _ SelectSampleParams) (iter.SampleIterator, error) {
func (metaQuerier) SelectSamples(
ctx context.Context,
_ SelectSampleParams,
) (iter.SampleIterator, error) {
_ = metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
{Name: "Header", Values: []string{"value"}},
})
@@ -2666,9 +2668,13 @@ func TestHashingStability(t *testing.T) {
func TestUnexpectedEmptyResults(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")
mock := &mockEvaluatorFactory{SampleEvaluatorFunc(func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) {
return EmptyEvaluator[SampleVector]{value: nil}, nil
})}
mock := &mockEvaluatorFactory{
SampleEvaluatorFunc(
func(context.Context, SampleEvaluatorFactory, syntax.SampleExpr, Params) (StepEvaluator, error) {
return EmptyEvaluator[SampleVector]{value: nil}, nil
},
),
}
eng := NewEngine(EngineOpts{}, nil, NoLimits, log.NewNopLogger())
params, err := NewLiteralParams(`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, time.Now(), time.Now(), 0, 0, logproto.BACKWARD, 0, nil, nil)
@@ -2769,7 +2775,10 @@ func (q *querierRecorder) SelectLogs(_ context.Context, p SelectLogParams) (iter
return iter.NewStreamsIterator(streams, p.Direction), nil
}
func (q *querierRecorder) SelectSamples(_ context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
func (q *querierRecorder) SelectSamples(
_ context.Context,
p SelectSampleParams,
) (iter.SampleIterator, error) {
if !q.match {
for _, s := range q.series {
return iter.NewMultiSeriesIterator(s), nil

@@ -235,22 +235,23 @@ func ParamOverridesFromShard(base Params, shard *ShardWithChunkRefs) (result Par
// Sortable logql contain sort or sort_desc.
func Sortable(q Params) (bool, error) {
var sortable bool
expr, ok := q.GetExpression().(syntax.SampleExpr)
if !ok {
return false, errors.New("only sample expression supported")
switch expr := q.GetExpression().(type) {
case syntax.VariantsExpr:
return false, nil
case syntax.SampleExpr:
var sortable bool
expr.Walk(func(e syntax.Expr) {
if rangeExpr, ok := e.(*syntax.VectorAggregationExpr); ok {
if rangeExpr.Operation == syntax.OpTypeSort || rangeExpr.Operation == syntax.OpTypeSortDesc {
sortable = true
return
}
}
})
return sortable, nil
default:
return false, errors.New("only sample and variants expressions supported")
}
expr.Walk(func(e syntax.Expr) {
rangeExpr, ok := e.(*syntax.VectorAggregationExpr)
if !ok {
return
}
if rangeExpr.Operation == syntax.OpTypeSort || rangeExpr.Operation == syntax.OpTypeSortDesc {
sortable = true
return
}
})
return sortable, nil
}
// EvaluatorFactory is an interface for iterating over data at different nodes in the AST

@@ -320,3 +320,60 @@ func convertBytes(v string) (float64, error) {
}
return float64(b), nil
}
type variantsStreamSampleExtractorWrapper struct {
StreamSampleExtractor
index int
}
func NewVariantsStreamSampleExtractorWrapper(
index int,
extractor StreamSampleExtractor,
) StreamSampleExtractor {
return &variantsStreamSampleExtractorWrapper{
StreamSampleExtractor: extractor,
index: index,
}
}
func (v *variantsStreamSampleExtractorWrapper) BaseLabels() LabelsResult {
return appendVariantLabel(v.StreamSampleExtractor.BaseLabels(), v.index)
}
func (v *variantsStreamSampleExtractorWrapper) Process(
ts int64,
line []byte,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
n, lbls, ok := v.StreamSampleExtractor.Process(ts, line, structuredMetadata...)
if !ok {
return n, lbls, ok
}
return n, appendVariantLabel(lbls, v.index), ok
}
func (v *variantsStreamSampleExtractorWrapper) ProcessString(
ts int64,
line string,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
n, lbls, ok := v.StreamSampleExtractor.ProcessString(ts, line, structuredMetadata...)
if !ok {
return n, lbls, ok
}
return n, appendVariantLabel(lbls, v.index), ok
}
func appendVariantLabel(lbls LabelsResult, variantIndex int) LabelsResult {
newLbls := lbls.Stream()
newLbls = append(newLbls, labels.Label{
Name: "__variant__",
Value: strconv.Itoa(variantIndex),
})
builder := NewBaseLabelsBuilder().ForLabels(newLbls, newLbls.Hash())
builder.Add(StructuredMetadataLabel, lbls.StructuredMetadata()...)
builder.Add(ParsedLabel, lbls.Parsed()...)
return builder.LabelsResult()
}
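
The diff does not show the assembly step that applies this wrapper, but the intended use is to tag each variant's stream extractor with its position so series produced by different variants stay distinguishable after evaluation. A hypothetical sketch (wrapVariants is not part of this commit):

```go
package sketch

import "github.com/grafana/loki/v3/pkg/logql/log"

// wrapVariants tags extractor i with __variant__="i" via the new wrapper.
func wrapVariants(extractors []log.StreamSampleExtractor) []log.StreamSampleExtractor {
	wrapped := make([]log.StreamSampleExtractor, 0, len(extractors))
	for i, ex := range extractors {
		wrapped = append(wrapped, log.NewVariantsStreamSampleExtractorWrapper(i, ex))
	}
	return wrapped
}
```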

@@ -491,17 +491,103 @@ func (p *stubExtractor) ForStream(_ labels.Labels) StreamSampleExtractor {
type stubStreamExtractor struct{}
func (p *stubStreamExtractor) BaseLabels() LabelsResult {
return nil
builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0)
return builder.LabelsResult()
}
func (p *stubStreamExtractor) Process(_ int64, _ []byte, _ ...labels.Label) (float64, LabelsResult, bool) {
return 0, nil, true
func (p *stubStreamExtractor) Process(
_ int64,
_ []byte,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0)
builder.Add(StructuredMetadataLabel, structuredMetadata...)
return 1.0, builder.LabelsResult(), true
}
func (p *stubStreamExtractor) ProcessString(_ int64, _ string, _ ...labels.Label) (float64, LabelsResult, bool) {
return 0, nil, true
func (p *stubStreamExtractor) ProcessString(
_ int64,
_ string,
structuredMetadata ...labels.Label,
) (float64, LabelsResult, bool) {
builder := NewBaseLabelsBuilder().ForLabels(labels.FromStrings("foo", "bar"), 0)
builder.Add(StructuredMetadataLabel, structuredMetadata...)
return 1.0, builder.LabelsResult(), true
}
func (p *stubStreamExtractor) ReferencedStructuredMetadata() bool {
return false
}
func TestVariantsStreamSampleExtractorWrapper(t *testing.T) {
tests := []struct {
name string
index int
input string
labels labels.Labels
structuredMetadata labels.Labels
want float64
wantLbs labels.Labels
wantBaseLbs labels.Labels
}{
{
name: "extraction with variant 0",
index: 0,
input: "test line",
labels: labels.FromStrings("foo", "bar"),
want: 1.0,
wantLbs: labels.FromStrings("foo", "bar", "__variant__", "0"),
wantBaseLbs: labels.FromStrings("foo", "bar", "__variant__", "0"),
},
{
name: "extraction with variant 1",
index: 1,
input: "test line",
labels: labels.FromStrings("foo", "bar"),
want: 1.0,
wantLbs: labels.FromStrings("foo", "bar", "__variant__", "1"),
wantBaseLbs: labels.FromStrings("foo", "bar", "__variant__", "1"),
},
{
name: "with structured metadata",
index: 2,
input: "test line",
labels: labels.FromStrings("foo", "bar"),
structuredMetadata: labels.FromStrings("meta", "data"),
want: 1.0,
wantLbs: labels.FromStrings(
"foo",
"bar",
"__variant__",
"2",
"meta",
"data",
),
wantBaseLbs: labels.FromStrings("foo", "bar", "__variant__", "2"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create base extractor that always returns 1.0 and the input labels
baseExtractor := &stubStreamExtractor{}
wrapped := NewVariantsStreamSampleExtractorWrapper(tt.index, baseExtractor)
// Test Process
val, lbs, ok := wrapped.Process(0, []byte(tt.input), tt.structuredMetadata...)
require.Equal(t, true, ok)
require.Equal(t, tt.want, val)
require.Equal(t, tt.wantLbs, lbs.Labels())
// Test ProcessString
val, lbs, ok = wrapped.ProcessString(0, tt.input, tt.structuredMetadata...)
require.Equal(t, true, ok)
require.Equal(t, tt.want, val)
require.Equal(t, tt.wantLbs, lbs.Labels())
// Test BaseLabels
baseLbs := wrapped.BaseLabels()
require.Equal(t, tt.wantBaseLbs, baseLbs.Labels())
})
}
}

@@ -50,179 +50,179 @@ func Test_ParserHints(t *testing.T) {
t.Parallel()
for _, tt := range []struct {
expr string
line []byte
expectOk bool
expectVal float64
expectLbs string
expr string
line []byte
expectOk bool
expectVals []float64
expectLbs []string
}{
{
`rate({app="nginx"} | json | response_status = 204 [1m])`,
jsonLine,
true,
1.0,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
[]float64{1.0},
[]string{"{app=\"nginx\", cluster=\"us-central-west\", cluster_extracted=\"us-east-west\", message_message=\"foo\", protocol=\"HTTP/2.0\", remote_user=\"foo\", request_host=\"foo.grafana.net\", request_method=\"POST\", request_size=\"101\", request_time=\"30.001\", request_uri=\"/rpc/v2/stage\", response_latency_seconds=\"30.001\", response_status=\"204\", upstream_addr=\"10.0.0.1:80\"}"},
},
{
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
[]float64{1.0},
[]string{"{cluster_extracted=\"us-east-west\", message_message=\"foo\", protocol=\"HTTP/2.0\", remote_user=\"foo\", request_method=\"POST\", request_size=\"101\", request_time=\"30.001\", request_uri=\"/rpc/v2/stage\", response_latency_seconds=\"30.001\", response_status=\"204\", upstream_addr=\"10.0.0.1:80\"}"},
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{app="nginx", request_host="foo.grafana.net"}`,
[]float64{1.0},
[]string{"{app=\"nginx\", request_host=\"foo.grafana.net\"}"},
},
{
`sum(rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{}`,
[]float64{1.0},
[]string{"{}"},
},
{
`sum(rate({app="nginx"} | json [1m]))`,
jsonLine,
true,
1.0,
`{}`,
[]float64{1.0},
[]string{"{}"},
},
{
`sum(rate({app="nginx"} | json | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{}`,
[]float64{30.001},
[]string{"{}"},
},
{
`sum(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{}`,
[]float64{30.001},
[]string{"{}"},
},
{
`sum by (request_host,app)(rate({app="nginx"} | json | response_status = 204 and remote_user = "foo" | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{app="nginx", request_host="foo.grafana.net"}`,
[]float64{30.001},
[]string{`{app="nginx", request_host="foo.grafana.net"}`},
},
{
`rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m])`,
jsonLine,
true,
30.001,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
[]float64{30.001},
[]string{"{app=\"nginx\", cluster=\"us-central-west\", cluster_extracted=\"us-east-west\", message_message=\"foo\", protocol=\"HTTP/2.0\", remote_user=\"foo\", request_host=\"foo.grafana.net\", request_method=\"POST\", request_size=\"101\", request_time=\"30.001\", request_uri=\"/rpc/v2/stage\", response_status=\"204\", upstream_addr=\"10.0.0.1:80\"}"},
},
{
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
[]float64{30.001},
[]string{`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`},
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
logfmtLine,
true,
15.0,
`{}`,
[]float64{15.0},
[]string{"{}"},
},
{
`sum by (org_id,app) (rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
logfmtLine,
true,
15.0,
`{app="nginx", org_id="3677"}`,
[]float64{15.0},
[]string{"{app=\"nginx\", org_id=\"3677\"}"},
},
{
`rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m])`,
logfmtLine,
true,
15.0,
`{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", app="nginx", caller="spanlogger.go:79", cluster="us-central-west", org_id="3677", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`,
[]float64{15.0},
[]string{"{Ingester_TotalBatches=\"0\", Ingester_TotalChunksMatched=\"0\", app=\"nginx\", caller=\"spanlogger.go:79\", cluster=\"us-central-west\", org_id=\"3677\", traceID=\"2e5c7234b8640997\", ts=\"2021-02-02T14:35:05.983992774Z\"}"},
},
{
`sum without (org_id,app,cluster)(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
logfmtLine,
true,
15.0,
`{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", caller="spanlogger.go:79", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`,
[]float64{15.0},
[]string{"{Ingester_TotalBatches=\"0\", Ingester_TotalChunksMatched=\"0\", caller=\"spanlogger.go:79\", traceID=\"2e5c7234b8640997\", ts=\"2021-02-02T14:35:05.983992774Z\"}"},
},
{
`sum(rate({app="nginx"} | json | remote_user="foo" [1m]))`,
jsonLine,
true,
1.0,
`{}`,
[]float64{1.0},
[]string{"{}"},
},
{
`sum(rate({app="nginx"} | json | nonexistant_field="foo" [1m]))`,
jsonLine,
false,
0,
``,
[]float64{0},
[]string{""},
},
{
`absent_over_time({app="nginx"} | json [1m])`,
jsonLine,
true,
1.0,
`{}`,
[]float64{1.0},
[]string{"{}"},
},
{
`absent_over_time({app="nginx"} | json | nonexistant_field="foo" [1m])`,
jsonLine,
false,
0,
``,
[]float64{0},
[]string{""},
},
{
`absent_over_time({app="nginx"} | json | remote_user="foo" [1m])`,
jsonLine,
true,
1.0,
`{}`,
[]float64{1.0},
[]string{"{}"},
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | json | cluster_extracted="us-east-west" [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
[]float64{1.0},
[]string{"{cluster_extracted=\"us-east-west\"}"},
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | unpack | cluster_extracted="us-east-west" [1m]))`,
packedLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
[]float64{1.0},
[]string{`{cluster_extracted="us-east-west"}`},
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | unpack[1m]))`,
packedLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
[]float64{1.0},
[]string{`{cluster_extracted="us-east-west"}`},
},
{
`sum(rate({app="nginx"} | unpack | nonexistant_field="foo" [1m]))`,
packedLine,
false,
0,
``,
[]float64{0},
[]string{""},
},
{
`sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`,
jsonLine,
true,
1,
`{app="nginx", message_message="foo"}`,
[]float64{1.0},
[]string{"{app=\"nginx\", message_message=\"foo\"}"},
},
} {
t.Run(tt.expr, func(t *testing.T) {
@@ -230,16 +230,19 @@ func Test_ParserHints(t *testing.T) {
expr, err := syntax.ParseSampleExpr(tt.expr)
require.NoError(t, err)
ex, err := expr.Extractor()
exs, err := expr.Extractors()
require.NoError(t, err)
v, lbsRes, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...))
var lbsResString string
if lbsRes != nil {
lbsResString = lbsRes.String()
for i, ex := range exs {
v, lbsRes, ok := ex.ForStream(lbs).Process(0, append([]byte{}, tt.line...))
var lbsResString string
if lbsRes != nil {
lbsResString = lbsRes.String()
}
require.Equal(t, tt.expectOk, ok)
require.Equal(t, tt.expectVals[i], v)
require.Equal(t, tt.expectLbs[i], lbsResString)
}
require.Equal(t, tt.expectOk, ok)
require.Equal(t, tt.expectVal, v)
require.Equal(t, tt.expectLbs, lbsResString)
})
}
}

@@ -88,7 +88,7 @@ func (VectorExpr) isLogSelectorExpr() {}
// SampleExpr is a LogQL expression filtering logs and returning metric samples
type SampleExpr interface {
Selector() (LogSelectorExpr, error)
Extractor() (SampleExtractor, error)
Extractors() ([]SampleExtractor, error)
MatcherGroups() ([]MatcherRange, error)
Expr
@@ -101,6 +101,7 @@ func (VectorAggregationExpr) isSampleExpr() {}
func (LiteralExpr) isSampleExpr() {}
func (VectorExpr) isSampleExpr() {}
func (LabelReplaceExpr) isSampleExpr() {}
func (MultiVariantExpr) isSampleExpr() {}
// StageExpr is an expression defining a single step into a log pipeline
type StageExpr interface {
@@ -1580,7 +1581,7 @@ func (e *VectorAggregationExpr) Selector() (LogSelectorExpr, error) {
return e.Left.Selector()
}
func (e *VectorAggregationExpr) Extractor() (log.SampleExtractor, error) {
func (e *VectorAggregationExpr) Extractors() ([]SampleExtractor, error) {
if e.err != nil {
return nil, e.err
}
@@ -1589,10 +1590,14 @@
if r, ok := e.Left.(*RangeAggregationExpr); ok && canInjectVectorGrouping(e.Operation, r.Operation) {
// if the range vec operation has no grouping we can push down the vec one.
if r.Grouping == nil {
return r.extractor(e.Grouping)
ext, err := r.extractor(e.Grouping)
if err != nil {
return []SampleExtractor{}, err
}
return []SampleExtractor{ext}, nil
}
}
return e.Left.Extractor()
return e.Left.Extractors()
}
// canInjectVectorGrouping tells if a vector operation can inject grouping into the nested range vector.
@@ -2135,15 +2140,17 @@ func (e *LiteralExpr) String() string {
// LiteralExpr impls SampleExpr & LogSelectorExpr mainly to reduce the need for more complicated typings
// to facilitate sum types. We'll be type switching when evaluating them anyways
// and they will only be present in binary operation legs.
func (e *LiteralExpr) Selector() (LogSelectorExpr, error) { return e, e.err }
func (e *LiteralExpr) HasFilter() bool { return false }
func (e *LiteralExpr) Shardable(_ bool) bool { return true }
func (e *LiteralExpr) Walk(f WalkFn) { f(e) }
func (e *LiteralExpr) Accept(v RootVisitor) { v.VisitLiteral(e) }
func (e *LiteralExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *LiteralExpr) Matchers() []*labels.Matcher { return nil }
func (e *LiteralExpr) MatcherGroups() ([]MatcherRange, error) { return nil, e.err }
func (e *LiteralExpr) Extractor() (log.SampleExtractor, error) { return nil, e.err }
func (e *LiteralExpr) Selector() (LogSelectorExpr, error) { return e, e.err }
func (e *LiteralExpr) HasFilter() bool { return false }
func (e *LiteralExpr) Shardable(_ bool) bool { return true }
func (e *LiteralExpr) Walk(f WalkFn) { f(e) }
func (e *LiteralExpr) Accept(v RootVisitor) { v.VisitLiteral(e) }
func (e *LiteralExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *LiteralExpr) Matchers() []*labels.Matcher { return nil }
func (e *LiteralExpr) MatcherGroups() ([]MatcherRange, error) { return nil, e.err }
func (e *LiteralExpr) Extractors() ([]log.SampleExtractor, error) {
return []log.SampleExtractor{}, e.err
}
func (e *LiteralExpr) Value() (float64, error) {
if e.err != nil {
return 0, e.err
@@ -2213,11 +2220,11 @@ func (e *LabelReplaceExpr) MatcherGroups() ([]MatcherRange, error) {
return e.Left.MatcherGroups()
}
func (e *LabelReplaceExpr) Extractor() (SampleExtractor, error) {
func (e *LabelReplaceExpr) Extractors() ([]SampleExtractor, error) {
if e.err != nil {
return nil, e.err
return []SampleExtractor{}, e.err
}
return e.Left.Extractor()
return e.Left.Extractors()
}
func (e *LabelReplaceExpr) Shardable(_ bool) bool {
@@ -2357,15 +2364,17 @@ func (e *VectorExpr) Value() (float64, error) {
return e.Val, nil
}
func (e *VectorExpr) Selector() (LogSelectorExpr, error) { return e, e.err }
func (e *VectorExpr) HasFilter() bool { return false }
func (e *VectorExpr) Shardable(_ bool) bool { return false }
func (e *VectorExpr) Walk(f WalkFn) { f(e) }
func (e *VectorExpr) Accept(v RootVisitor) { v.VisitVector(e) }
func (e *VectorExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *VectorExpr) Matchers() []*labels.Matcher { return nil }
func (e *VectorExpr) MatcherGroups() ([]MatcherRange, error) { return nil, e.err }
func (e *VectorExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
func (e *VectorExpr) Selector() (LogSelectorExpr, error) { return e, e.err }
func (e *VectorExpr) HasFilter() bool { return false }
func (e *VectorExpr) Shardable(_ bool) bool { return false }
func (e *VectorExpr) Walk(f WalkFn) { f(e) }
func (e *VectorExpr) Accept(v RootVisitor) { v.VisitVector(e) }
func (e *VectorExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *VectorExpr) Matchers() []*labels.Matcher { return nil }
func (e *VectorExpr) MatcherGroups() ([]MatcherRange, error) { return nil, e.err }
func (e *VectorExpr) Extractors() ([]log.SampleExtractor, error) { return []log.SampleExtractor{}, nil }
func ReducesLabels(e Expr) (conflict bool) {
e.Walk(func(e Expr) {
@ -2423,13 +2432,15 @@ func groupingReducesLabels(grp *Grouping) bool {
//
//sumtype:decl
type VariantsExpr interface {
Extractors() ([]SampleExtractor, error)
Interval() time.Duration
LogRange() *LogRangeExpr
MatcherGroups() ([]MatcherRange, error)
Matchers() []*labels.Matcher
Variants() []SampleExpr
SetVariant(i int, e SampleExpr) error
Interval() time.Duration
Offset() time.Duration
Extractors() ([]SampleExtractor, error)
SetVariant(i int, e SampleExpr) error
Variants() []SampleExpr
Selector() (LogSelectorExpr, error)
Expr
}
@ -2563,15 +2574,37 @@ func (m *MultiVariantExpr) Pretty(level int) string {
return s
}
func (m *MultiVariantExpr) MatcherGroups() ([]MatcherRange, error) {
xs := m.Matchers()
if len(xs) > 0 {
return []MatcherRange{
{
Matchers: xs,
Interval: m.Interval(),
Offset: m.Offset(),
},
}, nil
}
return nil, nil
}
func (m *MultiVariantExpr) Selector() (LogSelectorExpr, error) {
if m.err != nil {
return nil, m.err
}
return m.logRange.Left, nil
}
func (m *MultiVariantExpr) Extractors() ([]log.SampleExtractor, error) {
extractors := make([]log.SampleExtractor, 0, len(m.variants))
for _, v := range m.variants {
e, err := v.Extractor()
e, err := v.Extractors()
if err != nil {
return nil, err
}
extractors = append(extractors, e)
extractors = append(extractors, e...)
}
return extractors, nil
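For callers, the practical effect of the rename is that every sample expression now yields a slice of extractors, with plain expressions contributing exactly one and multi-variant expressions contributing one per variant. A minimal sketch of the new calling convention (assuming the `github.com/grafana/loki/v3` module path; this snippet is illustrative and not part of the diff):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/logql/syntax" // assumes the loki v3 module is on the import path
)

func main() {
	expr, err := syntax.ParseSampleExpr(`rate({app="foo"}[1m])`)
	if err != nil {
		panic(err)
	}

	// Extractors() replaces the old Extractor(): a plain range aggregation
	// returns a single-element slice, while a multi-variant expression
	// returns one extractor per variant.
	extractors, err := expr.Extractors()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(extractors)) // 1 for a plain rate() query
}
```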

@ -9,8 +9,12 @@ import (
const UnsupportedErr = "unsupported range vector aggregation operation: %s"
func (r RangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
return r.extractor(nil)
func (r RangeAggregationExpr) Extractors() ([]log.SampleExtractor, error) {
ext, err := r.extractor(nil)
if err != nil {
return []log.SampleExtractor{}, err
}
return []log.SampleExtractor{ext}, nil
}
// extractor creates a SampleExtractor but allows for the grouping to be overridden.

@ -99,8 +99,9 @@ func Test_Extractor(t *testing.T) {
t.Run(tc, func(t *testing.T) {
expr, err := ParseSampleExpr(tc)
require.Nil(t, err)
_, err = expr.Extractor()
extractors, err := expr.Extractors()
require.Nil(t, err)
require.Len(t, extractors, 1)
})
}
}

@ -3489,27 +3489,51 @@ func Benchmark_MetricPipelineCombined(b *testing.B) {
expr, err := ParseSampleExpr(query)
require.Nil(b, err)
p, err := expr.Extractor()
extractors, err := expr.Extractors()
require.Nil(b, err)
sp := p.ForStream(labels.EmptyLabels())
var (
v float64
lbs log.LabelsResult
matches bool
)
in := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v, lbs, matches = sp.Process(0, in)
for _, p := range extractors {
sp := p.ForStream(labels.EmptyLabels())
var (
v float64
lbs log.LabelsResult
matches bool
)
in := []byte(
`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`,
)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v, lbs, matches = sp.Process(0, in)
}
require.True(b, matches)
require.Equal(
b,
labels.FromStrings(
"caller",
"logging.go:66",
"duration",
"1.5s",
"level",
"debug",
"method",
"POST",
"msg",
"POST /api/prom/api/v1/query_range (200) 1.5s",
"path",
"/api/prom/api/v1/query_range",
"status",
"200",
"traceID",
"a9d4d8a928d8db1",
"ts",
"2020-10-02T10:10:42.092268913Z",
),
lbs.Labels(),
)
require.Equal(b, 1.0, v)
}
require.True(b, matches)
require.Equal(
b,
labels.FromStrings("caller", "logging.go:66", "duration", "1.5s", "level", "debug", "method", "POST", "msg", "POST /api/prom/api/v1/query_range (200) 1.5s", "path", "/api/prom/api/v1/query_range", "status", "200", "traceID", "a9d4d8a928d8db1", "ts", "2020-10-02T10:10:42.092268913Z"),
lbs.Labels(),
)
require.Equal(b, 1.0, v)
}
var c []*labels.Matcher

@ -140,26 +140,31 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea
return streams
}
func processSeries(in []logproto.Stream, ex log.SampleExtractor) ([]logproto.Series, error) {
func processSeries(in []logproto.Stream, ex []log.SampleExtractor) ([]logproto.Series, error) {
resBySeries := map[string]*logproto.Series{}
for _, stream := range in {
exs := ex.ForStream(mustParseLabels(stream.Labels))
for _, e := range stream.Entries {
if f, lbs, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok {
var s *logproto.Series
var found bool
s, found = resBySeries[lbs.String()]
if !found {
s = &logproto.Series{Labels: lbs.String(), StreamHash: exs.BaseLabels().Hash()}
resBySeries[lbs.String()] = s
for _, extractor := range ex {
exs := extractor.ForStream(mustParseLabels(stream.Labels))
for _, e := range stream.Entries {
if f, lbs, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok {
var s *logproto.Series
var found bool
s, found = resBySeries[lbs.String()]
if !found {
s = &logproto.Series{
Labels: lbs.String(),
StreamHash: exs.BaseLabels().Hash(),
}
resBySeries[lbs.String()] = s
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.Timestamp.UnixNano(),
Value: f,
Hash: xxhash.Sum64([]byte(e.Line)),
})
}
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.Timestamp.UnixNano(),
Value: f,
Hash: xxhash.Sum64([]byte(e.Line)),
})
}
}
}
@ -183,7 +188,7 @@ func (q MockQuerier) SelectSamples(_ context.Context, req SelectSampleParams) (i
return nil, err
}
extractor, err := expr.Extractor()
extractors, err := expr.Extractors()
if err != nil {
return nil, err
}
@ -217,7 +222,7 @@ outer:
matched = append(matched, stream)
}
filtered, err := processSeries(matched, extractor)
filtered, err := processSeries(matched, extractors)
if err != nil {
return nil, err
}
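Note the shape of the new loop in `processSeries`: all extractors feed the same `resBySeries` map, keyed by the rendered result labels, so two extractors that emit identical labels for a stream have their samples merged into one series. A small, self-contained sketch of that accumulation pattern (toy types, not Loki code):

```go
package main

import "fmt"

type sample struct {
	ts    int64
	value float64
}

// extractFn stands in for a per-stream extractor: line in, (value, labels, ok) out.
type extractFn func(line string) (float64, string, bool)

// accumulate merges samples from several extractors into one map keyed by the
// rendered label string, mirroring the structure of processSeries above.
func accumulate(extractors []extractFn, lines []string) map[string][]sample {
	bySeries := map[string][]sample{}
	for _, extract := range extractors {
		for i, line := range lines {
			if v, lbs, ok := extract(line); ok {
				bySeries[lbs] = append(bySeries[lbs], sample{ts: int64(i), value: v})
			}
		}
	}
	return bySeries
}

func main() {
	count := extractFn(func(string) (float64, string, bool) { return 1, `{app="foo"}`, true })
	length := extractFn(func(l string) (float64, string, bool) { return float64(len(l)), `{app="foo"}`, true })

	// Both extractors emit the same labels, so their samples land in one series.
	res := accumulate([]extractFn{count, length}, []string{"a", "bb"})
	fmt.Println(len(res), len(res[`{app="foo"}`])) // 1 4
}
```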

@ -416,6 +416,31 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
}
switch e := op.Plan.AST.(type) {
case syntax.VariantsExpr:
if err := validateMaxEntriesLimits(ctx, op.Limit, r.limits); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
matchers := e.Matchers()
if err := validateMatchers(ctx, r.limits, matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
for _, v := range e.Variants() {
groups, err := v.MatcherGroups()
if err != nil {
level.Warn(logger).Log("msg", "unexpected matcher groups error in roundtripper", "err", err)
}
for _, g := range groups {
if err := validateMatchers(ctx, r.limits, g.Matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
}
}
return r.variants.Do(ctx, req)
case syntax.SampleExpr:
// The error will be handled later.
groups, err := e.MatcherGroups()
@ -444,31 +469,6 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return r.limited.Do(ctx, req)
}
return r.log.Do(ctx, req)
case syntax.VariantsExpr:
if err := validateMaxEntriesLimits(ctx, op.Limit, r.limits); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
matchers := e.Matchers()
if err := validateMatchers(ctx, r.limits, matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
for _, v := range e.Variants() {
groups, err := v.MatcherGroups()
if err != nil {
level.Warn(logger).Log("msg", "unexpected matcher groups error in roundtripper", "err", err)
}
for _, g := range groups {
if err := validateMatchers(ctx, r.limits, g.Matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
}
}
return r.variants.Do(ctx, req)
default:
return r.next.Do(ctx, req)
}
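The `syntax.VariantsExpr` case is moved above `syntax.SampleExpr` rather than merely added. In a Go type switch the first matching case wins, so if a multi-variant expression also satisfies the broader sample-expression interface (plausible here, since `VariantsExpr` carries `Selector`, `Matchers`, and `Extractors` too), it must be listed first or it would be routed as an ordinary metric query. A self-contained illustration of the ordering rule (toy interfaces, not Loki's):

```go
package main

import "fmt"

type SampleExpr interface{ Selector() string }

type VariantsExpr interface {
	Selector() string
	Variants() []SampleExpr
}

type multiVariant struct{}

func (multiVariant) Selector() string       { return `{app="foo"}` }
func (multiVariant) Variants() []SampleExpr { return nil }

func route(e any) string {
	switch e.(type) {
	case VariantsExpr: // must precede SampleExpr: the first matching case wins
		return "variants"
	case SampleExpr:
		return "sample"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(route(multiVariant{})) // variants
}
```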

@ -464,9 +464,9 @@ type sampleBatchIterator struct {
curr iter.SampleIterator
err error
ctx context.Context
cancel context.CancelFunc
extractor syntax.SampleExtractor
ctx context.Context
cancel context.CancelFunc
extractors []syntax.SampleExtractor
}
func newSampleBatchIterator(
@ -476,16 +476,27 @@ func newSampleBatchIterator(
chunks []*LazyChunk,
batchSize int,
matchers []*labels.Matcher,
extractor syntax.SampleExtractor,
start, end time.Time,
chunkFilterer chunk.Filterer,
extractors ...syntax.SampleExtractor,
) (iter.SampleIterator, error) {
ctx, cancel := context.WithCancel(ctx)
return &sampleBatchIterator{
extractor: extractor,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer),
extractors: extractors,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(
ctx,
schemas,
chunks,
batchSize,
logproto.FORWARD,
start,
end,
metrics,
matchers,
chunkFilterer,
),
}, nil
}
@ -560,12 +571,26 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter
return iter.NewSortSampleIterator(iters), nil
}
func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) {
func (it *sampleBatchIterator) buildIterators(
chks map[model.Fingerprint][][]*LazyChunk,
from, through time.Time,
nextChunk *LazyChunk,
) ([]iter.SampleIterator, error) {
result := make([]iter.SampleIterator, 0, len(chks))
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
streamExtractor := it.extractor.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk)
extractors := make([]log.StreamSampleExtractor, 0, len(it.extractors))
for _, extractor := range it.extractors {
extractors = append(
extractors,
extractor.ForStream(
labels.NewBuilder(chunks[0][0].Chunk.Metric).
Del(labels.MetricName).
Labels(),
),
)
}
iterator, err := it.buildHeapIterator(chunks, from, through, extractors, nextChunk)
if err != nil {
return nil, err
}
@ -576,7 +601,12 @@ func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*La
return result, nil
}
func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) {
func (it *sampleBatchIterator) buildHeapIterator(
chks [][]*LazyChunk,
from, through time.Time,
streamExtractors []log.StreamSampleExtractor,
nextChunk *LazyChunk,
) (iter.SampleIterator, error) {
result := make([]iter.SampleIterator, 0, len(chks))
for i := range chks {
@ -585,7 +615,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, streamExtractor, nextChunk)
iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, nextChunk, streamExtractors...)
if err != nil {
return nil, err
}
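Making the extractors variadic is also what forces the argument reshuffle visible in the test below: Go only allows a variadic parameter in the final position, so `start`, `end`, and `chunkFilterer` move ahead of it. A minimal sketch of the constraint (hypothetical function, not the real signature):

```go
package main

import "fmt"

type extractor func(string) float64

// newIterator keeps the optional extractors last, since Go requires the
// variadic parameter to be the final one in a signature.
func newIterator(batchSize int, start, end int64, extractors ...extractor) int {
	return len(extractors) // zero extractors is a legal call
}

func main() {
	count := func(string) float64 { return 1 }
	bytes := func(l string) float64 { return float64(len(l)) }

	fmt.Println(newIterator(100, 0, 10))               // 0
	fmt.Println(newIterator(100, 0, 10, count))        // 1
	fmt.Println(newIterator(100, 0, 10, count, bytes)) // 2
}
```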

@ -1418,7 +1418,18 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false)
require.NoError(t, err)
it, err := newSampleBatchIterator(context.Background(), s, NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end, nil)
it, err := newSampleBatchIterator(
context.Background(),
s,
NilMetrics,
tt.chunks,
tt.batchSize,
newMatchers(tt.matchers),
tt.start,
tt.end,
nil,
ex,
)
require.NoError(t, err)
series, _, err := iter.ReadSampleBatch(it, 1000)
_ = it.Close()

@ -115,8 +115,8 @@ func (c *LazyChunk) Iterator(
func (c *LazyChunk) SampleIterator(
ctx context.Context,
from, through time.Time,
extractor log.StreamSampleExtractor,
nextChunk *LazyChunk,
extractors ...log.StreamSampleExtractor,
) (iter.SampleIterator, error) {
// If the chunk is not already loaded, then error out.
if c.Chunk.Data == nil {
@ -140,7 +140,7 @@ func (c *LazyChunk) SampleIterator(
// if the block is overlapping cache it with the next chunk boundaries.
if nextChunk != nil && IsBlockOverlapping(b, nextChunk, logproto.FORWARD) {
// todo(cyriltovena) we can avoid dropping the metric name for each chunk since many chunks share the same metric/labelset.
it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, extractor), b.Entries())
it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, extractors...), b.Entries())
its = append(its, it)
if c.overlappingSampleBlocks == nil {
c.overlappingSampleBlocks = make(map[int]iter.CacheSampleIterator)
@ -160,7 +160,7 @@ func (c *LazyChunk) SampleIterator(
}
}
// blocks that do not overlap the next chunk are not cached.
its = append(its, b.SampleIterator(ctx, extractor))
its = append(its, b.SampleIterator(ctx, extractors...))
}
// build the final iterator bound to the requested time range.
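The `extractors...` spread at both call sites is the standard way to forward an already-collected slice into a variadic parameter. A one-liner sketch of the mechanics (toy function, not the real block API):

```go
package main

import "fmt"

func sampleIterator(extractors ...string) int { return len(extractors) }

func main() {
	collected := []string{"count", "bytes"}
	// The trailing ... spreads the slice into the variadic parameter,
	// as in b.SampleIterator(ctx, extractors...).
	fmt.Println(sampleIterator(collected...)) // 2
}
```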

@ -206,7 +206,7 @@ func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterato
return nil
}
func (fakeBlock) SampleIterator(context.Context, log.StreamSampleExtractor) iter.SampleIterator {
func (fakeBlock) SampleIterator(_ context.Context, _ ...log.StreamSampleExtractor) iter.SampleIterator {
return nil
}

@ -551,23 +551,28 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, err
}
extractor, err := expr.Extractor()
extractors, err := expr.Extractors()
if err != nil {
return nil, err
}
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
if s.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
userID, err := tenant.TenantID(ctx)
for i, extractor := range extractors {
extractor, err = deletion.SetupExtractor(req, extractor)
if err != nil {
return nil, err
}
extractor = s.extractorWrapper.Wrap(ctx, extractor, req.Plan.String(), userID)
if s.extractorWrapper != nil &&
httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
extractor = s.extractorWrapper.Wrap(ctx, extractor, req.Plan.String(), userID)
}
extractors[i] = extractor
}
var chunkFilterer chunk.Filterer
@ -575,7 +580,18 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
chunkFilterer = s.chunkFilterer.ForRequest(ctx)
}
return newSampleBatchIterator(ctx, s.schemaCfg, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, extractor, req.Start, req.End, chunkFilterer)
return newSampleBatchIterator(
ctx,
s.schemaCfg,
s.chunkMetrics,
lazyChunks,
s.cfg.MaxChunkBatchSize,
matchers,
req.Start,
req.End,
chunkFilterer,
extractors...,
)
}
func (s *LokiStore) GetSchemaConfigs() []config.PeriodConfig {

@ -20,12 +20,12 @@ func SetupPipeline(req logql.SelectLogParams, p log.Pipeline) (log.Pipeline, err
return log.NewFilteringPipeline(filters, p), nil
}
func SetupExtractor(req logql.SelectSampleParams, se log.SampleExtractor) (log.SampleExtractor, error) {
if len(req.Deletes) == 0 {
func SetupExtractor(req logql.QueryParams, se log.SampleExtractor) (log.SampleExtractor, error) {
if len(req.GetDeletes()) == 0 {
return se, nil
}
filters, err := deleteFilters(req.Deletes)
filters, err := deleteFilters(req.GetDeletes())
if err != nil {
return nil, err
}
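Widening the parameter from the concrete `SelectSampleParams` to the `QueryParams` interface lets the same deletion setup serve both log and sample requests, as long as the interface exposes the deletes. A hedged sketch of the pattern (hypothetical types with string deletes; the real `QueryParams` lives in `pkg/logql` and returns delete requests, not strings):

```go
package main

import "fmt"

// QueryParams is the narrow interface both request types satisfy.
type QueryParams interface {
	GetDeletes() []string
}

type SelectLogParams struct{ Deletes []string }
type SelectSampleParams struct{ Deletes []string }

func (p SelectLogParams) GetDeletes() []string    { return p.Deletes }
func (p SelectSampleParams) GetDeletes() []string { return p.Deletes }

// setup short-circuits when there is nothing to filter, mirroring SetupExtractor.
func setup(req QueryParams) string {
	if len(req.GetDeletes()) == 0 {
		return "no-op"
	}
	return fmt.Sprintf("filtering %d deletes", len(req.GetDeletes()))
}

func main() {
	fmt.Println(setup(SelectLogParams{}))                              // no-op
	fmt.Println(setup(SelectSampleParams{Deletes: []string{"req-1"}})) // filtering 1 deletes
}
```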
