diff --git a/pkg/iter/cache.go b/pkg/iter/cache.go index 9f9df5624c..7e8bf5e840 100644 --- a/pkg/iter/cache.go +++ b/pkg/iter/cache.go @@ -6,13 +6,14 @@ import ( type CacheEntryIterator interface { EntryIterator + Wrapped() EntryIterator Reset() } // cachedIterator is an iterator that caches iteration to be replayed later on. type cachedIterator struct { - cache []entryWithLabels - base EntryIterator // once set to nil it means we have to use the cache. + cache []entryWithLabels + wrapped EntryIterator // once set to nil it means we have to use the cache. curr int @@ -24,9 +25,9 @@ type cachedIterator struct { // after closing it without re-using the underlaying iterator `it`. func NewCachedIterator(it EntryIterator, cap int) CacheEntryIterator { c := &cachedIterator{ - base: it, - cache: make([]entryWithLabels, 0, cap), - curr: -1, + wrapped: it, + cache: make([]entryWithLabels, 0, cap), + curr: -1, } return c } @@ -35,35 +36,44 @@ func (it *cachedIterator) Reset() { it.curr = -1 } -func (it *cachedIterator) Next() bool { - if it.base != nil { - ok := it.base.Next() - // we're done with the base iterator. - if !ok { - it.closeErr = it.base.Close() - it.iterErr = it.base.Error() - it.base = nil - return false - } - // we're caching entries - it.cache = append(it.cache, entryWithLabels{entry: it.base.Entry(), labels: it.base.Labels()}) - it.curr++ - return true +func (it *cachedIterator) Wrapped() EntryIterator { + return it.wrapped +} + +func (it *cachedIterator) consumeWrapped() bool { + if it.Wrapped() == nil { + return false + } + ok := it.Wrapped().Next() + // we're done with the base iterator. + if !ok { + it.closeErr = it.Wrapped().Close() + it.iterErr = it.Wrapped().Error() + it.wrapped = nil + return false } - // second pass - if len(it.cache) == 0 { - it.cache = nil + // we're caching entries + it.cache = append(it.cache, entryWithLabels{entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels()}) + it.curr++ + return true +} + +func (it *cachedIterator) Next() bool { + if len(it.cache) == 0 && it.Wrapped() == nil { return false } if it.curr+1 >= len(it.cache) { + if it.Wrapped() != nil { + return it.consumeWrapped() + } return false } it.curr++ - return it.curr < len(it.cache) + return true } func (it *cachedIterator) Entry() logproto.Entry { - if len(it.cache) == 0 || it.curr < 0 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { return logproto.Entry{} } @@ -71,7 +81,7 @@ func (it *cachedIterator) Entry() logproto.Entry { } func (it *cachedIterator) Labels() string { - if len(it.cache) == 0 || it.curr < 0 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { return "" } return it.cache[it.curr].labels @@ -86,13 +96,14 @@ func (it *cachedIterator) Close() error { type CacheSampleIterator interface { SampleIterator + Wrapped() SampleIterator Reset() } // cachedIterator is an iterator that caches iteration to be replayed later on. type cachedSampleIterator struct { - cache []sampleWithLabels - base SampleIterator + cache []sampleWithLabels + wrapped SampleIterator curr int @@ -104,53 +115,62 @@ type cachedSampleIterator struct { // after closing it without re-using the underlaying iterator `it`. func NewCachedSampleIterator(it SampleIterator, cap int) CacheSampleIterator { c := &cachedSampleIterator{ - base: it, - cache: make([]sampleWithLabels, 0, cap), - curr: -1, + wrapped: it, + cache: make([]sampleWithLabels, 0, cap), + curr: -1, } return c } +func (it *cachedSampleIterator) Wrapped() SampleIterator { + return it.wrapped +} + func (it *cachedSampleIterator) Reset() { it.curr = -1 } -func (it *cachedSampleIterator) Next() bool { - if it.base != nil { - ok := it.base.Next() - // we're done with the base iterator. - if !ok { - it.closeErr = it.base.Close() - it.iterErr = it.base.Error() - it.base = nil - return false - } - // we're caching entries - it.cache = append(it.cache, sampleWithLabels{Sample: it.base.Sample(), labels: it.base.Labels()}) - it.curr++ - return true +func (it *cachedSampleIterator) consumeWrapped() bool { + if it.Wrapped() == nil { + return false + } + ok := it.Wrapped().Next() + // we're done with the base iterator. + if !ok { + it.closeErr = it.Wrapped().Close() + it.iterErr = it.Wrapped().Error() + it.wrapped = nil + return false } - // second pass - if len(it.cache) == 0 { - it.cache = nil + // we're caching entries + it.cache = append(it.cache, sampleWithLabels{Sample: it.Wrapped().Sample(), labels: it.Wrapped().Labels()}) + it.curr++ + return true +} + +func (it *cachedSampleIterator) Next() bool { + if len(it.cache) == 0 && it.Wrapped() == nil { return false } if it.curr+1 >= len(it.cache) { + if it.Wrapped() != nil { + return it.consumeWrapped() + } return false } it.curr++ - return it.curr < len(it.cache) + return true } func (it *cachedSampleIterator) Sample() logproto.Sample { - if len(it.cache) == 0 || it.curr < 0 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { return logproto.Sample{} } return it.cache[it.curr].Sample } func (it *cachedSampleIterator) Labels() string { - if len(it.cache) == 0 || it.curr < 0 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { return "" } return it.cache[it.curr].labels diff --git a/pkg/iter/cache_test.go b/pkg/iter/cache_test.go index b04811caed..a786aa5b7a 100644 --- a/pkg/iter/cache_test.go +++ b/pkg/iter/cache_test.go @@ -31,7 +31,7 @@ func Test_CachedIterator(t *testing.T) { require.Equal(t, true, c.Next()) require.Equal(t, stream.Entries[2], c.Entry()) require.Equal(t, false, c.Next()) - require.Equal(t, nil, c.Error()) + require.NoError(t, c.Error()) require.Equal(t, stream.Entries[2], c.Entry()) require.Equal(t, false, c.Next()) } @@ -45,7 +45,6 @@ func Test_CachedIterator(t *testing.T) { } func Test_EmptyCachedIterator(t *testing.T) { - c := NewCachedIterator(NoopIterator, 0) require.Equal(t, "", c.Labels()) @@ -61,11 +60,9 @@ func Test_EmptyCachedIterator(t *testing.T) { require.Equal(t, false, c.Next()) require.Equal(t, "", c.Labels()) require.Equal(t, logproto.Entry{}, c.Entry()) - } func Test_ErrorCachedIterator(t *testing.T) { - c := NewCachedIterator(&errorIter{}, 0) require.Equal(t, false, c.Next()) @@ -75,6 +72,62 @@ func Test_ErrorCachedIterator(t *testing.T) { require.Equal(t, errors.New("close"), c.Close()) } +func Test_CachedIteratorResetNotExhausted(t *testing.T) { + stream := logproto.Stream{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, + } + c := NewCachedIterator(NewStreamIterator(stream), 3) + + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[1], c.Entry()) + c.Reset() + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[1], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[2], c.Entry()) + require.Equal(t, false, c.Next()) + require.NoError(t, c.Error()) + require.Equal(t, stream.Entries[2], c.Entry()) + require.Equal(t, false, c.Next()) + + // Close the iterator reset it to the beginning. + require.Equal(t, nil, c.Close()) +} + +func Test_CachedIteratorResetExhausted(t *testing.T) { + stream := logproto.Stream{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + }, + } + c := NewCachedIterator(NewStreamIterator(stream), 3) + + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[1], c.Entry()) + c.Reset() + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[1], c.Entry()) + require.Equal(t, false, c.Next()) + + // Close the iterator reset it to the beginning. + require.Equal(t, nil, c.Close()) +} + func Test_CachedSampleIterator(t *testing.T) { series := logproto.Series{ Labels: `{foo="bar"}`, @@ -96,7 +149,7 @@ func Test_CachedSampleIterator(t *testing.T) { require.Equal(t, true, c.Next()) require.Equal(t, series.Samples[2], c.Sample()) require.Equal(t, false, c.Next()) - require.Equal(t, nil, c.Error()) + require.NoError(t, c.Error()) require.Equal(t, series.Samples[2], c.Sample()) require.Equal(t, false, c.Next()) } @@ -109,8 +162,63 @@ func Test_CachedSampleIterator(t *testing.T) { assert() } -func Test_EmptyCachedSampleIterator(t *testing.T) { +func Test_CachedSampleIteratorResetNotExhausted(t *testing.T) { + series := logproto.Series{ + Labels: `{foo="bar"}`, + Samples: []logproto.Sample{ + {Timestamp: time.Unix(0, 1).UnixNano(), Hash: 1, Value: 1.}, + {Timestamp: time.Unix(0, 2).UnixNano(), Hash: 2, Value: 2.}, + {Timestamp: time.Unix(0, 3).UnixNano(), Hash: 3, Value: 3.}, + }, + } + c := NewCachedSampleIterator(NewSeriesIterator(series), 3) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[0], c.Sample()) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[1], c.Sample()) + c.Reset() + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[0], c.Sample()) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[1], c.Sample()) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[2], c.Sample()) + require.Equal(t, false, c.Next()) + require.NoError(t, c.Error()) + require.Equal(t, series.Samples[2], c.Sample()) + require.Equal(t, false, c.Next()) + + // Close the iterator reset it to the beginning. + require.Equal(t, nil, c.Close()) +} + +func Test_CachedSampleIteratorResetExhausted(t *testing.T) { + series := logproto.Series{ + Labels: `{foo="bar"}`, + Samples: []logproto.Sample{ + {Timestamp: time.Unix(0, 1).UnixNano(), Hash: 1, Value: 1.}, + {Timestamp: time.Unix(0, 2).UnixNano(), Hash: 2, Value: 2.}, + }, + } + c := NewCachedSampleIterator(NewSeriesIterator(series), 3) + + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[0], c.Sample()) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[1], c.Sample()) + c.Reset() + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[0], c.Sample()) + require.Equal(t, true, c.Next()) + require.Equal(t, series.Samples[1], c.Sample()) + require.Equal(t, false, c.Next()) + + // Close the iterator reset it to the beginning. + require.Equal(t, nil, c.Close()) +} + +func Test_EmptyCachedSampleIterator(t *testing.T) { c := NewCachedSampleIterator(NoopIterator, 0) require.Equal(t, "", c.Labels()) @@ -126,11 +234,9 @@ func Test_EmptyCachedSampleIterator(t *testing.T) { require.Equal(t, false, c.Next()) require.Equal(t, "", c.Labels()) require.Equal(t, logproto.Sample{}, c.Sample()) - } func Test_ErrorCachedSampleIterator(t *testing.T) { - c := NewCachedSampleIterator(&errorIter{}, 0) require.Equal(t, false, c.Next()) diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index 6babce8f8f..fb56298718 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -5,12 +5,14 @@ import ( "errors" "time" - "github.com/grafana/loki/pkg/storage/chunk" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/kit/log/level" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/storage/chunk" ) // LazyChunk loads the chunk when it is accessed. @@ -67,7 +69,15 @@ func (c *LazyChunk) Iterator( continue } if nextChunk != nil { - delete(c.overlappingBlocks, b.Offset()) + if cache, ok := c.overlappingBlocks[b.Offset()]; ok { + delete(c.overlappingBlocks, b.Offset()) + if err := cache.Wrapped().Close(); err != nil { + level.Warn(util_log.Logger).Log( + "msg", "failed to close cache block iterator", + "err", err, + ) + } + } } // non-overlapping block with the next chunk are not cached. its = append(its, b.Iterator(ctx, pipeline)) @@ -140,7 +150,15 @@ func (c *LazyChunk) SampleIterator( continue } if nextChunk != nil { - delete(c.overlappingSampleBlocks, b.Offset()) + if cache, ok := c.overlappingSampleBlocks[b.Offset()]; ok { + delete(c.overlappingSampleBlocks, b.Offset()) + if err := cache.Wrapped().Close(); err != nil { + level.Warn(util_log.Logger).Log( + "msg", "failed to close cache block sample iterator", + "err", err, + ) + } + } } // non-overlapping block with the next chunk are not cached. its = append(its, b.SampleIterator(ctx, extractor))