Log error message for invalid checksum (#1713)

pull/1981/head
Aditya C S 6 years ago committed by GitHub
parent 2374d4891b
commit c0a28cbf30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/chunkenc/interface.go
  2. 1
      pkg/chunkenc/lazy_chunk.go
  3. 5
      pkg/chunkenc/memchunk.go
  4. 35
      pkg/storage/iterator.go
  5. 161
      pkg/storage/iterator_test.go
  6. 2
      pkg/storage/store_test.go
  7. 10
      pkg/storage/util_test.go

@ -18,7 +18,7 @@ var (
ErrOutOfOrder = errors.New("entry out of order")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
// Encoding is the identifier for a chunk encoding.

@ -15,6 +15,7 @@ import (
// LazyChunk loads the chunk when it is accessed.
type LazyChunk struct {
Chunk chunk.Chunk
IsValid bool
Fetcher *chunk.Fetcher
}

@ -11,6 +11,8 @@ import (
"io"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/iter"
@ -226,7 +228,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
return bc, ErrInvalidChecksum
level.Error(util.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
continue
}
bc.blocks = append(bc.blocks, blk)

@ -9,8 +9,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
@ -199,7 +201,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// │ # 47 │
// └──────────────┘
// ┌──────────────────────────┐
// │ # 48
// │ # 48 |
// └──────────────────────────┘
// ┌──────────────┐
// │ # 49 │
@ -328,10 +330,12 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter
// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()
for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
for j := range chks[i] {
if !chks[i][j].IsValid {
continue
}
iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter)
if err != nil {
return nil, err
@ -388,7 +392,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
errChan := make(chan error)
for fetcher, chunks := range chksByFetcher {
go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {
keys := make([]string, 0, len(chunks))
chks := make([]chunk.Chunk, 0, len(chunks))
index := make(map[string]*chunkenc.LazyChunk, len(chunks))
@ -403,8 +406,14 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
}
chks, err := fetcher.FetchChunks(ctx, chks, keys)
if err != nil {
if isInvalidChunkError(err) {
level.Error(util.Logger).Log("msg", "checksum of chunks does not match", "err", chunk.ErrInvalidChecksum)
errChan <- nil
return
}
errChan <- err
return
}
// assign fetched chunk by key as FetchChunks doesn't guarantee the order.
for _, chk := range chks {
@ -421,7 +430,25 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error {
lastErr = err
}
}
return lastErr
if lastErr != nil {
return lastErr
}
for _, c := range chunks {
if c.Chunk.Data != nil {
c.IsValid = true
}
}
return nil
}
func isInvalidChunkError(err error) bool {
err = errors.Cause(err)
if err, ok := err.(promql.ErrStorage); ok {
return err.Err == chunk.ErrInvalidChecksum || err.Err == chunkenc.ErrInvalidChecksum
}
return false
}
func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error {

@ -6,8 +6,12 @@ import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
@ -639,6 +643,130 @@ func TestPartitionOverlappingchunks(t *testing.T) {
}
}
func TestBuildHeapIterator(t *testing.T) {
var (
firstChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
},
})
secondChunk = newLazyInvalidChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(3 * time.Millisecond),
Line: "4",
},
{
Timestamp: from.Add(4 * time.Millisecond),
Line: "5",
},
},
})
thirdChunk = newLazyChunk(logproto.Stream{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
})
)
for i, tc := range []struct {
input [][]*chunkenc.LazyChunk
expected []*logproto.Stream
}{
{
[][]*chunkenc.LazyChunk{
{firstChunk},
{thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
{
[][]*chunkenc.LazyChunk{
{secondChunk},
{firstChunk, thirdChunk},
},
[]*logproto.Stream{
{
Labels: "{foo=\"bar\"}",
Entries: []logproto.Entry{
{
Timestamp: from,
Line: "1",
},
{
Timestamp: from.Add(time.Millisecond),
Line: "2",
},
{
Timestamp: from.Add(2 * time.Millisecond),
Line: "3",
},
{
Timestamp: from.Add(5 * time.Millisecond),
Line: "6",
},
},
},
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := buildHeapIterator(ctx, tc.input, nil, logproto.FORWARD, from, from.Add(6*time.Millisecond))
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
return
}
req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), logproto.FORWARD)
streams, _, err := iter.ReadBatch(it, req.Limit)
_ = it.Close()
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tc.expected, streams.Streams)
})
}
}
func TestDropLabels(t *testing.T) {
for i, tc := range []struct {
@ -680,3 +808,36 @@ func TestDropLabels(t *testing.T) {
})
}
}
func Test_IsInvalidChunkError(t *testing.T) {
tests := []struct {
name string
err error
expectedResult bool
}{
{
"invalid chunk cheksum error from cortex",
promql.ErrStorage{Err: chunk.ErrInvalidChecksum},
true,
},
{
"invalid chunk cheksum error from loki",
promql.ErrStorage{Err: chunkenc.ErrInvalidChecksum},
true,
},
{
"cache error",
promql.ErrStorage{Err: errors.New("error fetching from cache")},
false,
},
{
"no error from cortex or loki",
nil,
false,
},
}
for _, tc := range tests {
result := isInvalidChunkError(tc.err)
require.Equal(t, tc.expectedResult, result)
}
}

@ -342,12 +342,14 @@ func Test_store_LazyQuery(t *testing.T) {
MaxChunkBatchSize: 10,
},
}
ctx = user.InjectOrgID(context.Background(), "test-user")
it, err := s.LazyQuery(ctx, logql.SelectParams{QueryRequest: tt.req})
if err != nil {
t.Errorf("store.LazyQuery() error = %v", err)
return
}
streams, _, err := iter.ReadBatch(it, tt.req.Limit)
_ = it.Close()
if err != nil {

@ -49,6 +49,15 @@ func assertStream(t *testing.T, expected, actual []*logproto.Stream) {
func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk {
return &chunkenc.LazyChunk{
Fetcher: nil,
IsValid: true,
Chunk: newChunk(stream),
}
}
func newLazyInvalidChunk(stream logproto.Stream) *chunkenc.LazyChunk {
return &chunkenc.LazyChunk{
Fetcher: nil,
IsValid: false,
Chunk: newChunk(stream),
}
}
@ -119,6 +128,7 @@ func newMockChunkStore(streams []*logproto.Stream) *mockChunkStore {
}
return &mockChunkStore{chunks: chunks, client: &mockChunkStoreClient{chunks: chunks}}
}
func (m *mockChunkStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil }
func (m *mockChunkStore) PutOne(ctx context.Context, from, through model.Time, chunk chunk.Chunk) error {
return nil

Loading…
Cancel
Save