From cb20afaa26d1e8fd285554129e32843a21afbe5a Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 5 Aug 2020 10:07:14 -0400 Subject: [PATCH] BatchIter edge cases (#2466) * sampleIter uses correct cache * minimal lazy chunk iterator test * memchunk Blocks() inclusivity * fixes a few edge cases in the batchiterator batching * fixes bad metric name in test * due to later chunks len check, resetting nextChunk is unnecessary * lazychunks pop test * safe starting of batchChunkIterator * batchiter rudimentary safe start test --- pkg/chunkenc/memchunk.go | 2 +- pkg/chunkenc/memchunk_test.go | 12 +++ pkg/storage/batch.go | 32 +++++++- pkg/storage/batch_test.go | 144 +++++++++++++++++++++++++++++++++ pkg/storage/lazy_chunk.go | 2 +- pkg/storage/lazy_chunk_test.go | 69 ++++++++++++++++ pkg/storage/util_test.go | 2 +- 7 files changed, 257 insertions(+), 6 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 407cd3add9..6a96dc5f53 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -532,7 +532,7 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { blocks := make([]Block, 0, len(c.blocks)) for _, b := range c.blocks { - if maxt > b.mint && b.maxt > mint { + if maxt >= b.mint && b.maxt >= mint { blocks = append(blocks, b) } } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index b05b477811..f8b93057db 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -37,6 +37,18 @@ var ( testTargetSize = 1500 * 1024 ) +func TestBlocksInclusive(t *testing.T) { + chk := NewMemChunk(EncNone, testBlockSize, testTargetSize) + err := chk.Append(logprotoEntry(1, "1")) + require.Nil(t, err) + err = chk.cut() + require.Nil(t, err) + + blocks := chk.Blocks(time.Unix(0, 1), time.Unix(0, 1)) + require.Equal(t, 1, len(blocks)) + require.Equal(t, 1, blocks[0].Entries()) +} + func TestBlock(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 1578029aa9..9c8462ecf1 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -43,6 +43,8 @@ type batchChunkIterator struct { lastOverlapping []*LazyChunk iterFactory chunksIteratorFactory + begun bool + ctx context.Context cancel context.CancelFunc start, end time.Time direction logproto.Direction @@ -69,6 +71,7 @@ func newBatchChunkIterator( start: start, end: end, direction: direction, + ctx: ctx, cancel: cancel, iterFactory: iterFactory, chunks: lazyChunks{direction: direction, chunks: chunks}, @@ -78,10 +81,17 @@ func newBatchChunkIterator( }), } sort.Sort(res.chunks) - go res.loop(ctx) return res } +// Start is idempotent and will begin the processing thread which seeds the iterator data. +func (it *batchChunkIterator) Start() { + if !it.begun { + it.begun = true + go it.loop(it.ctx) + } +} + func (it *batchChunkIterator) loop(ctx context.Context) { for { if it.chunks.Len() == 0 { @@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) { } func (it *batchChunkIterator) Next() bool { + it.Start() // Ensure the iterator has started. + var err error // for loop to avoid recursion for { @@ -140,18 +152,22 @@ func (it *batchChunkIterator) nextBatch() (genericIterator, error) { batch := make([]*LazyChunk, 0, it.batchSize+len(it.lastOverlapping)) var nextChunk *LazyChunk + var includesOverlap bool + for it.chunks.Len() > 0 { // pop the next batch of chunks and append/prepend previous overlapping chunks // so we can merge/de-dupe overlapping entries. - if it.direction == logproto.FORWARD { + if !includesOverlap && it.direction == logproto.FORWARD { batch = append(batch, it.lastOverlapping...) } batch = append(batch, it.chunks.pop(it.batchSize)...) - if it.direction == logproto.BACKWARD { + if !includesOverlap && it.direction == logproto.BACKWARD { batch = append(batch, it.lastOverlapping...) } + includesOverlap = true + if it.chunks.Len() > 0 { nextChunk = it.chunks.Peek() // we max out our iterator boundaries to the next chunks in the queue @@ -294,8 +310,12 @@ func newLogBatchIterator( filter: filter, ctx: ctx, } + batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator) + // Important: since the batchChunkIterator is bound to the LogBatchIterator, + // ensure embedded fields are present before it's started. logbatch.batchChunkIterator = batch + batch.Start() return logbatch, nil } @@ -321,10 +341,12 @@ func (it *logBatchIterator) newChunksIterator(chunks []*LazyChunk, from, through func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { + iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) if err != nil { return nil, err } + result = append(result, iterator) } @@ -391,7 +413,11 @@ func newSampleBatchIterator( ctx: ctx, } batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator) + + // Important: since the batchChunkIterator is bound to the SampleBatchIterator, + // ensure embedded fields are present before it's started. samplebatch.batchChunkIterator = batch + batch.Start() return samplebatch, nil } diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 19dbd5d284..f7ae77417f 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -22,6 +22,48 @@ import ( "github.com/grafana/loki/pkg/logql/stats" ) +func Test_batchIterSafeStart(t *testing.T) { + stream := logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + } + chks := []*LazyChunk{ + newLazyChunk(stream), + } + + var ok bool + + batch := newBatchChunkIterator(context.Background(), chks, 1, logproto.FORWARD, from, from.Add(4*time.Millisecond), func(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (genericIterator, error) { + if !ok { + panic("unexpected") + } + + // we don't care about the actual data for this test, just give it an iterator. + return iter.NewStreamIterator(stream), nil + }) + + // if it was started already, we should see a panic before this + time.Sleep(time.Millisecond) + ok = true + + // ensure idempotency + batch.Start() + batch.Start() + + ok = batch.Next() + require.Equal(t, true, ok) + +} + func Test_newLogBatchChunkIterator(t *testing.T) { tests := map[string]struct { @@ -548,6 +590,108 @@ func Test_newLogBatchChunkIterator(t *testing.T) { logproto.BACKWARD, 2, }, + // This test is rather complex under the hood. + // It should cause three sub batches in the iterator. + // The first batch has no overlap -- it cannot as the first. It has bounds [1,2) + // The second batch has one chunk overlap, but it includes no entries in the overlap. + // It has bounds [2,4). + // The third batch finally consumes the overlap, with bounds [4,max). + // Notably it also ends up testing the code paths for increasing batch sizes past + // the default due to nextChunks with the same start timestamp. + "forward identicals": { + []*LazyChunk{ + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + }, + }), + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }), + }, + []logproto.Stream{ + { + Labels: fooLabels, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + }, + }, + }, + fooLabelsWithName, + from, from.Add(4 * time.Millisecond), + logproto.FORWARD, + 1, + }, } for name, tt := range tests { diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index 709442e121..9dee210c44 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -129,7 +129,7 @@ func (c *LazyChunk) SampleIterator( continue } if nextChunk != nil { - delete(c.overlappingBlocks, b.Offset()) + delete(c.overlappingSampleBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. its = append(its, b.SampleIterator(ctx, filter, extractor)) diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 9495e43353..9a1aabc3eb 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "testing" "time" @@ -15,6 +16,74 @@ import ( "github.com/grafana/loki/pkg/util" ) +func TestLazyChunkIterator(t *testing.T) { + for i, tc := range []struct { + chunk *LazyChunk + expected []logproto.Stream + }{ + { + newLazyChunk(logproto.Stream{ + Labels: fooLabelsWithName, + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }), + []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + }, + }, + }, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.TrueFilter, nil) + require.Nil(t, err) + streams, _, err := iter.ReadBatch(it, 1000) + require.Nil(t, err) + _ = it.Close() + require.Equal(t, tc.expected, streams.Streams) + }) + } +} + +func TestLazyChunksPop(t *testing.T) { + for i, tc := range []struct { + initial int + n int + expectedLn int + rem int + }{ + {1, 1, 1, 0}, + {2, 1, 1, 1}, + {3, 4, 3, 0}, + } { + + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + lc := &lazyChunks{} + for i := 0; i < tc.initial; i++ { + lc.chunks = append(lc.chunks, &LazyChunk{}) + } + out := lc.pop(tc.n) + + for i := 0; i < tc.expectedLn; i++ { + require.NotNil(t, out[i]) + } + + for i := 0; i < tc.rem; i++ { + require.NotNil(t, lc.chunks[i]) + } + }) + } +} + func TestIsOverlapping(t *testing.T) { tests := []struct { name string diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 9e2b267502..8c27f092de 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/loki/pkg/util" ) -var fooLabelsWithName = "{foo=\"bar\", __name__=\"log\"}" +var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}" var fooLabels = "{foo=\"bar\"}" var from = time.Unix(0, time.Millisecond.Nanoseconds())