|
|
|
|
@ -11,13 +11,14 @@ |
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
|
// limitations under the License.
|
|
|
|
|
|
|
|
|
|
package tsdb |
|
|
|
|
package index |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"bufio" |
|
|
|
|
"encoding/binary" |
|
|
|
|
"fmt" |
|
|
|
|
"hash" |
|
|
|
|
"hash/crc32" |
|
|
|
|
"io" |
|
|
|
|
"math" |
|
|
|
|
"os" |
|
|
|
|
@ -26,6 +27,7 @@ import ( |
|
|
|
|
"strings" |
|
|
|
|
|
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/tsdb/chunks" |
|
|
|
|
"github.com/prometheus/tsdb/fileutil" |
|
|
|
|
"github.com/prometheus/tsdb/labels" |
|
|
|
|
) |
|
|
|
|
@ -35,18 +37,12 @@ const ( |
|
|
|
|
MagicIndex = 0xBAAAD700 |
|
|
|
|
|
|
|
|
|
indexFormatV1 = 1 |
|
|
|
|
|
|
|
|
|
size_unit = 4 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const indexFilename = "index" |
|
|
|
|
|
|
|
|
|
const compactionPageBytes = minSectorSize * 64 |
|
|
|
|
|
|
|
|
|
type indexWriterSeries struct { |
|
|
|
|
labels labels.Labels |
|
|
|
|
chunks []ChunkMeta // series file offset of chunks
|
|
|
|
|
offset uint32 // index file offset of series reference
|
|
|
|
|
chunks []chunks.Meta // series file offset of chunks
|
|
|
|
|
offset uint32 // index file offset of series reference
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type indexWriterSeriesSlice []*indexWriterSeries |
|
|
|
|
@ -87,37 +83,24 @@ func (s indexWriterStage) String() string { |
|
|
|
|
return "<unknown>" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// IndexWriter serializes the index for a block of series data.
|
|
|
|
|
// The methods must be called in the order they are specified in.
|
|
|
|
|
type IndexWriter interface { |
|
|
|
|
// AddSymbols registers all string symbols that are encountered in series
|
|
|
|
|
// and other indices.
|
|
|
|
|
AddSymbols(sym map[string]struct{}) error |
|
|
|
|
// The table gets initialized with sync.Once but may still cause a race
|
|
|
|
|
// with any other use of the crc32 package anywhere. Thus we initialize it
|
|
|
|
|
// before.
|
|
|
|
|
var castagnoliTable *crc32.Table |
|
|
|
|
|
|
|
|
|
// AddSeries populates the index writer with a series and its offsets
|
|
|
|
|
// of chunks that the index can reference.
|
|
|
|
|
// Implementations may require series to be insert in increasing order by
|
|
|
|
|
// their labels.
|
|
|
|
|
// The reference numbers are used to resolve entries in postings lists that
|
|
|
|
|
// are added later.
|
|
|
|
|
AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error |
|
|
|
|
|
|
|
|
|
// WriteLabelIndex serializes an index from label names to values.
|
|
|
|
|
// The passed in values chained tuples of strings of the length of names.
|
|
|
|
|
WriteLabelIndex(names []string, values []string) error |
|
|
|
|
|
|
|
|
|
// WritePostings writes a postings list for a single label pair.
|
|
|
|
|
// The Postings here contain refs to the series that were added.
|
|
|
|
|
WritePostings(name, value string, it Postings) error |
|
|
|
|
func init() { |
|
|
|
|
castagnoliTable = crc32.MakeTable(crc32.Castagnoli) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close writes any finalization and closes the resources associated with
|
|
|
|
|
// the underlying writer.
|
|
|
|
|
Close() error |
|
|
|
|
// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
|
|
|
|
|
// polynomial may be easily changed in one location at a later time, if necessary.
|
|
|
|
|
func newCRC32() hash.Hash32 { |
|
|
|
|
return crc32.New(castagnoliTable) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// indexWriter implements the IndexWriter interface for the standard
|
|
|
|
|
// serialization format.
|
|
|
|
|
type indexWriter struct { |
|
|
|
|
type Writer struct { |
|
|
|
|
f *os.File |
|
|
|
|
fbuf *bufio.Writer |
|
|
|
|
pos uint64 |
|
|
|
|
@ -150,14 +133,17 @@ type indexTOC struct { |
|
|
|
|
postingsTable uint64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newIndexWriter(dir string) (*indexWriter, error) { |
|
|
|
|
// NewWriter returns a new Writer to the given filename.
|
|
|
|
|
func NewWriter(fn string) (*Writer, error) { |
|
|
|
|
dir := filepath.Dir(fn) |
|
|
|
|
|
|
|
|
|
df, err := fileutil.OpenDir(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
defer df.Close() // close for flatform windows
|
|
|
|
|
|
|
|
|
|
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) |
|
|
|
|
f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
@ -165,7 +151,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { |
|
|
|
|
return nil, errors.Wrap(err, "sync dir") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
iw := &indexWriter{ |
|
|
|
|
iw := &Writer{ |
|
|
|
|
f: f, |
|
|
|
|
fbuf: bufio.NewWriterSize(f, 1<<22), |
|
|
|
|
pos: 0, |
|
|
|
|
@ -187,7 +173,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { |
|
|
|
|
return iw, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) write(bufs ...[]byte) error { |
|
|
|
|
func (w *Writer) write(bufs ...[]byte) error { |
|
|
|
|
for _, b := range bufs { |
|
|
|
|
n, err := w.fbuf.Write(b) |
|
|
|
|
w.pos += uint64(n) |
|
|
|
|
@ -206,18 +192,18 @@ func (w *indexWriter) write(bufs ...[]byte) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// addPadding adds zero byte padding until the file size is a multiple size_unit.
|
|
|
|
|
func (w *indexWriter) addPadding() error { |
|
|
|
|
p := w.pos % size_unit |
|
|
|
|
func (w *Writer) addPadding(size int) error { |
|
|
|
|
p := w.pos % uint64(size) |
|
|
|
|
if p == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
p = size_unit - p |
|
|
|
|
p = uint64(size) - p |
|
|
|
|
return errors.Wrap(w.write(make([]byte, p)), "add padding") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
|
|
|
|
// methods are called in an order valid for the implementation.
|
|
|
|
|
func (w *indexWriter) ensureStage(s indexWriterStage) error { |
|
|
|
|
func (w *Writer) ensureStage(s indexWriterStage) error { |
|
|
|
|
if w.stage == s { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -256,7 +242,7 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) writeMeta() error { |
|
|
|
|
func (w *Writer) writeMeta() error { |
|
|
|
|
w.buf1.reset() |
|
|
|
|
w.buf1.putBE32(MagicIndex) |
|
|
|
|
w.buf1.putByte(indexFormatV1) |
|
|
|
|
@ -264,7 +250,7 @@ func (w *indexWriter) writeMeta() error { |
|
|
|
|
return w.write(w.buf1.get()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error { |
|
|
|
|
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { |
|
|
|
|
if err := w.ensureStage(idxStageSeries); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
@ -328,7 +314,7 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { |
|
|
|
|
func (w *Writer) AddSymbols(sym map[string]struct{}) error { |
|
|
|
|
if err := w.ensureStage(idxStageSymbols); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
@ -361,7 +347,7 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { |
|
|
|
|
return errors.Wrap(err, "write symbols") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { |
|
|
|
|
func (w *Writer) WriteLabelIndex(names []string, values []string) error { |
|
|
|
|
if len(values)%len(names) != 0 { |
|
|
|
|
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) |
|
|
|
|
} |
|
|
|
|
@ -369,14 +355,14 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { |
|
|
|
|
return errors.Wrap(err, "ensure stage") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
valt, err := newStringTuples(values, len(names)) |
|
|
|
|
valt, err := NewStringTuples(values, len(names)) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
sort.Sort(valt) |
|
|
|
|
|
|
|
|
|
// Align beginning to 4 bytes for more efficient index list scans.
|
|
|
|
|
if err := w.addPadding(); err != nil { |
|
|
|
|
if err := w.addPadding(4); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -407,7 +393,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// writeOffsetTable writes a sequence of readable hash entries.
|
|
|
|
|
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { |
|
|
|
|
func (w *Writer) writeOffsetTable(entries []hashEntry) error { |
|
|
|
|
w.buf2.reset() |
|
|
|
|
w.buf2.putBE32int(len(entries)) |
|
|
|
|
|
|
|
|
|
@ -428,7 +414,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { |
|
|
|
|
|
|
|
|
|
const indexTOCLen = 6*8 + 4 |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) writeTOC() error { |
|
|
|
|
func (w *Writer) writeTOC() error { |
|
|
|
|
w.buf1.reset() |
|
|
|
|
|
|
|
|
|
w.buf1.putBE64(w.toc.symbols) |
|
|
|
|
@ -443,13 +429,13 @@ func (w *indexWriter) writeTOC() error { |
|
|
|
|
return w.write(w.buf1.get()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) WritePostings(name, value string, it Postings) error { |
|
|
|
|
func (w *Writer) WritePostings(name, value string, it Postings) error { |
|
|
|
|
if err := w.ensureStage(idxStagePostings); err != nil { |
|
|
|
|
return errors.Wrap(err, "ensure stage") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Align beginning to 4 bytes for more efficient postings list scans.
|
|
|
|
|
if err := w.addPadding(); err != nil { |
|
|
|
|
if err := w.addPadding(4); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -506,7 +492,7 @@ type hashEntry struct { |
|
|
|
|
offset uint64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *indexWriter) Close() error { |
|
|
|
|
func (w *Writer) Close() error { |
|
|
|
|
if err := w.ensureStage(idxStageDone); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
@ -519,37 +505,6 @@ func (w *indexWriter) Close() error { |
|
|
|
|
return w.f.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// IndexReader provides reading access of serialized index data.
|
|
|
|
|
type IndexReader interface { |
|
|
|
|
// Symbols returns a set of string symbols that may occur in series' labels
|
|
|
|
|
// and indices.
|
|
|
|
|
Symbols() (map[string]struct{}, error) |
|
|
|
|
|
|
|
|
|
// LabelValues returns the possible label values
|
|
|
|
|
LabelValues(names ...string) (StringTuples, error) |
|
|
|
|
|
|
|
|
|
// Postings returns the postings list iterator for the label pair.
|
|
|
|
|
// The Postings here contain the offsets to the series inside the index.
|
|
|
|
|
// Found IDs are not strictly required to point to a valid Series, e.g. during
|
|
|
|
|
// background garbage collections.
|
|
|
|
|
Postings(name, value string) (Postings, error) |
|
|
|
|
|
|
|
|
|
// SortedPostings returns a postings list that is reordered to be sorted
|
|
|
|
|
// by the label set of the underlying series.
|
|
|
|
|
SortedPostings(Postings) Postings |
|
|
|
|
|
|
|
|
|
// Series populates the given labels and chunk metas for the series identified
|
|
|
|
|
// by the reference.
|
|
|
|
|
// Returns ErrNotFound if the ref does not resolve to a known series.
|
|
|
|
|
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error |
|
|
|
|
|
|
|
|
|
// LabelIndices returns the label pairs for which indices exist.
|
|
|
|
|
LabelIndices() ([][]string, error) |
|
|
|
|
|
|
|
|
|
// Close released the underlying resources of the reader.
|
|
|
|
|
Close() error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// StringTuples provides access to a sorted list of string tuples.
|
|
|
|
|
type StringTuples interface { |
|
|
|
|
// Total number of tuples in the list.
|
|
|
|
|
@ -558,7 +513,7 @@ type StringTuples interface { |
|
|
|
|
At(i int) ([]string, error) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type indexReader struct { |
|
|
|
|
type Reader struct { |
|
|
|
|
// The underlying byte slice holding the encoded series data.
|
|
|
|
|
b ByteSlice |
|
|
|
|
toc indexTOC |
|
|
|
|
@ -568,7 +523,7 @@ type indexReader struct { |
|
|
|
|
|
|
|
|
|
// Cached hashmaps of section offsets.
|
|
|
|
|
labels map[string]uint32 |
|
|
|
|
postings map[string]uint32 |
|
|
|
|
postings map[labels.Label]uint32 |
|
|
|
|
// Cache of read symbols. Strings that are returned when reading from the
|
|
|
|
|
// block are always backed by true strings held in here rather than
|
|
|
|
|
// strings that are backed by byte slices from the mmap'd index file. This
|
|
|
|
|
@ -576,6 +531,8 @@ type indexReader struct { |
|
|
|
|
// the block has been unmapped.
|
|
|
|
|
symbols map[uint32]string |
|
|
|
|
|
|
|
|
|
dec *DecoderV1 |
|
|
|
|
|
|
|
|
|
crc32 hash.Hash32 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -605,26 +562,28 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { |
|
|
|
|
return b[start:end] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewIndexReader returns a new IndexReader on the given byte slice.
|
|
|
|
|
func NewIndexReader(b ByteSlice) (IndexReader, error) { |
|
|
|
|
return newIndexReader(b, nil) |
|
|
|
|
// NewReader returns a new IndexReader on the given byte slice.
|
|
|
|
|
func NewReader(b ByteSlice) (*Reader, error) { |
|
|
|
|
return newReader(b, nil) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewFileIndexReader returns a new index reader against the given index file.
|
|
|
|
|
func NewFileIndexReader(path string) (IndexReader, error) { |
|
|
|
|
f, err := openMmapFile(path) |
|
|
|
|
// NewFileReader returns a new index reader against the given index file.
|
|
|
|
|
func NewFileReader(path string) (*Reader, error) { |
|
|
|
|
f, err := fileutil.OpenMmapFile(path) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return newIndexReader(realByteSlice(f.b), f) |
|
|
|
|
return newReader(realByteSlice(f.Bytes()), f) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { |
|
|
|
|
r := &indexReader{ |
|
|
|
|
b: b, |
|
|
|
|
c: c, |
|
|
|
|
symbols: map[uint32]string{}, |
|
|
|
|
crc32: newCRC32(), |
|
|
|
|
func newReader(b ByteSlice, c io.Closer) (*Reader, error) { |
|
|
|
|
r := &Reader{ |
|
|
|
|
b: b, |
|
|
|
|
c: c, |
|
|
|
|
symbols: map[uint32]string{}, |
|
|
|
|
labels: map[string]uint32{}, |
|
|
|
|
postings: map[labels.Label]uint32{}, |
|
|
|
|
crc32: newCRC32(), |
|
|
|
|
} |
|
|
|
|
// Verify magic number.
|
|
|
|
|
if b.Len() < 4 { |
|
|
|
|
@ -642,15 +601,56 @@ func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { |
|
|
|
|
} |
|
|
|
|
var err error |
|
|
|
|
|
|
|
|
|
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) |
|
|
|
|
err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint32) error { |
|
|
|
|
if len(key) != 1 { |
|
|
|
|
return errors.Errorf("unexpected key length %d", len(key)) |
|
|
|
|
} |
|
|
|
|
r.labels[key[0]] = off |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "read label index table") |
|
|
|
|
} |
|
|
|
|
r.postings, err = r.readOffsetTable(r.toc.postingsTable) |
|
|
|
|
return r, errors.Wrap(err, "read postings table") |
|
|
|
|
err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint32) error { |
|
|
|
|
if len(key) != 2 { |
|
|
|
|
return errors.Errorf("unexpected key length %d", len(key)) |
|
|
|
|
} |
|
|
|
|
r.postings[labels.Label{Name: key[0], Value: key[1]}] = off |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "read postings table") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
r.dec = &DecoderV1{symbols: r.symbols} |
|
|
|
|
|
|
|
|
|
return r, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Range marks a byte range.
|
|
|
|
|
type Range struct { |
|
|
|
|
Start, End int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// PostingsRanges returns a new map of byte range in the underlying index file
|
|
|
|
|
// for all postings lists.
|
|
|
|
|
func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) { |
|
|
|
|
m := map[labels.Label]Range{} |
|
|
|
|
|
|
|
|
|
for l, start := range r.postings { |
|
|
|
|
d := r.decbufAt(int(start)) |
|
|
|
|
if d.err() != nil { |
|
|
|
|
return nil, d.err() |
|
|
|
|
} |
|
|
|
|
m[l] = Range{ |
|
|
|
|
Start: int64(start) + 4, |
|
|
|
|
End: int64(start) + 4 + int64(d.len()), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return m, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) readTOC() error { |
|
|
|
|
func (r *Reader) readTOC() error { |
|
|
|
|
if r.b.Len() < indexTOCLen { |
|
|
|
|
return errInvalidSize |
|
|
|
|
} |
|
|
|
|
@ -676,7 +676,7 @@ func (r *indexReader) readTOC() error { |
|
|
|
|
// decbufAt returns a new decoding buffer. It expects the first 4 bytes
|
|
|
|
|
// after offset to hold the big endian encoded content length, followed by the contents and the expected
|
|
|
|
|
// checksum.
|
|
|
|
|
func (r *indexReader) decbufAt(off int) decbuf { |
|
|
|
|
func (r *Reader) decbufAt(off int) decbuf { |
|
|
|
|
if r.b.Len() < off+4 { |
|
|
|
|
return decbuf{e: errInvalidSize} |
|
|
|
|
} |
|
|
|
|
@ -700,7 +700,7 @@ func (r *indexReader) decbufAt(off int) decbuf { |
|
|
|
|
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
|
|
|
|
|
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
|
|
|
|
|
// checksum.
|
|
|
|
|
func (r *indexReader) decbufUvarintAt(off int) decbuf { |
|
|
|
|
func (r *Reader) decbufUvarintAt(off int) decbuf { |
|
|
|
|
// We never have to access this method at the far end of the byte slice. Thus just checking
|
|
|
|
|
// against the MaxVarintLen32 is sufficient.
|
|
|
|
|
if r.b.Len() < off+binary.MaxVarintLen32 { |
|
|
|
|
@ -730,7 +730,7 @@ func (r *indexReader) decbufUvarintAt(off int) decbuf { |
|
|
|
|
// readSymbols reads the symbol table fully into memory and allocates proper strings for them.
|
|
|
|
|
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them
|
|
|
|
|
// after the reader is closed.
|
|
|
|
|
func (r *indexReader) readSymbols(off int) error { |
|
|
|
|
func (r *Reader) readSymbols(off int) error { |
|
|
|
|
if off == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -752,16 +752,13 @@ func (r *indexReader) readSymbols(off int) error { |
|
|
|
|
return d.err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// readOffsetTable reads an offset table at the given position and returns a map
|
|
|
|
|
// with the key strings concatenated by the 0xff unicode non-character.
|
|
|
|
|
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { |
|
|
|
|
const sep = "\xff" |
|
|
|
|
|
|
|
|
|
// readOffsetTable reads an offset table at the given position calls f for each
|
|
|
|
|
// found entry.f
|
|
|
|
|
// If f returns an error it stops decoding and returns the received error,
|
|
|
|
|
func (r *Reader) readOffsetTable(off uint64, f func([]string, uint32) error) error { |
|
|
|
|
d := r.decbufAt(int(off)) |
|
|
|
|
cnt := d.be32() |
|
|
|
|
|
|
|
|
|
res := make(map[string]uint32, cnt) |
|
|
|
|
|
|
|
|
|
for d.err() == nil && d.len() > 0 && cnt > 0 { |
|
|
|
|
keyCount := int(d.uvarint()) |
|
|
|
|
keys := make([]string, 0, keyCount) |
|
|
|
|
@ -769,18 +766,24 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { |
|
|
|
|
for i := 0; i < keyCount; i++ { |
|
|
|
|
keys = append(keys, d.uvarintStr()) |
|
|
|
|
} |
|
|
|
|
res[strings.Join(keys, sep)] = uint32(d.uvarint()) |
|
|
|
|
|
|
|
|
|
o := uint32(d.uvarint()) |
|
|
|
|
if d.err() != nil { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
if err := f(keys, o); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
cnt-- |
|
|
|
|
} |
|
|
|
|
return res, d.err() |
|
|
|
|
return d.err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) Close() error { |
|
|
|
|
// Close the reader and its underlying resources.
|
|
|
|
|
func (r *Reader) Close() error { |
|
|
|
|
return r.c.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) lookupSymbol(o uint32) (string, error) { |
|
|
|
|
func (r *Reader) lookupSymbol(o uint32) (string, error) { |
|
|
|
|
s, ok := r.symbols[o] |
|
|
|
|
if !ok { |
|
|
|
|
return "", errors.Errorf("unknown symbol offset %d", o) |
|
|
|
|
@ -788,7 +791,8 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) { |
|
|
|
|
return s, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) Symbols() (map[string]struct{}, error) { |
|
|
|
|
// Symbols returns a set of symbols that exist within the index.
|
|
|
|
|
func (r *Reader) Symbols() (map[string]struct{}, error) { |
|
|
|
|
res := make(map[string]struct{}, len(r.symbols)) |
|
|
|
|
|
|
|
|
|
for _, s := range r.symbols { |
|
|
|
|
@ -797,7 +801,13 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) { |
|
|
|
|
return res, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { |
|
|
|
|
// SymbolTable returns the symbol table that is used to resolve symbol references.
|
|
|
|
|
func (r *Reader) SymbolTable() map[uint32]string { |
|
|
|
|
return r.symbols |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// LabelValues returns value tuples that exist for the given label name tuples.
|
|
|
|
|
func (r *Reader) LabelValues(names ...string) (StringTuples, error) { |
|
|
|
|
const sep = "\xff" |
|
|
|
|
|
|
|
|
|
key := strings.Join(names, sep) |
|
|
|
|
@ -830,7 +840,8 @@ type emptyStringTuples struct{} |
|
|
|
|
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } |
|
|
|
|
func (emptyStringTuples) Len() int { return 0 } |
|
|
|
|
|
|
|
|
|
func (r *indexReader) LabelIndices() ([][]string, error) { |
|
|
|
|
// LabelIndices returns a for which labels or label tuples value indices exist.
|
|
|
|
|
func (r *Reader) LabelIndices() ([][]string, error) { |
|
|
|
|
const sep = "\xff" |
|
|
|
|
|
|
|
|
|
res := [][]string{} |
|
|
|
|
@ -841,87 +852,38 @@ func (r *indexReader) LabelIndices() ([][]string, error) { |
|
|
|
|
return res, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { |
|
|
|
|
d := r.decbufUvarintAt(int(ref)) |
|
|
|
|
|
|
|
|
|
*lbls = (*lbls)[:0] |
|
|
|
|
*chks = (*chks)[:0] |
|
|
|
|
|
|
|
|
|
k := int(d.uvarint()) |
|
|
|
|
|
|
|
|
|
for i := 0; i < k; i++ { |
|
|
|
|
lno := uint32(d.uvarint()) |
|
|
|
|
lvo := uint32(d.uvarint()) |
|
|
|
|
|
|
|
|
|
if d.err() != nil { |
|
|
|
|
return errors.Wrap(d.err(), "read series label offsets") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ln, err := r.lookupSymbol(lno) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "lookup label name") |
|
|
|
|
} |
|
|
|
|
lv, err := r.lookupSymbol(lvo) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "lookup label value") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read the chunks meta data.
|
|
|
|
|
k = int(d.uvarint()) |
|
|
|
|
|
|
|
|
|
if k == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t0 := d.varint64() |
|
|
|
|
maxt := int64(d.uvarint64()) + t0 |
|
|
|
|
ref0 := int64(d.uvarint64()) |
|
|
|
|
|
|
|
|
|
*chks = append(*chks, ChunkMeta{ |
|
|
|
|
Ref: uint64(ref0), |
|
|
|
|
MinTime: t0, |
|
|
|
|
MaxTime: maxt, |
|
|
|
|
}) |
|
|
|
|
t0 = maxt |
|
|
|
|
|
|
|
|
|
for i := 1; i < k; i++ { |
|
|
|
|
mint := int64(d.uvarint64()) + t0 |
|
|
|
|
maxt := int64(d.uvarint64()) + mint |
|
|
|
|
|
|
|
|
|
ref0 += d.varint64() |
|
|
|
|
t0 = maxt |
|
|
|
|
|
|
|
|
|
if d.err() != nil { |
|
|
|
|
return errors.Wrapf(d.err(), "read meta for chunk %d", i) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
*chks = append(*chks, ChunkMeta{ |
|
|
|
|
Ref: uint64(ref0), |
|
|
|
|
MinTime: mint, |
|
|
|
|
MaxTime: maxt, |
|
|
|
|
}) |
|
|
|
|
// Series the series with the given ID and writes its labels and chunks into lbls and chks.
|
|
|
|
|
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { |
|
|
|
|
d := r.decbufUvarintAt(int(id)) |
|
|
|
|
if d.err() != nil { |
|
|
|
|
return d.err() |
|
|
|
|
} |
|
|
|
|
return d.err() |
|
|
|
|
return r.dec.Series(d.get(), lbls, chks) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) Postings(name, value string) (Postings, error) { |
|
|
|
|
const sep = "\xff" |
|
|
|
|
key := strings.Join([]string{name, value}, sep) |
|
|
|
|
|
|
|
|
|
off, ok := r.postings[key] |
|
|
|
|
// Postings returns a postings list for the given label pair.
|
|
|
|
|
func (r *Reader) Postings(name, value string) (Postings, error) { |
|
|
|
|
off, ok := r.postings[labels.Label{ |
|
|
|
|
Name: name, |
|
|
|
|
Value: value, |
|
|
|
|
}] |
|
|
|
|
if !ok { |
|
|
|
|
return emptyPostings, nil |
|
|
|
|
return EmptyPostings(), nil |
|
|
|
|
} |
|
|
|
|
d := r.decbufAt(int(off)) |
|
|
|
|
d.be32() // consume unused postings list length.
|
|
|
|
|
|
|
|
|
|
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") |
|
|
|
|
if d.err() != nil { |
|
|
|
|
return nil, errors.Wrap(d.err(), "get postings entry") |
|
|
|
|
} |
|
|
|
|
_, p, err := r.dec.Postings(d.get()) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode postings") |
|
|
|
|
} |
|
|
|
|
return p, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *indexReader) SortedPostings(p Postings) Postings { |
|
|
|
|
// SortedPostings returns the given postings list reordered so that the backing series
|
|
|
|
|
// are sorted.
|
|
|
|
|
func (r *Reader) SortedPostings(p Postings) Postings { |
|
|
|
|
return p |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -930,7 +892,7 @@ type stringTuples struct { |
|
|
|
|
s []string // flattened tuple entries
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newStringTuples(s []string, l int) (*stringTuples, error) { |
|
|
|
|
func NewStringTuples(s []string, l int) (*stringTuples, error) { |
|
|
|
|
if len(s)%l != 0 { |
|
|
|
|
return nil, errors.Wrap(errInvalidSize, "string tuple list") |
|
|
|
|
} |
|
|
|
|
@ -992,3 +954,100 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { |
|
|
|
|
|
|
|
|
|
return res, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// DecoderV1 provides decoding methods for the v1 index file format.
|
|
|
|
|
//
|
|
|
|
|
// It currently does not contain decoding methods for all entry types but can be extended
|
|
|
|
|
// by them if there's demand.
|
|
|
|
|
type DecoderV1 struct { |
|
|
|
|
symbols map[uint32]string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) { |
|
|
|
|
s, ok := dec.symbols[o] |
|
|
|
|
if !ok { |
|
|
|
|
return "", errors.Errorf("unknown symbol offset %d", o) |
|
|
|
|
} |
|
|
|
|
return s, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// SetSymbolTable set the symbol table to be used for lookups when decoding series
|
|
|
|
|
// and label indices
|
|
|
|
|
func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) { |
|
|
|
|
dec.symbols = t |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Postings returns a postings list for b and its number of elements.
|
|
|
|
|
func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) { |
|
|
|
|
d := decbuf{b: b} |
|
|
|
|
n := d.be32int() |
|
|
|
|
l := d.get() |
|
|
|
|
return n, newBigEndianPostings(l), d.err() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Series decodes a series entry from the given byte slice into lset and chks.
|
|
|
|
|
func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { |
|
|
|
|
*lbls = (*lbls)[:0] |
|
|
|
|
*chks = (*chks)[:0] |
|
|
|
|
|
|
|
|
|
d := decbuf{b: b} |
|
|
|
|
|
|
|
|
|
k := int(d.uvarint()) |
|
|
|
|
|
|
|
|
|
for i := 0; i < k; i++ { |
|
|
|
|
lno := uint32(d.uvarint()) |
|
|
|
|
lvo := uint32(d.uvarint()) |
|
|
|
|
|
|
|
|
|
if d.err() != nil { |
|
|
|
|
return errors.Wrap(d.err(), "read series label offsets") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ln, err := dec.lookupSymbol(lno) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "lookup label name") |
|
|
|
|
} |
|
|
|
|
lv, err := dec.lookupSymbol(lvo) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "lookup label value") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read the chunks meta data.
|
|
|
|
|
k = int(d.uvarint()) |
|
|
|
|
|
|
|
|
|
if k == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
t0 := d.varint64() |
|
|
|
|
maxt := int64(d.uvarint64()) + t0 |
|
|
|
|
ref0 := int64(d.uvarint64()) |
|
|
|
|
|
|
|
|
|
*chks = append(*chks, chunks.Meta{ |
|
|
|
|
Ref: uint64(ref0), |
|
|
|
|
MinTime: t0, |
|
|
|
|
MaxTime: maxt, |
|
|
|
|
}) |
|
|
|
|
t0 = maxt |
|
|
|
|
|
|
|
|
|
for i := 1; i < k; i++ { |
|
|
|
|
mint := int64(d.uvarint64()) + t0 |
|
|
|
|
maxt := int64(d.uvarint64()) + mint |
|
|
|
|
|
|
|
|
|
ref0 += d.varint64() |
|
|
|
|
t0 = maxt |
|
|
|
|
|
|
|
|
|
if d.err() != nil { |
|
|
|
|
return errors.Wrapf(d.err(), "read meta for chunk %d", i) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
*chks = append(*chks, chunks.Meta{ |
|
|
|
|
Ref: uint64(ref0), |
|
|
|
|
MinTime: mint, |
|
|
|
|
MaxTime: maxt, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
return d.err() |
|
|
|
|
} |