Fixes race conditions in the batch iterator. (#2773)

* Adds logfmt, regexp and json logql parser

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook the ast with parsers.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with memchunk.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with the storage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with ingesters

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fixes all tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Refactor to pipeline and implement ast parsing.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the lexer for duration and range

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes all tests and add some for label filters

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add label and line format.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add tests for fmt label and line with validations.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Polishing parsers and add some more test cases

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Finish the unwrap parser, still need to add more tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Indent this hell.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Moar tests and it works.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests which lead me to find a bug in the lexer

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests and fix all engine tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes match stage in promtail pipelines.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Hook Pipeline into ingester, tailer and storage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Correctly setup sharding for logqlv2

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes precedences issue with label filters and add moar tests ✌️

* Adds quantile_over_time, grouping for non associate range aggregation parsing and moar tests

* Extract with grouping

* Adds parsing duration on unwrap

* Improve the lexer to support more common identifier as functions.

Also add duration convertion for unwrap.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the frontend logs to include org_id.

The auth middleware was happening after the stats one and so org_id was not set 🤦.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Support byte sizes in label filters.

This patch extends the duration label filter with support for byte sizes
such as `1kB` and `42MiB`.

* Wip on error handling.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes json parser with prometheus label name rules.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fixup! Support byte sizes in label filters.

* Wip error handling, commit before big refactoring.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Refactoring in progress.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Work in progress.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Got something that builds and throw __error__ labels properly now.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add error handling + fixes groupins and post filtering.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* 400 on pipeline errors.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a races in the log pipeline.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Unsure the key is parsable and valid.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Cleanup and code documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes frontend handler.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes old test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix go1.15 local failing test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes race conditions in the batch iterator.

We should never advance an iterator in parallel. Unfortunately before the code was building iterators while advancing previous one, building iterator can advance iterator and thus creates a race condition. This changeset make sure we only fetch chunks in advance and build iterator and iterate over them in sequence.

Also add support for labels in the cacheIterator which is required for logqlv2.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
pull/2774/head^2
Cyril Tovena 5 years ago committed by GitHub
parent e4e52285fe
commit fe7aadfb9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 135
      pkg/iter/cache.go
  2. 25
      pkg/iter/cache_test.go
  3. 293
      pkg/storage/batch.go
  4. 15
      pkg/storage/batch_test.go
  5. 22
      pkg/storage/lazy_chunk.go

@ -1,66 +1,56 @@
package storage
package iter
import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
type CacheEntryIterator interface {
EntryIterator
Reset()
}
// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedIterator struct {
cache []*logproto.Entry
base iter.EntryIterator
cache []entryWithLabels
base EntryIterator // once set to nil it means we have to use the cache.
labels string
curr int
curr int
closeErr error
iterErr error
}
// newCachedIterator creates an iterator that cache iteration result and can be iterated again
// NewCachedIterator creates an iterator that cache iteration result and can be iterated again
// after closing it without re-using the underlaying iterator `it`.
// The cache iterator should be used for entries that belongs to the same stream only.
func newCachedIterator(it iter.EntryIterator, cap int) *cachedIterator {
func NewCachedIterator(it EntryIterator, cap int) CacheEntryIterator {
c := &cachedIterator{
base: it,
cache: make([]*logproto.Entry, 0, cap),
cache: make([]entryWithLabels, 0, cap),
curr: -1,
}
c.load()
return c
}
func (it *cachedIterator) reset() {
func (it *cachedIterator) Reset() {
it.curr = -1
}
func (it *cachedIterator) load() {
func (it *cachedIterator) Next() bool {
if it.base != nil {
defer func() {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
it.reset()
}()
// set labels using the first entry
if !it.base.Next() {
return
return false
}
it.labels = it.base.Labels()
// add all entries until the base iterator is exhausted
for {
e := it.base.Entry()
it.cache = append(it.cache, &e)
if !it.base.Next() {
break
}
}
// we're caching entries
it.cache = append(it.cache, entryWithLabels{entry: it.base.Entry(), labels: it.base.Labels()})
it.curr++
return true
}
}
func (it *cachedIterator) Next() bool {
// second pass
if len(it.cache) == 0 {
it.cache = nil
return false
@ -73,33 +63,38 @@ func (it *cachedIterator) Next() bool {
}
func (it *cachedIterator) Entry() logproto.Entry {
if len(it.cache) == 0 {
if len(it.cache) == 0 || it.curr < 0 {
return logproto.Entry{}
}
if it.curr < 0 {
return *it.cache[0]
}
return *it.cache[it.curr]
return it.cache[it.curr].entry
}
func (it *cachedIterator) Labels() string {
return it.labels
if len(it.cache) == 0 || it.curr < 0 {
return ""
}
return it.cache[it.curr].labels
}
func (it *cachedIterator) Error() error { return it.iterErr }
func (it *cachedIterator) Close() error {
it.reset()
it.Reset()
return it.closeErr
}
type CacheSampleIterator interface {
SampleIterator
Reset()
}
// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedSampleIterator struct {
cache []logproto.Sample
base iter.SampleIterator
cache []sampleWithLabels
base SampleIterator
labels string
curr int
curr int
closeErr error
iterErr error
@ -107,47 +102,35 @@ type cachedSampleIterator struct {
// newSampleCachedIterator creates an iterator that cache iteration result and can be iterated again
// after closing it without re-using the underlaying iterator `it`.
// The cache iterator should be used for entries that belongs to the same stream only.
func newCachedSampleIterator(it iter.SampleIterator, cap int) *cachedSampleIterator {
func NewCachedSampleIterator(it SampleIterator, cap int) CacheSampleIterator {
c := &cachedSampleIterator{
base: it,
cache: make([]logproto.Sample, 0, cap),
cache: make([]sampleWithLabels, 0, cap),
curr: -1,
}
c.load()
return c
}
func (it *cachedSampleIterator) reset() {
func (it *cachedSampleIterator) Reset() {
it.curr = -1
}
func (it *cachedSampleIterator) load() {
func (it *cachedSampleIterator) Next() bool {
if it.base != nil {
defer func() {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
it.reset()
}()
// set labels using the first entry
if !it.base.Next() {
return
return false
}
it.labels = it.base.Labels()
// add all entries until the base iterator is exhausted
for {
it.cache = append(it.cache, it.base.Sample())
if !it.base.Next() {
break
}
}
// we're caching entries
it.cache = append(it.cache, sampleWithLabels{Sample: it.base.Sample(), labels: it.base.Labels()})
it.curr++
return true
}
}
func (it *cachedSampleIterator) Next() bool {
// second pass
if len(it.cache) == 0 {
it.cache = nil
return false
@ -160,22 +143,22 @@ func (it *cachedSampleIterator) Next() bool {
}
func (it *cachedSampleIterator) Sample() logproto.Sample {
if len(it.cache) == 0 {
if len(it.cache) == 0 || it.curr < 0 {
return logproto.Sample{}
}
if it.curr < 0 {
return it.cache[0]
}
return it.cache[it.curr]
return it.cache[it.curr].Sample
}
func (it *cachedSampleIterator) Labels() string {
return it.labels
if len(it.cache) == 0 || it.curr < 0 {
return ""
}
return it.cache[it.curr].labels
}
func (it *cachedSampleIterator) Error() error { return it.iterErr }
func (it *cachedSampleIterator) Close() error {
it.reset()
it.Reset()
return it.closeErr
}

@ -1,4 +1,4 @@
package storage
package iter
import (
"errors"
@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
@ -20,12 +19,11 @@ func Test_CachedIterator(t *testing.T) {
{Timestamp: time.Unix(0, 3), Line: "3"},
},
}
c := newCachedIterator(iter.NewStreamIterator(stream), 3)
c := NewCachedIterator(NewStreamIterator(stream), 3)
assert := func() {
// we should crash for call of entry without next although that's not expected.
require.Equal(t, stream.Labels, c.Labels())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Entry{}, c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
@ -48,7 +46,7 @@ func Test_CachedIterator(t *testing.T) {
func Test_EmptyCachedIterator(t *testing.T) {
c := newCachedIterator(iter.NoopIterator, 0)
c := NewCachedIterator(NoopIterator, 0)
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Entry{}, c.Entry())
@ -68,7 +66,7 @@ func Test_EmptyCachedIterator(t *testing.T) {
func Test_ErrorCachedIterator(t *testing.T) {
c := newCachedIterator(&errorIter{}, 0)
c := NewCachedIterator(&errorIter{}, 0)
require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())
@ -86,12 +84,11 @@ func Test_CachedSampleIterator(t *testing.T) {
{Timestamp: time.Unix(0, 3).UnixNano(), Hash: 3, Value: 3.},
},
}
c := newCachedSampleIterator(iter.NewSeriesIterator(series), 3)
c := NewCachedSampleIterator(NewSeriesIterator(series), 3)
assert := func() {
// we should crash for call of entry without next although that's not expected.
require.Equal(t, series.Labels, c.Labels())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Sample{}, c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
@ -114,7 +111,7 @@ func Test_CachedSampleIterator(t *testing.T) {
func Test_EmptyCachedSampleIterator(t *testing.T) {
c := newCachedSampleIterator(iter.NoopIterator, 0)
c := NewCachedSampleIterator(NoopIterator, 0)
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Sample{}, c.Sample())
@ -134,7 +131,7 @@ func Test_EmptyCachedSampleIterator(t *testing.T) {
func Test_ErrorCachedSampleIterator(t *testing.T) {
c := newCachedSampleIterator(&errorIter{}, 0)
c := NewCachedSampleIterator(&errorIter{}, 0)
require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())

@ -73,15 +73,6 @@ func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
}
}
type genericIterator interface {
Next() bool
Labels() string
Error() error
Close() error
}
type chunksIteratorFactory func(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error)
// batchChunkIterator is an EntryIterator that iterates through chunks by batch of `batchSize`.
// Since chunks can overlap across batches for each iteration the iterator will keep all overlapping
// chunks with the next chunk from the next batch and added it to the next iteration. In this case the boundaries of the batch
@ -89,20 +80,15 @@ type chunksIteratorFactory func(chunks []*LazyChunk, from, through time.Time, ne
type batchChunkIterator struct {
chunks lazyChunks
batchSize int
err error
curr genericIterator
lastOverlapping []*LazyChunk
iterFactory chunksIteratorFactory
metrics *ChunkMetrics
matchers []*labels.Matcher
begun bool
ctx context.Context
cancel context.CancelFunc
start, end time.Time
direction logproto.Direction
next chan *struct {
iter genericIterator
err error
}
next chan *chunkBatch
}
// newBatchChunkIterator creates a new batch iterator with the given batchSize.
@ -112,24 +98,23 @@ func newBatchChunkIterator(
batchSize int,
direction logproto.Direction,
start, end time.Time,
iterFactory chunksIteratorFactory,
metrics *ChunkMetrics,
matchers []*labels.Matcher,
) *batchChunkIterator {
ctx, cancel := context.WithCancel(ctx)
// __name__ is not something we filter by because it's a constant in loki
// and only used for upstream compatibility; therefore remove it.
// The same applies to the sharding label which is injected by the cortex storage code.
matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel)
res := &batchChunkIterator{
batchSize: batchSize,
start: start,
end: end,
direction: direction,
ctx: ctx,
cancel: cancel,
iterFactory: iterFactory,
chunks: lazyChunks{direction: direction, chunks: chunks},
next: make(chan *struct {
iter genericIterator
err error
}),
metrics: metrics,
matchers: matchers,
start: start,
end: end,
direction: direction,
ctx: ctx,
chunks: lazyChunks{direction: direction, chunks: chunks},
next: make(chan *chunkBatch),
}
sort.Sort(res.chunks)
return res
@ -149,54 +134,21 @@ func (it *batchChunkIterator) loop(ctx context.Context) {
close(it.next)
return
}
next, err := it.nextBatch()
select {
case <-ctx.Done():
close(it.next)
// next can be nil if we are waiting to return that the nextBatch was empty and the context is closed
// or if another error occurred reading nextBatch
if next == nil {
return
}
err = next.Close()
if err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "Failed to close the pre-fetched iterator when pre-fetching was canceled", "err", err)
}
return
case it.next <- &struct {
iter genericIterator
err error
}{next, err}:
case it.next <- it.nextBatch():
}
}
}
func (it *batchChunkIterator) Next() bool {
func (it *batchChunkIterator) Next() *chunkBatch {
it.Start() // Ensure the iterator has started.
var err error
// for loop to avoid recursion
for {
if it.curr != nil && it.curr.Next() {
return true
}
// close previous iterator
if it.curr != nil {
it.err = it.curr.Close()
}
next := <-it.next
if next == nil {
return false
}
it.curr = next.iter
if next.err != nil {
it.err = err
return false
}
}
return <-it.next
}
func (it *batchChunkIterator) nextBatch() (genericIterator, error) {
func (it *batchChunkIterator) nextBatch() *chunkBatch {
// the first chunk of the batch
headChunk := it.chunks.Peek()
from, through := it.start, it.end
@ -306,38 +258,35 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) {
}
}
}
// create the new chunks iterator from the current batch.
return it.iterFactory(batch, from, through, nextChunk)
}
func (it *batchChunkIterator) Labels() string {
return it.curr.Labels()
}
func (it *batchChunkIterator) Error() error {
if it.err != nil {
return it.err
// download chunk for this batch.
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, batch, it.matchers)
if err != nil {
return &chunkBatch{err: err}
}
if it.curr != nil {
return it.curr.Error()
return &chunkBatch{
chunksBySeries: chksBySeries,
err: err,
from: from,
through: through,
nextChunk: nextChunk,
}
return nil
}
func (it *batchChunkIterator) Close() error {
it.cancel()
if it.curr != nil {
return it.curr.Close()
}
return nil
type chunkBatch struct {
chunksBySeries map[model.Fingerprint][][]*LazyChunk
err error
from, through time.Time
nextChunk *LazyChunk
}
type logBatchIterator struct {
*batchChunkIterator
curr iter.EntryIterator
err error
ctx context.Context
metrics *ChunkMetrics
matchers []*labels.Matcher
cancel context.CancelFunc
pipeline logql.Pipeline
}
@ -351,37 +300,71 @@ func newLogBatchIterator(
direction logproto.Direction,
start, end time.Time,
) (iter.EntryIterator, error) {
// __name__ is not something we filter by because it's a constant in loki
// and only used for upstream compatibility; therefore remove it.
// The same applies to the sharding label which is injected by the cortex storage code.
matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel)
logbatch := &logBatchIterator{
matchers: matchers,
pipeline: pipeline,
metrics: metrics,
ctx: ctx,
ctx, cancel := context.WithCancel(ctx)
return &logBatchIterator{
pipeline: pipeline,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, metrics, matchers),
}, nil
}
func (it *logBatchIterator) Labels() string {
return it.curr.Labels()
}
func (it *logBatchIterator) Error() error {
if it.err != nil {
return it.err
}
if it.curr != nil {
return it.curr.Error()
}
return nil
}
batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator)
// Important: since the batchChunkIterator is bound to the LogBatchIterator,
// ensure embedded fields are present before it's started.
logbatch.batchChunkIterator = batch
batch.Start()
return logbatch, nil
func (it *logBatchIterator) Close() error {
it.cancel()
if it.curr != nil {
return it.curr.Close()
}
return nil
}
func (it *logBatchIterator) Entry() logproto.Entry {
return it.curr.(iter.EntryIterator).Entry()
return it.curr.Entry()
}
// newChunksIterator creates an iterator over a set of lazychunks.
func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers)
if err != nil {
return nil, err
func (it *logBatchIterator) Next() bool {
// for loop to avoid recursion
for {
if it.curr != nil && it.curr.Next() {
return true
}
// close previous iterator
if it.curr != nil {
it.err = it.curr.Close()
}
next := it.batchChunkIterator.Next()
if next == nil {
return false
}
if next.err != nil {
it.err = next.err
return false
}
var err error
it.curr, err = it.newChunksIterator(next)
if err != nil {
it.err = err
return false
}
}
}
iters, err := it.buildIterators(chksBySeries, from, through, nextChunk)
// newChunksIterator creates an iterator over a set of lazychunks.
func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator, error) {
iters, err := it.buildIterators(b.chunksBySeries, b.from, b.through, b.nextChunk)
if err != nil {
return nil, err
}
@ -432,10 +415,11 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through
type sampleBatchIterator struct {
*batchChunkIterator
curr iter.SampleIterator
err error
ctx context.Context
metrics *ChunkMetrics
matchers []*labels.Matcher
cancel context.CancelFunc
extractor logql.SampleExtractor
}
@ -448,37 +432,72 @@ func newSampleBatchIterator(
extractor logql.SampleExtractor,
start, end time.Time,
) (iter.SampleIterator, error) {
// __name__ is not something we filter by because it's a constant in loki
// and only used for upstream compatibility; therefore remove it.
// The same applies to the sharding label which is injected by the cortex storage code.
matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel)
ctx, cancel := context.WithCancel(ctx)
return &sampleBatchIterator{
extractor: extractor,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers),
}, nil
}
samplebatch := &sampleBatchIterator{
matchers: matchers,
extractor: extractor,
metrics: metrics,
ctx: ctx,
func (it *sampleBatchIterator) Labels() string {
return it.curr.Labels()
}
func (it *sampleBatchIterator) Error() error {
if it.err != nil {
return it.err
}
batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator)
if it.curr != nil {
return it.curr.Error()
}
return nil
}
// Important: since the batchChunkIterator is bound to the SampleBatchIterator,
// ensure embedded fields are present before it's started.
samplebatch.batchChunkIterator = batch
batch.Start()
return samplebatch, nil
func (it *sampleBatchIterator) Close() error {
it.cancel()
if it.curr != nil {
return it.curr.Close()
}
return nil
}
func (it *sampleBatchIterator) Sample() logproto.Sample {
return it.curr.(iter.SampleIterator).Sample()
return it.curr.Sample()
}
// newChunksIterator creates an iterator over a set of lazychunks.
func (it *sampleBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, chunks, it.matchers)
if err != nil {
return nil, err
func (it *sampleBatchIterator) Next() bool {
// for loop to avoid recursion
for {
if it.curr != nil && it.curr.Next() {
return true
}
// close previous iterator
if it.curr != nil {
it.err = it.curr.Close()
}
next := it.batchChunkIterator.Next()
if next == nil {
return false
}
if next.err != nil {
it.err = next.err
return false
}
var err error
it.curr, err = it.newChunksIterator(next)
if err != nil {
it.err = err
return false
}
}
iters, err := it.buildIterators(chksBySeries, from, through, nextChunk)
}
// newChunksIterator creates an iterator over a set of lazychunks.
func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIterator, error) {
iters, err := it.buildIterators(b.chunksBySeries, b.from, b.through, b.nextChunk)
if err != nil {
return nil, err
}

@ -41,27 +41,16 @@ func Test_batchIterSafeStart(t *testing.T) {
newLazyChunk(stream),
}
var ok bool
batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), func(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) {
if !ok {
panic("unexpected")
}
// we don't care about the actual data for this test, just give it an iterator.
return iter.NewStreamIterator(stream), nil
})
batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), NilMetrics, []*labels.Matcher{})
// if it was started already, we should see a panic before this
time.Sleep(time.Millisecond)
ok = true
// ensure idempotency
batch.Start()
batch.Start()
ok = batch.Next()
require.Equal(t, true, ok)
require.NotNil(t, batch.Next())
}

@ -22,8 +22,8 @@ type LazyChunk struct {
// cache of overlapping block.
// We use the offset of the block as key since it's unique per chunk.
overlappingBlocks map[int]*cachedIterator
overlappingSampleBlocks map[int]*cachedSampleIterator
overlappingBlocks map[int]iter.CacheEntryIterator
overlappingSampleBlocks map[int]iter.CacheSampleIterator
}
// Iterator returns an entry iterator.
@ -52,18 +52,17 @@ func (c *LazyChunk) Iterator(
for _, b := range blocks {
// if we have already processed and cache block let's use it.
if cache, ok := c.overlappingBlocks[b.Offset()]; ok {
clone := *cache
clone.reset()
its = append(its, &clone)
cache.Reset()
its = append(its, cache)
continue
}
// if the block is overlapping cache it with the next chunk boundaries.
if nextChunk != nil && IsBlockOverlapping(b, nextChunk, direction) {
// todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset.
it := newCachedIterator(b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline), b.Entries())
it := iter.NewCachedIterator(b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline), b.Entries())
its = append(its, it)
if c.overlappingBlocks == nil {
c.overlappingBlocks = make(map[int]*cachedIterator)
c.overlappingBlocks = make(map[int]iter.CacheEntryIterator)
}
c.overlappingBlocks[b.Offset()] = it
continue
@ -114,18 +113,17 @@ func (c *LazyChunk) SampleIterator(
for _, b := range blocks {
// if we have already processed and cache block let's use it.
if cache, ok := c.overlappingSampleBlocks[b.Offset()]; ok {
clone := *cache
clone.reset()
its = append(its, &clone)
cache.Reset()
its = append(its, cache)
continue
}
// if the block is overlapping cache it with the next chunk boundaries.
if nextChunk != nil && IsBlockOverlapping(b, nextChunk, logproto.FORWARD) {
// todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset.
it := newCachedSampleIterator(b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor), b.Entries())
it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor), b.Entries())
its = append(its, it)
if c.overlappingSampleBlocks == nil {
c.overlappingSampleBlocks = make(map[int]*cachedSampleIterator)
c.overlappingSampleBlocks = make(map[int]iter.CacheSampleIterator)
}
c.overlappingSampleBlocks[b.Offset()] = it
continue

Loading…
Cancel
Save