From c0a28cbf30f342b68766c104403fa3bfe5cd56ee Mon Sep 17 00:00:00 2001 From: Aditya C S Date: Fri, 24 Apr 2020 19:18:08 +0530 Subject: [PATCH] Log error message for invalid checksum (#1713) --- pkg/chunkenc/interface.go | 2 +- pkg/chunkenc/lazy_chunk.go | 1 + pkg/chunkenc/memchunk.go | 5 +- pkg/storage/iterator.go | 35 +++++++- pkg/storage/iterator_test.go | 161 +++++++++++++++++++++++++++++++++++ pkg/storage/store_test.go | 2 + pkg/storage/util_test.go | 10 +++ 7 files changed, 210 insertions(+), 6 deletions(-) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 00381176ce..a2faf8fa58 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -18,7 +18,7 @@ var ( ErrOutOfOrder = errors.New("entry out of order") ErrInvalidSize = errors.New("invalid size") ErrInvalidFlag = errors.New("invalid flag") - ErrInvalidChecksum = errors.New("invalid checksum") + ErrInvalidChecksum = errors.New("invalid chunk checksum") ) // Encoding is the identifier for a chunk encoding. diff --git a/pkg/chunkenc/lazy_chunk.go b/pkg/chunkenc/lazy_chunk.go index 9cfcde600a..2e14c48a6f 100644 --- a/pkg/chunkenc/lazy_chunk.go +++ b/pkg/chunkenc/lazy_chunk.go @@ -15,6 +15,7 @@ import ( // LazyChunk loads the chunk when it is accessed. type LazyChunk struct { Chunk chunk.Chunk + IsValid bool Fetcher *chunk.Fetcher } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index b3f450aa16..e46fa12191 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -11,6 +11,8 @@ import ( "io" "time" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/grafana/loki/pkg/iter" @@ -226,7 +228,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { // Verify checksums. expCRC := binary.BigEndian.Uint32(b[blk.offset+l:]) if expCRC != crc32.Checksum(blk.b, castagnoliTable) { - return bc, ErrInvalidChecksum + level.Error(util.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum) + continue } bc.blocks = append(bc.blocks, blk) diff --git a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 847a4873c1..1fb96949b9 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -9,8 +9,10 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" @@ -199,7 +201,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { // │ # 47 │ // └──────────────┘ // ┌──────────────────────────┐ - // │ # 48 │ + // │ # 48 | // └──────────────────────────┘ // ┌──────────────┐ // │ # 49 │ @@ -328,10 +330,12 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter // __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set. labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String() - for i := range chks { iterators := make([]iter.EntryIterator, 0, len(chks[i])) for j := range chks[i] { + if !chks[i][j].IsValid { + continue + } iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter) if err != nil { return nil, err @@ -388,7 +392,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { errChan := make(chan error) for fetcher, chunks := range chksByFetcher { go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) { - keys := make([]string, 0, len(chunks)) chks := make([]chunk.Chunk, 0, len(chunks)) index := make(map[string]*chunkenc.LazyChunk, len(chunks)) @@ -403,8 +406,14 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { } chks, err := fetcher.FetchChunks(ctx, chks, keys) if err != nil { + if isInvalidChunkError(err) { + level.Error(util.Logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum) + errChan <- nil + return + } errChan <- err return + } // assign fetched chunk by key as FetchChunks doesn't guarantee the order. for _, chk := range chks { @@ -421,7 +430,25 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { lastErr = err } } - return lastErr + + if lastErr != nil { + return lastErr + } + + for _, c := range chunks { + if c.Chunk.Data != nil { + c.IsValid = true + } + } + return nil +} + +func isInvalidChunkError(err error) bool { + err = errors.Cause(err) + if err, ok := err.(promql.ErrStorage); ok { + return err.Err == chunk.ErrInvalidChecksum || err.Err == chunkenc.ErrInvalidChecksum + } + return false } func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error { diff --git a/pkg/storage/iterator_test.go b/pkg/storage/iterator_test.go index 2a42251b50..bc55edd0c4 100644 --- a/pkg/storage/iterator_test.go +++ b/pkg/storage/iterator_test.go @@ -6,8 +6,12 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" @@ -639,6 +643,130 @@ func TestPartitionOverlappingchunks(t *testing.T) { } } +func TestBuildHeapIterator(t *testing.T) { + var ( + firstChunk = newLazyChunk(logproto.Stream{ + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + }, + }) + secondChunk = newLazyInvalidChunk(logproto.Stream{ + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from.Add(3 * time.Millisecond), + Line: "4", + }, + { + Timestamp: from.Add(4 * time.Millisecond), + Line: "5", + }, + }, + }) + thirdChunk = newLazyChunk(logproto.Stream{ + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }) + ) + + for i, tc := range []struct { + input [][]*chunkenc.LazyChunk + expected []*logproto.Stream + }{ + { + [][]*chunkenc.LazyChunk{ + {firstChunk}, + {thirdChunk}, + }, + []*logproto.Stream{ + { + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + }, + }, + { + [][]*chunkenc.LazyChunk{ + {secondChunk}, + {firstChunk, thirdChunk}, + }, + []*logproto.Stream{ + { + Labels: "{foo=\"bar\"}", + Entries: []logproto.Entry{ + { + Timestamp: from, + Line: "1", + }, + { + Timestamp: from.Add(time.Millisecond), + Line: "2", + }, + { + Timestamp: from.Add(2 * time.Millisecond), + Line: "3", + }, + { + Timestamp: from.Add(5 * time.Millisecond), + Line: "6", + }, + }, + }, + }, + }, + } { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + ctx = user.InjectOrgID(context.Background(), "test-user") + it, err := buildHeapIterator(ctx, tc.input, nil, logproto.FORWARD, from, from.Add(6*time.Millisecond)) + if err != nil { + t.Errorf("buildHeapIterator error = %v", err) + return + } + req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD) + streams, _, err := iter.ReadBatch(it, req.Limit) + _ = it.Close() + if err != nil { + t.Fatalf("error reading batch %s", err) + } + assertStream(t, tc.expected, streams.Streams) + }) + } +} + func TestDropLabels(t *testing.T) { for i, tc := range []struct { @@ -680,3 +808,36 @@ func TestDropLabels(t *testing.T) { }) } } + +func Test_IsInvalidChunkError(t *testing.T) { + tests := []struct { + name string + err error + expectedResult bool + }{ + { + "invalid chunk cheksum error from cortex", + promql.ErrStorage{Err: chunk.ErrInvalidChecksum}, + true, + }, + { + "invalid chunk cheksum error from loki", + promql.ErrStorage{Err: chunkenc.ErrInvalidChecksum}, + true, + }, + { + "cache error", + promql.ErrStorage{Err: errors.New("error fetching from cache")}, + false, + }, + { + "no error from cortex or loki", + nil, + false, + }, + } + for _, tc := range tests { + result := isInvalidChunkError(tc.err) + require.Equal(t, tc.expectedResult, result) + } +} diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 33cd9105c1..03a2953301 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -342,12 +342,14 @@ func Test_store_LazyQuery(t *testing.T) { MaxChunkBatchSize: 10, }, } + ctx = user.InjectOrgID(context.Background(), "test-user") it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req}) if err != nil { t.Errorf("store.LazyQuery() error = %v", err) return } + streams, _, err := iter.ReadBatch(it, tt.req.Limit) _ = it.Close() if err != nil { diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 4370b744df..08fcc67917 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -49,6 +49,15 @@ func assertStream(t *testing.T, expected, actual []*logproto.Stream) { func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk { return &chunkenc.LazyChunk{ Fetcher: nil, + IsValid: true, + Chunk: newChunk(stream), + } +} + +func newLazyInvalidChunk(stream logproto.Stream) *chunkenc.LazyChunk { + return &chunkenc.LazyChunk{ + Fetcher: nil, + IsValid: false, Chunk: newChunk(stream), } } @@ -119,6 +128,7 @@ func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore { } return &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}} } + func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error { return nil