Fixes a bug in the block cache code. (#4302)

* Fixes a bug in the block cache code.

Because chunks can overlap, and that can cause multiple processing of the same chunk within batches, we decided to cache the
evaluation of it.

However there was a bug here causing incorrect query results, especially for metric queries. The bug would occur when we would only partially consume a block (an overlapping block between multiple batches) and then try to replay it.
In that case it wouldn't actually replay it, but would keep consuming the underlying iterator.

This took us 2 days to figure out — thanks to Danny for helping along.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Missing warning when closing fails.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* got linted.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Forgot to rename usage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/4310/head
Cyril Tovena 4 years ago committed by GitHub
parent 206956f767
commit e81ca28c80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 120
      pkg/iter/cache.go
  2. 122
      pkg/iter/cache_test.go
  3. 24
      pkg/storage/lazy_chunk.go

@ -6,13 +6,14 @@ import (
type CacheEntryIterator interface {
EntryIterator
Wrapped() EntryIterator
Reset()
}
// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedIterator struct {
cache []entryWithLabels
base EntryIterator // once set to nil it means we have to use the cache.
cache []entryWithLabels
wrapped EntryIterator // once set to nil it means we have to use the cache.
curr int
@ -24,9 +25,9 @@ type cachedIterator struct {
// after closing it without re-using the underlaying iterator `it`.
func NewCachedIterator(it EntryIterator, cap int) CacheEntryIterator {
c := &cachedIterator{
base: it,
cache: make([]entryWithLabels, 0, cap),
curr: -1,
wrapped: it,
cache: make([]entryWithLabels, 0, cap),
curr: -1,
}
return c
}
@ -35,35 +36,44 @@ func (it *cachedIterator) Reset() {
it.curr = -1
}
func (it *cachedIterator) Next() bool {
if it.base != nil {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
return false
}
// we're caching entries
it.cache = append(it.cache, entryWithLabels{entry: it.base.Entry(), labels: it.base.Labels()})
it.curr++
return true
func (it *cachedIterator) Wrapped() EntryIterator {
return it.wrapped
}
func (it *cachedIterator) consumeWrapped() bool {
if it.Wrapped() == nil {
return false
}
ok := it.Wrapped().Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.Wrapped().Close()
it.iterErr = it.Wrapped().Error()
it.wrapped = nil
return false
}
// second pass
if len(it.cache) == 0 {
it.cache = nil
// we're caching entries
it.cache = append(it.cache, entryWithLabels{entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels()})
it.curr++
return true
}
func (it *cachedIterator) Next() bool {
if len(it.cache) == 0 && it.Wrapped() == nil {
return false
}
if it.curr+1 >= len(it.cache) {
if it.Wrapped() != nil {
return it.consumeWrapped()
}
return false
}
it.curr++
return it.curr < len(it.cache)
return true
}
func (it *cachedIterator) Entry() logproto.Entry {
if len(it.cache) == 0 || it.curr < 0 {
if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) {
return logproto.Entry{}
}
@ -71,7 +81,7 @@ func (it *cachedIterator) Entry() logproto.Entry {
}
func (it *cachedIterator) Labels() string {
if len(it.cache) == 0 || it.curr < 0 {
if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) {
return ""
}
return it.cache[it.curr].labels
@ -86,13 +96,14 @@ func (it *cachedIterator) Close() error {
type CacheSampleIterator interface {
SampleIterator
Wrapped() SampleIterator
Reset()
}
// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedSampleIterator struct {
cache []sampleWithLabels
base SampleIterator
cache []sampleWithLabels
wrapped SampleIterator
curr int
@ -104,53 +115,62 @@ type cachedSampleIterator struct {
// after closing it without re-using the underlaying iterator `it`.
func NewCachedSampleIterator(it SampleIterator, cap int) CacheSampleIterator {
c := &cachedSampleIterator{
base: it,
cache: make([]sampleWithLabels, 0, cap),
curr: -1,
wrapped: it,
cache: make([]sampleWithLabels, 0, cap),
curr: -1,
}
return c
}
func (it *cachedSampleIterator) Wrapped() SampleIterator {
return it.wrapped
}
func (it *cachedSampleIterator) Reset() {
it.curr = -1
}
func (it *cachedSampleIterator) Next() bool {
if it.base != nil {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
return false
}
// we're caching entries
it.cache = append(it.cache, sampleWithLabels{Sample: it.base.Sample(), labels: it.base.Labels()})
it.curr++
return true
func (it *cachedSampleIterator) consumeWrapped() bool {
if it.Wrapped() == nil {
return false
}
ok := it.Wrapped().Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.Wrapped().Close()
it.iterErr = it.Wrapped().Error()
it.wrapped = nil
return false
}
// second pass
if len(it.cache) == 0 {
it.cache = nil
// we're caching entries
it.cache = append(it.cache, sampleWithLabels{Sample: it.Wrapped().Sample(), labels: it.Wrapped().Labels()})
it.curr++
return true
}
func (it *cachedSampleIterator) Next() bool {
if len(it.cache) == 0 && it.Wrapped() == nil {
return false
}
if it.curr+1 >= len(it.cache) {
if it.Wrapped() != nil {
return it.consumeWrapped()
}
return false
}
it.curr++
return it.curr < len(it.cache)
return true
}
func (it *cachedSampleIterator) Sample() logproto.Sample {
if len(it.cache) == 0 || it.curr < 0 {
if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) {
return logproto.Sample{}
}
return it.cache[it.curr].Sample
}
func (it *cachedSampleIterator) Labels() string {
if len(it.cache) == 0 || it.curr < 0 {
if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) {
return ""
}
return it.cache[it.curr].labels

@ -31,7 +31,7 @@ func Test_CachedIterator(t *testing.T) {
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[2], c.Entry())
require.Equal(t, false, c.Next())
require.Equal(t, nil, c.Error())
require.NoError(t, c.Error())
require.Equal(t, stream.Entries[2], c.Entry())
require.Equal(t, false, c.Next())
}
@ -45,7 +45,6 @@ func Test_CachedIterator(t *testing.T) {
}
func Test_EmptyCachedIterator(t *testing.T) {
c := NewCachedIterator(NoopIterator, 0)
require.Equal(t, "", c.Labels())
@ -61,11 +60,9 @@ func Test_EmptyCachedIterator(t *testing.T) {
require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Entry{}, c.Entry())
}
func Test_ErrorCachedIterator(t *testing.T) {
c := NewCachedIterator(&errorIter{}, 0)
require.Equal(t, false, c.Next())
@ -75,6 +72,62 @@ func Test_ErrorCachedIterator(t *testing.T) {
require.Equal(t, errors.New("close"), c.Close())
}
func Test_CachedIteratorResetNotExhausted(t *testing.T) {
stream := logproto.Stream{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 3), Line: "3"},
},
}
c := NewCachedIterator(NewStreamIterator(stream), 3)
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[1], c.Entry())
c.Reset()
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[1], c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[2], c.Entry())
require.Equal(t, false, c.Next())
require.NoError(t, c.Error())
require.Equal(t, stream.Entries[2], c.Entry())
require.Equal(t, false, c.Next())
// Close the iterator reset it to the beginning.
require.Equal(t, nil, c.Close())
}
func Test_CachedIteratorResetExhausted(t *testing.T) {
stream := logproto.Stream{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
},
}
c := NewCachedIterator(NewStreamIterator(stream), 3)
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[1], c.Entry())
c.Reset()
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[1], c.Entry())
require.Equal(t, false, c.Next())
// Close the iterator reset it to the beginning.
require.Equal(t, nil, c.Close())
}
func Test_CachedSampleIterator(t *testing.T) {
series := logproto.Series{
Labels: `{foo="bar"}`,
@ -96,7 +149,7 @@ func Test_CachedSampleIterator(t *testing.T) {
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[2], c.Sample())
require.Equal(t, false, c.Next())
require.Equal(t, nil, c.Error())
require.NoError(t, c.Error())
require.Equal(t, series.Samples[2], c.Sample())
require.Equal(t, false, c.Next())
}
@ -109,8 +162,63 @@ func Test_CachedSampleIterator(t *testing.T) {
assert()
}
func Test_EmptyCachedSampleIterator(t *testing.T) {
func Test_CachedSampleIteratorResetNotExhausted(t *testing.T) {
series := logproto.Series{
Labels: `{foo="bar"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(0, 1).UnixNano(), Hash: 1, Value: 1.},
{Timestamp: time.Unix(0, 2).UnixNano(), Hash: 2, Value: 2.},
{Timestamp: time.Unix(0, 3).UnixNano(), Hash: 3, Value: 3.},
},
}
c := NewCachedSampleIterator(NewSeriesIterator(series), 3)
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[1], c.Sample())
c.Reset()
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[1], c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[2], c.Sample())
require.Equal(t, false, c.Next())
require.NoError(t, c.Error())
require.Equal(t, series.Samples[2], c.Sample())
require.Equal(t, false, c.Next())
// Close the iterator reset it to the beginning.
require.Equal(t, nil, c.Close())
}
func Test_CachedSampleIteratorResetExhausted(t *testing.T) {
series := logproto.Series{
Labels: `{foo="bar"}`,
Samples: []logproto.Sample{
{Timestamp: time.Unix(0, 1).UnixNano(), Hash: 1, Value: 1.},
{Timestamp: time.Unix(0, 2).UnixNano(), Hash: 2, Value: 2.},
},
}
c := NewCachedSampleIterator(NewSeriesIterator(series), 3)
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[1], c.Sample())
c.Reset()
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[1], c.Sample())
require.Equal(t, false, c.Next())
// Close the iterator reset it to the beginning.
require.Equal(t, nil, c.Close())
}
func Test_EmptyCachedSampleIterator(t *testing.T) {
c := NewCachedSampleIterator(NoopIterator, 0)
require.Equal(t, "", c.Labels())
@ -126,11 +234,9 @@ func Test_EmptyCachedSampleIterator(t *testing.T) {
require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Sample{}, c.Sample())
}
func Test_ErrorCachedSampleIterator(t *testing.T) {
c := NewCachedSampleIterator(&errorIter{}, 0)
require.Equal(t, false, c.Next())

@ -5,12 +5,14 @@ import (
"errors"
"time"
"github.com/grafana/loki/pkg/storage/chunk"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/storage/chunk"
)
// LazyChunk loads the chunk when it is accessed.
@ -67,7 +69,15 @@ func (c *LazyChunk) Iterator(
continue
}
if nextChunk != nil {
delete(c.overlappingBlocks, b.Offset())
if cache, ok := c.overlappingBlocks[b.Offset()]; ok {
delete(c.overlappingBlocks, b.Offset())
if err := cache.Wrapped().Close(); err != nil {
level.Warn(util_log.Logger).Log(
"msg", "failed to close cache block iterator",
"err", err,
)
}
}
}
// non-overlapping block with the next chunk are not cached.
its = append(its, b.Iterator(ctx, pipeline))
@ -140,7 +150,15 @@ func (c *LazyChunk) SampleIterator(
continue
}
if nextChunk != nil {
delete(c.overlappingSampleBlocks, b.Offset())
if cache, ok := c.overlappingSampleBlocks[b.Offset()]; ok {
delete(c.overlappingSampleBlocks, b.Offset())
if err := cache.Wrapped().Close(); err != nil {
level.Warn(util_log.Logger).Log(
"msg", "failed to close cache block sample iterator",
"err", err,
)
}
}
}
// non-overlapping block with the next chunk are not cached.
its = append(its, b.SampleIterator(ctx, extractor))

Loading…
Cancel
Save