@@ -560,7 +560,7 @@ type StringTuples interface {
 
 type indexReader struct {
 	// The underlying byte slice holding the encoded series data.
-	b   []byte
+	b   ByteSlice
 	toc indexTOC
 
 	// Close that releases the underlying resources of the byte slice.
@@ -575,33 +575,62 @@ type indexReader struct {
 	// prevents memory faults when applications work with read symbols after
 	// the block has been unmapped.
 	symbols map[uint32]string
 
 	crc32 hash.Hash32
 }
 
 var (
-	errInvalidSize = fmt.Errorf("invalid size")
-	errInvalidFlag = fmt.Errorf("invalid flag")
+	errInvalidSize     = fmt.Errorf("invalid size")
+	errInvalidFlag     = fmt.Errorf("invalid flag")
+	errInvalidChecksum = fmt.Errorf("invalid checksum")
 )
 
-// NewIndexReader returns a new IndexReader on the given directory.
-func NewIndexReader(dir string) (IndexReader, error) { return newIndexReader(dir) }
+// ByteSlice abstracts a byte slice.
+type ByteSlice interface {
+	Len() int
+	Range(start, end int) []byte
+}
+
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+	return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+	return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) ByteSlice {
+	return b[start:end]
+}
+
+// NewIndexReader returns a new IndexReader on the given byte slice.
+func NewIndexReader(b ByteSlice) (IndexReader, error) {
+	return newIndexReader(b, nil)
+}
 
-// newIndexReader returns a new indexReader on the given directory.
-func newIndexReader(dir string) (*indexReader, error) {
-	f, err := openMmapFile(filepath.Join(dir, "index"))
+// NewFileIndexReader returns a new index reader against the given index file.
+func NewFileIndexReader(path string) (IndexReader, error) {
+	f, err := openMmapFile(path)
 	if err != nil {
 		return nil, err
 	}
+	return newIndexReader(realByteSlice(f.b), f)
+}
+
+func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
 	r := &indexReader{
-		b:       f.b,
-		c:       f,
+		b:       b,
+		c:       c,
 		symbols: map[uint32]string{},
 		crc32:   newCRC32(),
 	}
 
 	// Verify magic number.
-	if len(f.b) < 4 {
+	if b.Len() < 4 {
 		return nil, errors.Wrap(errInvalidSize, "index header")
 	}
-	if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex {
+	if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
 		return nil, errors.Errorf("invalid magic number %x", m)
 	}
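
For orientation, here is a minimal usage sketch of the two constructors introduced above. It is illustrative only: the helper name is invented, it would have to live in the same package (realByteSlice is unexported), and it assumes path and buf already hold a complete, valid index.

// openBoth is a hypothetical helper showing both entry points.
func openBoth(path string, buf []byte) error {
	// File-backed: the reader wraps the mmap'd file in a realByteSlice and
	// keeps the file as its Closer, so Close releases the mapping.
	fr, err := NewFileIndexReader(path)
	if err != nil {
		return err
	}
	defer fr.Close()

	// Memory-backed: any ByteSlice works; no Closer is attached in this
	// case, so Close should not be called on this reader.
	mr, err := NewIndexReader(realByteSlice(buf))
	if err != nil {
		return err
	}
	_ = mr

	return nil
}
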
@@ -611,6 +640,7 @@ func newIndexReader(dir string) (*indexReader, error) {
 	if err := r.readSymbols(int(r.toc.symbols)); err != nil {
 		return nil, errors.Wrap(err, "read symbols")
 	}
+	var err error
 
 	r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
 	if err != nil {
@@ -621,7 +651,17 @@ func newIndexReader(dir string) (*indexReader, error) {
 }
 
 func (r *indexReader) readTOC() error {
-	d := r.decbufAt(len(r.b) - indexTOCLen)
+	if r.b.Len() < indexTOCLen {
+		return errInvalidSize
+	}
+	b := r.b.Range(r.b.Len()-indexTOCLen, r.b.Len())
+
+	expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
+	d := decbuf{b: b[:len(b)-4]}
+
+	if d.crc32() != expCRC {
+		return errInvalidChecksum
+	}
 
 	r.toc.symbols = d.be64()
 	r.toc.series = d.be64()
@@ -630,16 +670,61 @@ func (r *indexReader) readTOC() error {
 	r.toc.postings = d.be64()
 	r.toc.postingsTable = d.be64()
 
-	// TODO(fabxc): validate checksum.
-
-	return nil
+	return d.err()
 }
 
+// decbufAt returns a new decoding buffer. It expects the first 4 bytes
+// after offset to hold the big endian encoded content length, followed by the contents and the expected
+// checksum.
 func (r *indexReader) decbufAt(off int) decbuf {
-	if len(r.b) < off {
+	if r.b.Len() < off+4 {
 		return decbuf{e: errInvalidSize}
 	}
-	return decbuf{b: r.b[off:]}
+	b := r.b.Range(off, off+4)
+	l := int(binary.BigEndian.Uint32(b))
+
+	if r.b.Len() < off+4+l+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+4, off+4+l+4)
+	dec := decbuf{b: b[:len(b)-4]}
+
+	if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
 }
 
+// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
+// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
+// checksum.
+func (r *indexReader) decbufUvarintAt(off int) decbuf {
+	// We never have to access this method at the far end of the byte slice. Thus just checking
+	// against the MaxVarintLen32 is sufficient.
+	if r.b.Len() < off+binary.MaxVarintLen32 {
+		return decbuf{e: errInvalidSize}
+	}
+	b := r.b.Range(off, off+binary.MaxVarintLen32)
+
+	l, n := binary.Uvarint(b)
+	if n > binary.MaxVarintLen32 {
+		return decbuf{e: errors.New("invalid uvarint")}
+	}
+
+	if r.b.Len() < off+n+int(l)+4 {
+		return decbuf{e: errInvalidSize}
+	}
+
+	// Load bytes holding the contents plus a CRC32 checksum.
+	b = r.b.Range(off+n, off+n+int(l)+4)
+	dec := decbuf{b: b[:len(b)-4]}
+
+	if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
+		return decbuf{e: errInvalidChecksum}
+	}
+	return dec
+}
+
 // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
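
To make the framing these two helpers expect concrete, here is a small writer-side sketch. It is not part of the change: the package and function names are invented, and it assumes the package's newCRC32/decbuf.crc32 use the CRC-32 Castagnoli table and that the checksum covers only the contents, which is what the readers above verify.

package sketch

import (
	"encoding/binary"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// appendSection writes what decbufAt reads back: a 4-byte big-endian
// content length, the contents, then a CRC32 of the contents.
func appendSection(dst, contents []byte) []byte {
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], uint32(len(contents)))
	dst = append(dst, buf[:]...)
	dst = append(dst, contents...)
	binary.BigEndian.PutUint32(buf[:], crc32.Checksum(contents, castagnoli))
	return append(dst, buf[:]...)
}

// appendUvarintSection writes what decbufUvarintAt reads back: the same
// layout, but with a uvarint-encoded content length.
func appendUvarintSection(dst, contents []byte) []byte {
	var lenBuf [binary.MaxVarintLen32]byte
	n := binary.PutUvarint(lenBuf[:], uint64(len(contents)))
	dst = append(dst, lenBuf[:n]...)
	dst = append(dst, contents...)

	var crcBuf [4]byte
	binary.BigEndian.PutUint32(crcBuf[:], crc32.Checksum(contents, castagnoli))
	return append(dst, crcBuf[:]...)
}
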
@@ -649,22 +734,22 @@ func (r *indexReader) readSymbols(off int) error {
 	if off == 0 {
 		return nil
 	}
+	d := r.decbufAt(off)
+
 	var (
-		d1      = r.decbufAt(int(off))
-		d2      = d1.decbuf(d1.be32int())
-		origLen = d2.len()
-		cnt     = d2.be32int()
+		origLen = d.len()
+		cnt     = d.be32int()
 		basePos = uint32(off) + 4
-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 	)
-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		s := d2.uvarintStr()
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		s := d.uvarintStr()
 		r.symbols[uint32(nextPos)] = s
 
-		nextPos = basePos + uint32(origLen-d2.len())
+		nextPos = basePos + uint32(origLen-d.len())
 		cnt--
 	}
-	return d2.err()
+	return d.err()
 }
 
 // readOffsetTable reads an offset table at the given position and returns a map
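
To unpack the offset arithmetic above: the references stored in r.symbols are absolute positions in the index. basePos is off+4 because the section's contents begin right after its 4-byte length field, and the 4-byte symbol count is consumed before the loop, so the first string's reference works out to (off+4)+4 = off+8; each subsequent reference is basePos plus the number of content bytes consumed before that string. lookupSymbol can then resolve a reference with a plain map lookup.
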
@@ -672,53 +757,29 @@ func (r *indexReader) readSymbols(off int) error {
 func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
 	const sep = "\xff"
 
-	var (
-		d1  = r.decbufAt(int(off))
-		d2  = d1.decbuf(d1.be32int())
-		cnt = d2.be32()
-	)
+	d := r.decbufAt(int(off))
+	cnt := d.be32()
 
-	res := make(map[string]uint32, 512)
+	res := make(map[string]uint32, cnt)
 
-	for d2.err() == nil && d2.len() > 0 && cnt > 0 {
-		keyCount := int(d2.uvarint())
+	for d.err() == nil && d.len() > 0 && cnt > 0 {
+		keyCount := int(d.uvarint())
 		keys := make([]string, 0, keyCount)
 
 		for i := 0; i < keyCount; i++ {
-			keys = append(keys, d2.uvarintStr())
+			keys = append(keys, d.uvarintStr())
 		}
-		res[strings.Join(keys, sep)] = uint32(d2.uvarint())
+		res[strings.Join(keys, sep)] = uint32(d.uvarint())
 
 		cnt--
 	}
-
-	// TODO(fabxc): verify checksum from remainer of d1.
-
-	return res, d2.err()
+	return res, d.err()
 }
 
 func (r *indexReader) Close() error {
 	return r.c.Close()
 }
 
-func (r *indexReader) section(o uint32) (byte, []byte, error) {
-	b := r.b[o:]
-
-	if len(b) < 5 {
-		return 0, nil, errors.Wrap(errInvalidSize, "read header")
-	}
-
-	flag := b[0]
-	l := binary.BigEndian.Uint32(b[1:5])
-
-	b = b[5:]
-
-	// b must have the given length plus 4 bytes for the CRC32 checksum.
-	if len(b) < int(l)+4 {
-		return 0, nil, errors.Wrap(errInvalidSize, "section content")
-	}
-	return flag, b[:l], nil
-}
-
 func (r *indexReader) lookupSymbol(o uint32) (string, error) {
 	s, ok := r.symbols[o]
 	if !ok {
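
The entry layout that loop decodes is: a uvarint key count, each key as a uvarint-length-prefixed string, and finally the uvarint offset the keys point at. A hypothetical writer-side counterpart, mirroring one loop iteration of the decoder above (the function name is invented and it reuses encoding/binary):

// appendOffsetEntry encodes one offset-table entry the way
// readOffsetTable decodes it.
func appendOffsetEntry(dst []byte, keys []string, offset uint64) []byte {
	var buf [binary.MaxVarintLen64]byte

	n := binary.PutUvarint(buf[:], uint64(len(keys)))
	dst = append(dst, buf[:n]...)

	for _, k := range keys {
		n = binary.PutUvarint(buf[:], uint64(len(k)))
		dst = append(dst, buf[:n]...)
		dst = append(dst, k...)
	}

	n = binary.PutUvarint(buf[:], offset)
	return append(dst, buf[:n]...)
}
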
@@ -748,21 +809,17 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
 		//return nil, fmt.Errorf("label index doesn't exist")
 	}
 
-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
+	d := r.decbufAt(int(off))
 
-	nc := d2.be32int()
-	d2.be32() // consume unused value entry count.
+	nc := d.be32int()
+	d.be32() // consume unused value entry count.
 
-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "read label value index")
+	if d.err() != nil {
+		return nil, errors.Wrap(d.err(), "read label value index")
 	}
-
-	// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
-
 	st := &serializedStringTuples{
 		l:      nc,
-		b:      d2.get(),
+		b:      d.get(),
 		lookup: r.lookupSymbol,
 	}
 	return st, nil
@@ -785,20 +842,19 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
 }
 
 func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
-	d1 := r.decbufAt(int(ref))
-	d2 := d1.decbuf(int(d1.uvarint()))
+	d := r.decbufUvarintAt(int(ref))
 
 	*lbls = (*lbls)[:0]
 	*chks = (*chks)[:0]
 
-	k := int(d2.uvarint())
+	k := int(d.uvarint())
 
 	for i := 0; i < k; i++ {
-		lno := uint32(d2.uvarint())
-		lvo := uint32(d2.uvarint())
+		lno := uint32(d.uvarint())
+		lvo := uint32(d.uvarint())
 
-		if d2.err() != nil {
-			return errors.Wrap(d2.err(), "read series label offsets")
+		if d.err() != nil {
+			return errors.Wrap(d.err(), "read series label offsets")
 		}
 
 		ln, err := r.lookupSymbol(lno)
@@ -814,15 +870,15 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	}
 
 	// Read the chunks meta data.
-	k = int(d2.uvarint())
+	k = int(d.uvarint())
 
 	if k == 0 {
 		return nil
 	}
 
-	t0 := d2.varint64()
-	maxt := int64(d2.uvarint64()) + t0
-	ref0 := int64(d2.uvarint64())
+	t0 := d.varint64()
+	maxt := int64(d.uvarint64()) + t0
+	ref0 := int64(d.uvarint64())
 
 	*chks = append(*chks, ChunkMeta{
 		Ref: uint64(ref0),
@@ -832,14 +888,14 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 	t0 = maxt
 
 	for i := 1; i < k; i++ {
-		mint := int64(d2.uvarint64()) + t0
-		maxt := int64(d2.uvarint64()) + mint
+		mint := int64(d.uvarint64()) + t0
+		maxt := int64(d.uvarint64()) + mint
 
-		ref0 += d2.varint64()
+		ref0 += d.varint64()
 		t0 = maxt
 
-		if d2.err() != nil {
-			return errors.Wrapf(d2.err(), "read meta for chunk %d", i)
+		if d.err() != nil {
+			return errors.Wrapf(d.err(), "read meta for chunk %d", i)
 		}
 
 		*chks = append(*chks, ChunkMeta{
@@ -848,10 +904,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
 			MaxTime: maxt,
 		})
 	}
-
-	// TODO(fabxc): verify CRC32.
-
-	return nil
+	return d.err()
 }
 
 func (r *indexReader) Postings(name, value string) (Postings, error) {
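
Taken together, the Series hunks show the delta encoding of the chunk metas: the first chunk stores MinTime as a varint, MaxTime as a uvarint delta from MinTime, and its ref as a uvarint; every later chunk stores MinTime as a uvarint delta from the previous MaxTime, MaxTime as a uvarint delta from its own MinTime, and its ref as a signed varint delta from the previous ref. As a made-up example, chunks [1000, 2000] with ref 80 and [2100, 2900] with ref 380 decode from the stored values 1000, 1000, 80 and then 100, 800, 300.
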
@@ -862,19 +915,10 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
 	if !ok {
 		return emptyPostings, nil
 	}
+	d := r.decbufAt(int(off))
 
-	d1 := r.decbufAt(int(off))
-	d2 := d1.decbuf(d1.be32int())
-
-	d2.be32() // consume unused postings list length.
-
-	if d2.err() != nil {
-		return nil, errors.Wrap(d2.err(), "get postings bytes")
-	}
-
-	// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
-
-	return newBigEndianPostings(d2.get()), nil
+	d.be32() // consume unused postings list length.
+	return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
 }
 
 func (r *indexReader) SortedPostings(p Postings) Postings {