diff --git a/block.go b/block.go index 2bd1fd3648..2d7bab2d8b 100644 --- a/block.go +++ b/block.go @@ -142,8 +142,8 @@ type Block struct { dir string meta BlockMeta - chunkr *chunkReader - indexr *indexReader + chunkr ChunkReader + indexr IndexReader tombstones tombstoneReader } @@ -160,7 +160,7 @@ func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { if err != nil { return nil, err } - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) if err != nil { return nil, err } diff --git a/index.go b/index.go index 36783ced29..277807e21e 100644 --- a/index.go +++ b/index.go @@ -560,7 +560,7 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b ByteSlice toc indexTOC // Close that releases the underlying resources of the byte slice. @@ -585,27 +585,52 @@ var ( errInvalidChecksum = fmt.Errorf("invalid checksum") ) +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte +} + +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) ByteSlice { + return b[start:end] +} + // NewIndexReader returns a new IndexReader on the given directory. -func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) } +func NewIndexReader(b ByteSlice) (IndexReader, error) { + return newIndexReader(b, nil) +} -// newIndexReader returns a new indexReader on the given directory. -func newIndexReader(dir string) (*indexReader, error) { - f, err := openMmapFile(filepath.Join(dir, "index")) +func NewFileIndexReader(path string) (IndexReader, error) { + f, err := openMmapFile(path) if err != nil { return nil, err } + return newIndexReader(realByteSlice(f.b), f) +} + +// newIndexReader returns a new indexReader on the given byte slice. 
+func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { r := &indexReader{ - b: f.b, - c: f, + b: b, + c: c, symbols: map[uint32]string{}, crc32: newCRC32(), } - // Verify magic number. - if len(f.b) < 4 { + if b.Len() < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } - if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { + if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { return nil, errors.Errorf("invalid magic number %x", m) } @@ -615,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) { if err := r.readSymbols(int(r.toc.symbols)); err != nil { return nil, errors.Wrap(err, "read symbols") } + var err error r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { @@ -625,31 +651,80 @@ func newIndexReader(dir string) (*indexReader, error) { } func (r *indexReader) readTOC() error { - d1 := r.decbufAt(len(r.b) - indexTOCLen) - d2 := d1.decbuf(indexTOCLen - 4) - crc := d2.crc32() + if r.b.Len() < indexTOCLen { + return errInvalidSize + } + b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len()) - r.toc.symbols = d2.be64() - r.toc.series = d2.be64() - r.toc.labelIndices = d2.be64() - r.toc.labelIndicesTable = d2.be64() - r.toc.postings = d2.be64() - r.toc.postingsTable = d2.be64() + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := decbuf{b: b[:len(b)-4]} - if d2.err() != nil { - return d2.err() + if d.crc32() != expCRC { + return errors.Wrap(errInvalidChecksum, "read TOC") } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read TOC") - } - return d1.err() + + r.toc.symbols = d.be64() + r.toc.series = d.be64() + r.toc.labelIndices = d.be64() + r.toc.labelIndicesTable = d.be64() + r.toc.postings = d.be64() + r.toc.postingsTable = d.be64() + + return d.err() } +// decbufAt returns a new decoding buffer. It expects the first 4 bytes +// after offset to hold the big endian encoded content length, followed by the contents and the expected +// checksum. 
func (r *indexReader) decbufAt(off int) decbuf { - if len(r.b) < off { + if r.b.Len() < off+4 { + return decbuf{e: errInvalidSize} + } + b := r.b.Range(off, off+4) + l := int(binary.BigEndian.Uint32(b)) + + if r.b.Len() < off+4+l+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+4, off+4+l+4) + dec := decbuf{b: b[:len(b)-4]} + + if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp { + return decbuf{e: errInvalidChecksum} + } + return dec +} + +// decbufUvarintAt returns a new decoding buffer. It expects the first bytes +// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected +// checksum. +func (r *indexReader) decbufUvarintAt(off int) decbuf { + // We never have to access this method at the far end of the byte slice. Thus just checking + // against the MaxVarintLen32 is sufficient. + if r.b.Len() < off+binary.MaxVarintLen32 { return decbuf{e: errInvalidSize} } - return decbuf{b: r.b[off:]} + b := r.b.Range(off, off+binary.MaxVarintLen32) + + l, n := binary.Uvarint(b) + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.New("invalid uvarint")} + } + + if r.b.Len() < off+n+int(l)+4 { + return decbuf{e: errInvalidSize} + } + + // Load bytes holding the contents plus a CRC32 checksum. + b = r.b.Range(off+n, off+n+int(l)+4) + dec := decbuf{b: b[:len(b)-4]} + + if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) { + return decbuf{e: errInvalidChecksum} + } + return dec } // readSymbols reads the symbol table fully into memory and allocates proper strings for them. 
@@ -659,26 +734,22 @@ func (r *indexReader) readSymbols(off int) error { if off == 0 { return nil } + d := r.decbufAt(off) + var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - origLen = d2.len() - cnt = d2.be32int() + origLen = d.len() + cnt = d.be32int() basePos = uint32(off) + 4 - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) ) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - s := d2.uvarintStr() + for d.err() == nil && d.len() > 0 && cnt > 0 { + s := d.uvarintStr() r.symbols[uint32(nextPos)] = s - nextPos = basePos + uint32(origLen-d2.len()) + nextPos = basePos + uint32(origLen-d.len()) cnt-- } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read symbols") - } - return d2.err() + return d.err() } // readOffsetTable reads an offset table at the given position and returns a map @@ -686,55 +757,29 @@ func (r *indexReader) readSymbols(off int) error { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { const sep = "\xff" - var ( - d1 = r.decbufAt(int(off)) - d2 = d1.decbuf(d1.be32int()) - crc = d2.crc32() - cnt = d2.be32() - ) + d := r.decbufAt(int(off)) + cnt := d.be32() - res := make(map[string]uint32, 512) + res := make(map[string]uint32, cnt) - for d2.err() == nil && d2.len() > 0 && cnt > 0 { - keyCount := int(d2.uvarint()) + for d.err() == nil && d.len() > 0 && cnt > 0 { + keyCount := int(d.uvarint()) keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d.uvarintStr()) } - res[strings.Join(keys, sep)] = uint32(d2.uvarint()) + res[strings.Join(keys, sep)] = uint32(d.uvarint()) cnt-- } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read offset table") - } - return res, d2.err() + return res, d.err() } func (r *indexReader) Close() error { return r.c.Close() } -func (r *indexReader) section(o uint32) (byte, 
[]byte, error) { - b := r.b[o:] - - if len(b) < 5 { - return 0, nil, errors.Wrap(errInvalidSize, "read header") - } - - flag := b[0] - l := binary.BigEndian.Uint32(b[1:5]) - - b = b[5:] - - // b must have the given length plus 4 bytes for the CRC32 checksum. - if len(b) < int(l)+4 { - return 0, nil, errors.Wrap(errInvalidSize, "section content") - } - return flag, b[:l], nil -} - func (r *indexReader) lookupSymbol(o uint32) (string, error) { s, ok := r.symbols[o] if !ok { @@ -764,23 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - crc := d2.crc32() + d := r.decbufAt(int(off)) - nc := d2.be32int() - d2.be32() // consume unused value entry count. - - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "read label value index") - } + nc := d.be32int() + d.be32() // consume unused value entry count. - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read label values") + if d.err() != nil { + return nil, errors.Wrap(d.err(), "read label value index") } st := &serializedStringTuples{ l: nc, - b: d2.get(), + b: d.get(), lookup: r.lookupSymbol, } return st, nil @@ -803,21 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) { } func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { - d1 := r.decbufAt(int(ref)) - d2 := d1.decbuf(d1.uvarint()) - crc := d2.crc32() + d := r.decbufUvarintAt(int(ref)) *lbls = (*lbls)[:0] *chks = (*chks)[:0] - k := int(d2.uvarint()) + k := int(d.uvarint()) for i := 0; i < k; i++ { - lno := uint32(d2.uvarint()) - lvo := uint32(d2.uvarint()) + lno := uint32(d.uvarint()) + lvo := uint32(d.uvarint()) - if d2.err() != nil { - return errors.Wrap(d2.err(), "read series label offsets") + if d.err() != nil { + return errors.Wrap(d.err(), "read series label offsets") } ln, err := r.lookupSymbol(lno) @@ -833,15 
+870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) } // Read the chunks meta data. - k = int(d2.uvarint()) + k = int(d.uvarint()) if k == 0 { return nil } - t0 := d2.varint64() - maxt := int64(d2.uvarint64()) + t0 - ref0 := int64(d2.uvarint64()) + t0 := d.varint64() + maxt := int64(d.uvarint64()) + t0 + ref0 := int64(d.uvarint64()) *chks = append(*chks, ChunkMeta{ Ref: uint64(ref0), @@ -851,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) t0 = maxt for i := 1; i < k; i++ { - mint := int64(d2.uvarint64()) + t0 - maxt := int64(d2.uvarint64()) + mint + mint := int64(d.uvarint64()) + t0 + maxt := int64(d.uvarint64()) + mint - ref0 += d2.varint64() + ref0 += d.varint64() t0 = maxt - if d2.err() != nil { - return errors.Wrapf(d2.err(), "read meta for chunk %d", i) + if d.err() != nil { + return errors.Wrapf(d.err(), "read meta for chunk %d", i) } *chks = append(*chks, ChunkMeta{ @@ -867,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) MaxTime: maxt, }) } - if read := d1.be32(); crc != read { - return errors.Wrap(errInvalidChecksum, "read series") - } - return nil + return d.err() } func (r *indexReader) Postings(name, value string) (Postings, error) { @@ -881,21 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { if !ok { return emptyPostings, nil } + d := r.decbufAt(int(off)) + d.be32() // consume unused postings list length. - d1 := r.decbufAt(int(off)) - d2 := d1.decbuf(d1.be32int()) - - crc := d2.crc32() - - d2.be32() // consume unused postings list length. 
- - if d2.err() != nil { - return nil, errors.Wrap(d2.err(), "get postings bytes") - } - if read := d1.be32(); crc != read { - return nil, errors.Wrap(errInvalidChecksum, "read postings") - } - return newBigEndianPostings(d2.get()), nil + return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") } func (r *indexReader) SortedPostings(p Postings) Postings { diff --git a/index_test.go b/index_test.go index 7b908e39af..f18aba0ebe 100644 --- a/index_test.go +++ b/index_test.go @@ -159,7 +159,7 @@ func TestIndexRW_Create_Open(t *testing.T) { require.NoError(t, err, "create index writer") require.NoError(t, iw.Close(), "close index writer") - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") require.NoError(t, ir.Close(), "close index reader") @@ -169,7 +169,7 @@ func TestIndexRW_Create_Open(t *testing.T) { _, err = f.WriteAt([]byte{0, 0}, 0) require.NoError(t, err) - _, err = newIndexReader(dir) + _, err = NewFileIndexReader(filepath.Join(dir, "index")) require.Error(t, err) } @@ -210,7 +210,7 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, iw.Close()) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err, "open index reader") p, err := ir.Postings("a", "1") @@ -325,7 +325,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() require.NoError(t, err) - ir, err := newIndexReader(dir) + ir, err := NewFileIndexReader(filepath.Join(dir, "index")) require.NoError(t, err) for p := range mi.postings.m {