feat(TSDB): allow tsdb index creation in memory only (#14732)

pull/14734/head^2
Owen Diehl 7 months ago committed by GitHub
parent 5f9fe83bec
commit 831c0d56b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
  2. 10
      pkg/storage/stores/shipper/indexshipper/tsdb/index/chunk_test.go
  3. 295
      pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
  4. 15
      pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go
  5. 239
      pkg/storage/stores/shipper/indexshipper/tsdb/index/writer.go

@ -111,7 +111,7 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)
var writer *index.Writer
var writer *index.Creator
writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
@ -169,7 +169,7 @@ func (b *Builder) Build(
}
}
if err := writer.Close(); err != nil {
if _, err := writer.Close(false); err != nil {
return id, err
}

@ -433,7 +433,7 @@ func TestChunkEncodingRoundTrip(t *testing.T) {
} {
t.Run(fmt.Sprintf("version %d nChks %d pageSize %d", version, nChks, pageSize), func(t *testing.T) {
chks := mkChks(nChks)
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
@ -583,7 +583,7 @@ func TestSearchWithPageMarkers(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%s-pagesize-%d", tc.desc, pageSize), func(t *testing.T) {
var w Writer
var w Creator
w.Version = FormatV3
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
@ -697,7 +697,7 @@ func TestDecoderChunkStats(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("%s_version=%d_pageSize=%d", tc.desc, version, pageSize), func(t *testing.T) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
@ -722,7 +722,7 @@ func BenchmarkChunkStats(b *testing.B) {
from, through := int64(nChks*40/100), int64(nChks*60/100)
for _, version := range []int{FormatV2, FormatV3} {
b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
@ -747,7 +747,7 @@ func BenchmarkReadChunks(b *testing.B) {
from, through := int64(nChks*40/100), int64(nChks*60/100)
for _, version := range []int{FormatV2, FormatV3} {
b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
var w Writer
var w Creator
w.Version = version
primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})

@ -14,7 +14,6 @@
package index
import (
"bufio"
"bytes"
"context"
"encoding/binary"
@ -38,6 +37,8 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/v3/pkg/util/encoding"
)
@ -107,18 +108,18 @@ type symbolCacheEntry struct {
lastValueIndex uint32
}
// Writer implements the IndexWriter interface for the standard
// Creator implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
type Creator struct {
ctx context.Context
// For the main index file.
f *FileWriter
// For the main index.
f writer
// Temporary file for postings.
fP *FileWriter
// Temporary file for posting offsets table.
fPO *FileWriter
// Temporary writer for postings.
fP writer
// Temporary writer for posting offsets table.
fPO writer
cntPO uint64
toc TOC
@ -131,7 +132,6 @@ type Writer struct {
numSymbols int
symbols *Symbols
symbolFile *fileutil.MmapFile
lastSymbol string
symbolCache map[string]symbolCacheEntry
@ -211,7 +211,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}
func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer, error) {
func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir)
@ -243,7 +243,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer,
return nil, errors.Wrap(err, "sync dir")
}
iw := &Writer{
iw := &Creator{
Version: version,
ctx: ctx,
f: f,
@ -266,107 +266,26 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer,
}
// NewWriter returns a new Writer to the given filename.
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Writer, error) {
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) {
return NewWriterWithVersion(ctx, indexFormat, fn)
}
func (w *Writer) write(bufs ...[]byte) error {
return w.f.Write(bufs...)
}
func (w *Writer) writeAt(buf []byte, pos uint64) error {
return w.f.WriteAt(buf, pos)
}
func (w *Writer) addPadding(size int) error {
return w.f.AddPadding(size)
}
type FileWriter struct {
f *os.File
fbuf *bufio.Writer
pos uint64
name string
}
func NewFileWriter(name string) (*FileWriter, error) {
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
if err != nil {
return nil, err
}
return &FileWriter{
f: f,
fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0,
name: name,
}, nil
}
func (fw *FileWriter) Pos() uint64 {
return fw.pos
}
func (fw *FileWriter) Write(bufs ...[]byte) error {
for _, b := range bufs {
n, err := fw.fbuf.Write(b)
fw.pos += uint64(n)
if err != nil {
return err
}
// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
// offset references in v1 are only 4 bytes large.
// Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
if fw.pos > 16*math.MaxUint32 {
return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
}
}
return nil
}
func (fw *FileWriter) Flush() error {
return fw.fbuf.Flush()
func (w *Creator) write(bufs ...[]byte) error {
return w.f.WriteBufs(bufs...)
}
func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error {
if err := fw.Flush(); err != nil {
return err
}
_, err := fw.f.WriteAt(buf, int64(pos))
func (w *Creator) writeAt(buf []byte, pos int64) error {
_, err := w.f.WriteAt(buf, pos)
return err
}
// AddPadding adds zero byte padding until the file size is a multiple size.
func (fw *FileWriter) AddPadding(size int) error {
p := fw.pos % uint64(size)
if p == 0 {
return nil
}
p = uint64(size) - p
if err := fw.Write(make([]byte, p)); err != nil {
return errors.Wrap(err, "add padding")
}
return nil
}
func (fw *FileWriter) Close() error {
if err := fw.Flush(); err != nil {
return err
}
if err := fw.f.Sync(); err != nil {
return err
}
return fw.f.Close()
}
func (fw *FileWriter) Remove() error {
return os.Remove(fw.name)
func (w *Creator) addPadding(size int) error {
return w.f.AddPadding(size)
}
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Creator) ensureStage(s indexWriterStage) error {
select {
case <-w.ctx.Done():
return w.ctx.Err()
@ -389,7 +308,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
// Mark start of sections in table of contents.
switch s {
case idxStageSymbols:
w.toc.Symbols = w.f.pos
w.toc.Symbols = w.f.Pos()
if err := w.startSymbols(); err != nil {
return err
}
@ -397,10 +316,10 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
if err := w.finishSymbols(); err != nil {
return err
}
w.toc.Series = w.f.pos
w.toc.Series = w.f.Pos()
case idxStageDone:
w.toc.LabelIndices = w.f.pos
w.toc.LabelIndices = w.f.Pos()
// LabelIndices generation depends on the posting offset
// table produced at this stage.
if err := w.writePostingsToTmpFiles(); err != nil {
@ -410,22 +329,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
return err
}
w.toc.Postings = w.f.pos
w.toc.Postings = w.f.Pos()
if err := w.writePostings(); err != nil {
return err
}
w.toc.LabelIndicesTable = w.f.pos
w.toc.LabelIndicesTable = w.f.Pos()
if err := w.writeLabelIndexesOffsetTable(); err != nil {
return err
}
w.toc.PostingsTable = w.f.pos
w.toc.PostingsTable = w.f.Pos()
if err := w.writePostingsOffsetTable(); err != nil {
return err
}
w.toc.FingerprintOffsets = w.f.pos
w.toc.FingerprintOffsets = w.f.Pos()
if err := w.writeFingerprintOffsetsTable(); err != nil {
return err
}
@ -439,7 +358,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
return nil
}
func (w *Writer) writeMeta() error {
func (w *Creator) writeMeta() error {
w.buf1.Reset()
w.buf1.PutBE32(MagicIndex)
w.buf1.PutByte(byte(w.Version))
@ -452,7 +371,7 @@ func (w *Writer) writeMeta() error {
// fingerprint differs from what labels.Hash() produces. For example,
// multitenant TSDBs embed a tenant label, but the actual series has no such
// label and so the derived fingerprint differs.
func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.Fingerprint, chunks ...ChunkMeta) error {
func (w *Creator) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.Fingerprint, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}
@ -478,8 +397,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
return errors.Errorf("failed to write padding bytes: %v", err)
}
if w.f.pos%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
if w.f.Pos()%16 != 0 {
return errors.Errorf("series write not 16-byte aligned at %d", w.f.Pos())
}
w.buf2.Reset()
@ -528,7 +447,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
if ref%fingerprintInterval == 0 {
// series references are the 16-byte aligned offsets
// Do NOT ask me how long I debugged this particular bit >:O
sRef := w.f.pos / 16
sRef := w.f.Pos() / 16
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
}
@ -539,7 +458,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
return nil
}
func (w *Writer) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, pageSize int) {
func (w *Creator) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, pageSize int) {
if w.Version > FormatV2 {
w.addChunksV3(chunks, primary, scratch, pageSize)
return
@ -547,7 +466,7 @@ func (w *Writer) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf
w.addChunksPriorV3(chunks, primary, scratch)
}
func (w *Writer) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbuf) {
func (w *Creator) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbuf) {
primary.PutUvarint(len(chunks))
if len(chunks) > 0 {
@ -576,7 +495,7 @@ func (w *Writer) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbu
}
}
func (w *Writer) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, chunkPageSize int) {
func (w *Creator) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, chunkPageSize int) {
scratch.Reset()
primary.PutUvarint(len(chunks))
@ -645,14 +564,14 @@ func (w *Writer) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encb
primary.PutBytes(scratch.Get())
}
func (w *Writer) startSymbols() error {
func (w *Creator) startSymbols() error {
// We are at w.toc.Symbols.
// Leave 4 bytes of space for the length, and another 4 for the number of symbols
// which will both be calculated later.
return w.write([]byte("alenblen"))
}
func (w *Writer) AddSymbol(sym string) error {
func (w *Creator) AddSymbol(sym string) error {
if err := w.ensureStage(idxStageSymbols); err != nil {
return err
}
@ -666,8 +585,8 @@ func (w *Writer) AddSymbol(sym string) error {
return w.write(w.buf1.Get())
}
func (w *Writer) finishSymbols() error {
symbolTableSize := w.f.pos - w.toc.Symbols - 4
func (w *Creator) finishSymbols() error {
symbolTableSize := w.f.Pos() - w.toc.Symbols - 4
// The symbol table's <len> part is 4 bytes. So the total symbol table size must be less than or equal to 2^32-1
if symbolTableSize > math.MaxUint32 {
return errors.Errorf("symbol table size exceeds 4 bytes: %d", symbolTableSize)
@ -677,11 +596,11 @@ func (w *Writer) finishSymbols() error {
w.buf1.Reset()
w.buf1.PutBE32int(int(symbolTableSize))
w.buf1.PutBE32int(w.numSymbols)
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(w.toc.Symbols)); err != nil {
return err
}
hashPos := w.f.pos
hashPos := w.f.Pos()
// Leave space for the hash. We can only calculate it
// now that the number of symbols is known, so mmap and do it from there.
if err := w.write([]byte("hash")); err != nil {
@ -691,39 +610,39 @@ func (w *Writer) finishSymbols() error {
return err
}
sf, err := fileutil.OpenMmapFile(w.f.name)
symbolBytes, err := w.f.Bytes()
if err != nil {
return err
}
w.symbolFile = sf
hash := crc32.Checksum(w.symbolFile.Bytes()[w.toc.Symbols+4:hashPos], castagnoliTable)
hash := crc32.Checksum(symbolBytes[w.toc.Symbols+4:hashPos], castagnoliTable)
w.buf1.Reset()
w.buf1.PutBE32(hash)
if err := w.writeAt(w.buf1.Get(), hashPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(hashPos)); err != nil {
return err
}
// Load in the symbol table efficiently for the rest of the index writing.
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), w.Version, int(w.toc.Symbols))
// Now that we've calculated and added the checksum on disk, add it to the
// pre-checksummed bytes in memory so we can use this later,
// loading the symbol table efficiently for the rest of the index writing.
copy(symbolBytes[hashPos:], w.buf1.Get())
w.symbols, err = NewSymbols(RealByteSlice(symbolBytes), w.Version, int(w.toc.Symbols))
if err != nil {
return errors.Wrap(err, "read symbols")
}
return nil
}
func (w *Writer) writeLabelIndices() error {
func (w *Creator) writeLabelIndices() error {
if err := w.fPO.Flush(); err != nil {
return err
}
// Find all the label values in the tmp posting offset table.
f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer f.Close()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
current := []byte{}
values := []uint32{}
@ -764,7 +683,7 @@ func (w *Writer) writeLabelIndices() error {
return nil
}
func (w *Writer) writeLabelIndex(name string, values []uint32) error {
func (w *Creator) writeLabelIndex(name string, values []uint32) error {
// Align beginning to 4 bytes for more efficient index list scans.
if err := w.addPadding(4); err != nil {
return err
@ -772,10 +691,10 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
keys: []string{name},
offset: w.f.pos,
offset: w.f.Pos(),
})
startPos := w.f.pos
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
@ -801,12 +720,12 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {
// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("label index size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}
@ -816,8 +735,8 @@ func (w *Writer) writeLabelIndex(name string, values []uint32) error {
}
// writeLabelIndexesOffsetTable writes the label indices offset table.
func (w *Writer) writeLabelIndexesOffsetTable() error {
startPos := w.f.pos
func (w *Creator) writeLabelIndexesOffsetTable() error {
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
@ -845,12 +764,12 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
}
// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("label indexes offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}
@ -860,13 +779,13 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
}
// writePostingsOffsetTable writes the postings offset table.
func (w *Writer) writePostingsOffsetTable() error {
func (w *Creator) writePostingsOffsetTable() error {
// Ensure everything is in the temporary file.
if err := w.fPO.Flush(); err != nil {
return err
}
startPos := w.f.pos
startPos := w.f.Pos()
// Leave 4 bytes of space for the length, which will be calculated later.
if err := w.write([]byte("alen")); err != nil {
return err
@ -884,16 +803,12 @@ func (w *Writer) writePostingsOffsetTable() error {
return err
}
f, err := fileutil.OpenMmapFile(w.fPO.name)
pOffsets, err := w.fPO.Bytes()
if err != nil {
return err
}
defer func() {
if f != nil {
f.Close()
}
}()
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.fPO.pos)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(pOffsets), int(w.fPO.Pos())))
cnt := w.cntPO
for d.Err() == nil && cnt > 0 {
w.buf1.Reset()
@ -911,11 +826,6 @@ func (w *Writer) writePostingsOffsetTable() error {
return d.Err()
}
// Cleanup temporary file.
if err := f.Close(); err != nil {
return err
}
f = nil
if err := w.fPO.Close(); err != nil {
return err
}
@ -926,12 +836,12 @@ func (w *Writer) writePostingsOffsetTable() error {
// Write out the length.
w.buf1.Reset()
l := w.f.pos - startPos - 4
l := w.f.Pos() - startPos - 4
if l > math.MaxUint32 {
return errors.Errorf("postings offset table size exceeds 4 bytes: %d", l)
}
w.buf1.PutBE32int(int(l))
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
if err := w.writeAt(w.buf1.Get(), int64(startPos)); err != nil {
return err
}
@ -941,7 +851,7 @@ func (w *Writer) writePostingsOffsetTable() error {
return w.write(w.buf1.Get())
}
func (w *Writer) writeFingerprintOffsetsTable() error {
func (w *Creator) writeFingerprintOffsetsTable() error {
w.buf1.Reset()
w.buf2.Reset()
@ -975,7 +885,7 @@ func (w *Writer) writeFingerprintOffsetsTable() error {
const indexTOCLen = 8*9 + crc32.Size
func (w *Writer) writeTOC() error {
func (w *Creator) writeTOC() error {
w.buf1.Reset()
w.buf1.PutBE64(w.toc.Symbols)
@ -995,7 +905,7 @@ func (w *Writer) writeTOC() error {
return w.write(w.buf1.Get())
}
func (w *Writer) writePostingsToTmpFiles() error {
func (w *Creator) writePostingsToTmpFiles() error {
names := make([]string, 0, len(w.labelNames))
for n := range w.labelNames {
names = append(names, n)
@ -1005,15 +915,15 @@ func (w *Writer) writePostingsToTmpFiles() error {
if err := w.f.Flush(); err != nil {
return err
}
f, err := fileutil.OpenMmapFile(w.f.name)
b, err := w.f.Bytes()
if err != nil {
return err
}
defer f.Close()
// Write out the special all posting.
offsets := []uint32{}
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@ -1059,7 +969,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
// Label name -> label value -> positions.
postings := map[uint32]map[uint32][]uint32{}
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(f.Bytes()), int(w.toc.LabelIndices)))
d := encoding.DecWrap(tsdb_enc.NewDecbufRaw(RealByteSlice(b), int(w.toc.LabelIndices)))
d.Skip(int(w.toc.Series))
for d.Len() > 0 {
d.ConsumePadding()
@ -1120,7 +1030,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}
func (w *Writer) writePosting(name, value string, offs []uint32) error {
func (w *Creator) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.AddPadding(4); err != nil {
return err
@ -1131,8 +1041,8 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.buf1.PutUvarint(2)
w.buf1.PutUvarintStr(name)
w.buf1.PutUvarintStr(value)
w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.Write(w.buf1.Get()); err != nil {
w.buf1.PutUvarint64(w.fP.Pos()) // This is relative to the postings tmp file, not the final index file.
if err := w.fPO.WriteBufs(w.buf1.Get()); err != nil {
return err
}
w.cntPO++
@ -1155,32 +1065,34 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
}
w.buf2.PutBE32int(l)
w.buf1.PutHash(w.crc32)
return w.fP.Write(w.buf2.Get(), w.buf1.Get())
return w.fP.WriteBufs(w.buf2.Get(), w.buf1.Get())
}
func (w *Writer) writePostings() error {
func (w *Creator) writePostings() error {
// There's padding in the tmp file, make sure it actually works.
if err := w.f.AddPadding(4); err != nil {
return err
}
w.postingsStart = w.f.pos
w.postingsStart = w.f.Pos()
// Copy temporary file into main index.
if err := w.fP.Flush(); err != nil {
return err
}
if _, err := w.fP.f.Seek(0, 0); err != nil {
// NB(owen-d): inefficient, but avoids complexity `Pos()` altering `Seek` logic.
postings, err := w.fP.Bytes()
if err != nil {
return err
}
// Don't need to calculate a checksum, so can copy directly.
n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20))
n, err := io.CopyBuffer(w.f, bytes.NewReader(postings), make([]byte, 1<<20))
if err != nil {
return err
}
if uint64(n) != w.fP.pos {
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
if uint64(n) != w.fP.Pos() {
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.Pos(), n)
}
w.f.pos += uint64(n)
if err := w.fP.Close(); err != nil {
return err
@ -1203,29 +1115,38 @@ type labelIndexHashEntry struct {
offset uint64
}
func (w *Writer) Close() error {
// Even if this fails, we need to close all the files.
ensureErr := w.ensureStage(idxStageDone)
// if reader is true, return an io.ReadCloser of the underlying index. Otherwise, it returns nil.
func (w *Creator) Close(reader bool) (db io.ReadCloser, err error) {
var errs multierror.MultiError
if w.symbolFile != nil {
if err := w.symbolFile.Close(); err != nil {
return err
}
if ensureErr := w.ensureStage(idxStageDone); ensureErr != nil {
errs.Add(ensureErr)
}
if w.fP != nil {
if err := w.fP.Close(); err != nil {
return err
errs.Add(err)
}
}
if w.fPO != nil {
if err := w.fPO.Close(); err != nil {
return err
errs.Add(err)
}
}
if err := w.f.Close(); err != nil {
return err
errs.Add(err)
}
if err := errs.Err(); err != nil {
return nil, err
}
if reader {
return w.f.Load()
}
return ensureErr
return nil, nil
}
// StringIter iterates over a sorted list of strings.

@ -132,7 +132,8 @@ func TestIndexRW_Create_Open(t *testing.T) {
// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background(), FormatV3, fn)
require.NoError(t, err)
require.NoError(t, iw.Close())
_, err = iw.Close(false)
require.NoError(t, err)
ir, err := NewFileReader(fn)
require.NoError(t, err)
@ -178,7 +179,8 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.AddSeries(3, series[2], model.Fingerprint(series[2].Hash())))
require.NoError(t, iw.AddSeries(4, series[3], model.Fingerprint(series[3].Hash())))
require.NoError(t, iw.Close())
_, err = iw.Close(false)
require.NoError(t, err)
ir, err := NewFileReader(fn)
require.NoError(t, err)
@ -266,7 +268,8 @@ func TestPostingsMany(t *testing.T) {
for i, s := range series {
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s, model.Fingerprint(s.Hash())))
}
require.NoError(t, iw.Close())
_, err = iw.Close(false)
require.NoError(t, err)
ir, err := NewFileReader(fn)
require.NoError(t, err)
@ -406,7 +409,7 @@ func TestPersistence_index_e2e(t *testing.T) {
postings.Add(storage.SeriesRef(i), s.labels)
}
err = iw.Close()
_, err = iw.Close(false)
require.NoError(t, err)
ir, err := NewFileReader(filepath.Join(dir, IndexFilename))
@ -741,7 +744,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
require.NoError(t, err)
}
err = iw.Close()
_, err = iw.Close(false)
require.NoError(t, err)
ir, err := NewFileReader(filepath.Join(dir, name))
@ -997,7 +1000,7 @@ func BenchmarkInitReader_ReadOffsetTable(b *testing.B) {
require.NoError(b, err)
}
err = iw.Close()
_, err = iw.Close(false)
require.NoError(b, err)
bs, err := os.ReadFile(idxFile)

@ -0,0 +1,239 @@
package index
import (
"bufio"
"bytes"
"io"
"math"
"os"
"github.com/pkg/errors"
)
// writer is the destination abstraction used while building a TSDB index.
// Originally extracted from code interacting with temporary files on the
// file system, it lets index creation target either disk-backed
// (FileWriter) or in-memory (MemWriter) storage.
type writer interface {
	io.WriteCloser
	io.ReaderFrom
	// WriteAt overwrites a subset of the writer, but only if it won't overflow the current position.
	// NB: will not change position.
	io.WriterAt
	// Remove discards the underlying storage (e.g. deletes the backing file).
	Remove() error
	// Pos returns the number of bytes written so far.
	Pos() uint64
	// WriteBufs writes each buffer in order, stopping at the first error.
	WriteBufs(bufs ...[]byte) error
	// AddPadding appends zero bytes until Pos() is a multiple of size.
	AddPadding(size int) error
	// Flush persists any buffered data to the underlying storage.
	Flush() error
	// Bytes returns the underlying bytes of the writer.
	// NOTE(review): implementations may move an underlying file offset to do
	// so (see FileWriter.Bytes, which seeks and reads the whole file back).
	Bytes() ([]byte, error)
	// Load is used at the end to return the built index. Left as an implementable method rather
	// than being done via Bytes() to allow optimizations (e.g. avoid loading the whole index
	// into memory when unused).
	Load() (io.ReadCloser, error)
}
// FileWriter is a writer implementation backed by a file on disk, with
// buffered writes and its own position accounting.
type FileWriter struct {
	f        *os.File      // underlying file
	fbuf     *bufio.Writer // buffers writes to f
	position uint64        // bytes written so far, including bytes still in fbuf
	name     string        // path of f; used by Load and Remove
}
// NewFileWriter opens (creating if necessary) the file at name and wraps it
// in a FileWriter with a 4MiB write buffer.
func NewFileWriter(name string) (*FileWriter, error) {
	file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o666)
	if err != nil {
		return nil, err
	}
	fw := &FileWriter{
		f:        file,
		fbuf:     bufio.NewWriterSize(file, 1<<22),
		position: 0,
		name:     name,
	}
	return fw, nil
}
func (fw *FileWriter) Pos() uint64 {
return fw.position
}
// ReadFrom copies r into the buffered writer, advancing the tracked
// position by the number of bytes read. Implements io.ReaderFrom.
func (fw *FileWriter) ReadFrom(r io.Reader) (int64, error) {
	n, err := fw.fbuf.ReadFrom(r)
	fw.position += uint64(n)
	return n, err
}
// Write appends p via the buffered writer, advancing the tracked position
// and enforcing the 64GiB index size limit. Implements io.Writer.
func (fw *FileWriter) Write(p []byte) (int, error) {
	n, err := fw.fbuf.Write(p)
	fw.position += uint64(n)
	if err != nil {
		return n, err
	}
	// For now the index file must not grow beyond 64GiB. Some of the fixed-sized
	// offset references in v1 are only 4 bytes large.
	// Once we move to compressed/varint representations in those areas, this limitation
	// can be lifted.
	if limit := uint64(16 * math.MaxUint32); fw.position > limit {
		return n, errors.Errorf("%q exceeding max size of 64GiB", fw.name)
	}
	return n, nil
}
// WriteBufs writes every buffer in order, stopping at the first error.
func (fw *FileWriter) WriteBufs(bufs ...[]byte) error {
	for i := range bufs {
		if _, err := fw.Write(bufs[i]); err != nil {
			return err
		}
	}
	return nil
}
// Flush writes any buffered data through to the underlying file.
func (fw *FileWriter) Flush() error {
	return fw.fbuf.Flush()
}
// WriteAt overwrites previously written bytes at pos. The buffer is flushed
// first so the on-disk contents are current, and the write must stay
// entirely within the already-written region (it cannot extend the file).
// NB: does not change the tracked position. Implements io.WriterAt.
func (fw *FileWriter) WriteAt(buf []byte, pos int64) (int, error) {
	if err := fw.Flush(); err != nil {
		return 0, err
	}
	if pos > int64(fw.Pos()) {
		return 0, errors.New("position out of range")
	}
	if pos+int64(len(buf)) > int64(fw.Pos()) {
		return 0, errors.New("write exceeds buffer size")
	}
	return fw.f.WriteAt(buf, pos)
}
// AddPadding writes zero bytes until the current position is a multiple of size.
func (fw *FileWriter) AddPadding(size int) error {
	rem := fw.position % uint64(size)
	if rem == 0 {
		return nil
	}
	pad := make([]byte, uint64(size)-rem)
	if _, err := fw.Write(pad); err != nil {
		return errors.Wrap(err, "add padding")
	}
	return nil
}
// Close flushes buffered data, syncs the file to disk, and closes it.
// The order matters: Flush before Sync before Close, so no buffered bytes
// are lost and everything written is durable.
func (fw *FileWriter) Close() error {
	if err := fw.Flush(); err != nil {
		return err
	}
	if err := fw.f.Sync(); err != nil {
		return err
	}
	return fw.f.Close()
}
// Load reopens the written file read-only and returns it as an
// io.ReadCloser, so the built index can be streamed without being loaded
// into memory.
// NOTE(review): Load does not Flush; it assumes the writer was flushed or
// closed beforehand — confirm at call sites.
func (fw *FileWriter) Load() (io.ReadCloser, error) {
	f, err := os.Open(fw.name)
	if err != nil {
		return nil, err
	}
	return f, nil
}
// Remove deletes the backing file from disk.
func (fw *FileWriter) Remove() error {
	return os.Remove(fw.name)
}
// Bytes flushes the buffer and reads the entire file contents back into
// memory.
// NB: seeks the underlying file to the start, and the read leaves the file
// offset at EOF; the tracked position is unchanged.
func (fw *FileWriter) Bytes() ([]byte, error) {
	// First, ensure all is flushed
	if err := fw.Flush(); err != nil {
		return nil, err
	}
	if _, err := fw.f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	return io.ReadAll(fw.f)
}
// MemWriter is a writer implementation that builds the index entirely in
// memory, backed by a bytes.Buffer. Close, Flush, and Remove are no-ops.
type MemWriter struct {
	buf *bytes.Buffer
}

// NewBufferWriter returns a new in-memory MemWriter.
// todo: pooling memory
func NewBufferWriter() *MemWriter {
	return &MemWriter{
		buf: bytes.NewBuffer(nil),
	}
}

// Write appends p to the in-memory buffer. Implements io.Writer.
func (bw *MemWriter) Write(p []byte) (n int, err error) {
	return bw.buf.Write(p)
}

// Pos returns the number of bytes written so far.
func (bw *MemWriter) Pos() uint64 {
	return uint64(bw.buf.Len())
}

// WriteBufs writes every buffer in order, stopping at the first error.
func (bw *MemWriter) WriteBufs(bufs ...[]byte) error {
	for _, b := range bufs {
		if _, err := bw.Write(b); err != nil {
			return err
		}
	}
	return nil
}

// ReadFrom copies r into the buffer. Implements io.ReaderFrom.
func (bw *MemWriter) ReadFrom(r io.Reader) (int64, error) {
	return io.Copy(bw.buf, r)
}

// WriteAt overwrites previously written bytes at pos. The write must stay
// entirely within the already-written region (it cannot extend the buffer).
// A negative pos is rejected rather than panicking on the slice below.
// NB: does not change the position. Implements io.WriterAt.
func (bw *MemWriter) WriteAt(buf []byte, pos int64) (int, error) {
	if pos < 0 {
		return 0, errors.New("position out of range")
	}
	if pos+int64(len(buf)) > int64(bw.buf.Len()) {
		return 0, errors.New("write exceeds buffer size")
	}
	// Overwrite in place. Named dst (not "bytes") so it doesn't shadow the
	// bytes package.
	dst := bw.buf.Bytes()
	copy(dst[pos:], buf)
	return len(buf), nil
}

// AddPadding appends zero bytes until the buffer length is a multiple of size.
// A non-positive size is a no-op.
func (bw *MemWriter) AddPadding(size int) error {
	if size <= 0 {
		return nil
	}
	rem := bw.buf.Len() % size
	if rem == 0 {
		return nil
	}
	padding := make([]byte, size-rem)
	n, err := bw.Write(padding)
	if err != nil {
		return err
	}
	if n != len(padding) {
		return errors.New("failed to write padding")
	}
	return nil
}

// Bytes returns the accumulated bytes. The returned slice aliases the
// buffer's internal storage and is only valid until the next write.
func (bw *MemWriter) Bytes() ([]byte, error) {
	return bw.buf.Bytes(), nil
}

// Close is a no-op: there is no underlying resource to release.
func (bw *MemWriter) Close() error { return nil }

// Flush is a no-op: writes go straight into the buffer.
func (bw *MemWriter) Flush() error { return nil }

// Remove is a no-op: there is no backing file to delete.
func (bw *MemWriter) Remove() error { return nil }

// Load returns a ReadCloser over the accumulated bytes.
func (bw *MemWriter) Load() (io.ReadCloser, error) {
	return io.NopCloser(bytes.NewReader(bw.buf.Bytes())), nil
}
Loading…
Cancel
Save