chore(TSDB): add hooks for in-memory only tsdb creation (#14734)

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
pull/14761/head
Owen Diehl 7 months ago committed by GitHub
parent 0704f5d365
commit 1c993f9bb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 95
      pkg/storage/stores/shipper/indexshipper/tsdb/builder.go
  2. 38
      pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go
  3. 2
      pkg/storage/stores/shipper/indexshipper/tsdb/index/index_test.go

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
@ -111,12 +112,47 @@ func (b *Builder) Build(
name := fmt.Sprintf("%s-%x.staging", index.IndexFilename, rng)
tmpPath := filepath.Join(scratchDir, name)
var writer *index.Creator
writer, err := index.NewFileWriterWithVersion(ctx, b.version, tmpPath)
if err != nil {
return id, err
}
writer, err = index.NewWriterWithVersion(ctx, b.version, tmpPath)
if _, err := b.build(writer, false); err != nil {
return id, err
}
reader, err := index.NewFileReader(tmpPath)
if err != nil {
return id, err
}
from, through := reader.Bounds()
// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())
reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()
if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err
}
return dst, nil
}
func (b *Builder) build(
writer *index.Creator,
reader bool, // whether to return the ReadCloser of the underlying DB
) (io.ReadCloser, error) {
// TODO(owen-d): multithread
// Sort series
@ -155,7 +191,7 @@ func (b *Builder) Build(
// Add symbols
for _, symbol := range symbols {
if err := writer.AddSymbol(symbol); err != nil {
return id, err
return nil, err
}
}
@ -165,38 +201,45 @@ func (b *Builder) Build(
s.chunks = s.chunks.Finalize()
}
if err := writer.AddSeries(storage.SeriesRef(i), s.labels, s.fp, s.chunks...); err != nil {
return id, err
return nil, err
}
}
if _, err := writer.Close(false); err != nil {
return id, err
}
return writer.Close(reader)
}
reader, err := index.NewFileReader(tmpPath)
func (b *Builder) BuildInMemory(
ctx context.Context,
// Determines how to create the resulting Identifier and file name.
// This is variable as we use Builder for multiple reasons,
// such as building multi-tenant tsdbs on the ingester
// and per tenant ones during compaction
createFn func(from, through model.Time, checksum uint32) Identifier,
) (id Identifier, data []byte, err error) {
writer, err := index.NewMemWriterWithVersion(ctx, b.version)
if err != nil {
return id, err
return id, nil, err
}
from, through := reader.Bounds()
// load the newly compacted index to grab checksum, promptly close
dst := createFn(model.Time(from), model.Time(through), reader.Checksum())
reader.Close()
defer func() {
if err != nil {
os.RemoveAll(tmpPath)
}
}()
readCloser, err := b.build(writer, true)
if err != nil {
return id, nil, err
}
defer readCloser.Close()
if err := chunk_util.EnsureDirectory(filepath.Dir(dst.Path())); err != nil {
return id, err
data, err = io.ReadAll(readCloser)
if err != nil {
return nil, nil, err
}
dstPath := dst.Path()
if err := os.Rename(tmpPath, dstPath); err != nil {
return id, err
reader, err := index.NewReader(index.RealByteSlice(data))
if err != nil {
return id, nil, err
}
defer reader.Close()
return dst, nil
from, through := reader.Bounds()
id = createFn(model.Time(from), model.Time(through), reader.Checksum())
return id, data, nil
}

@ -211,7 +211,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}
func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
// NewFileWriterWithVersion returns a Creator for writing TSDBs using temporary files.
func NewFileWriterWithVersion(ctx context.Context, version int, fn string) (*Creator, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir)
@ -243,12 +244,39 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator
return nil, errors.Wrap(err, "sync dir")
}
return newWriter(
ctx,
version,
f,
fP,
fPO,
)
}
// NewMemWriterWithVersion returns a Creator for writing TSDBs in memory.
// Each of the three writers handed to newWriter is a separate buffer writer
// obtained from NewBufferWriter.
func NewMemWriterWithVersion(ctx context.Context, version int) (*Creator, error) {
	mainBuf := NewBufferWriter()
	postingsBuf := NewBufferWriter()
	postingOffsetsBuf := NewBufferWriter()

	return newWriter(ctx, version, mainBuf, postingsBuf, postingOffsetsBuf)
}
func newWriter(
ctx context.Context,
version int,
fWriter writer,
postingsWriter writer,
postingOffsetsWriter writer,
) (*Creator, error) {
iw := &Creator{
Version: version,
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
f: fWriter,
fP: postingsWriter,
fPO: postingOffsetsWriter,
stage: idxStageNone,
// Reusable memory.
@ -267,7 +295,7 @@ func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Creator
// NewWriter returns a new Creator that writes to the given filename.
// It delegates to NewFileWriterWithVersion, passing indexFormat as the version.
func NewWriter(ctx context.Context, indexFormat int, fn string) (*Creator, error) {
	return NewFileWriterWithVersion(ctx, indexFormat, fn)
}
func (w *Creator) write(bufs ...[]byte) error {

@ -727,7 +727,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
iw, err := NewWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
iw, err := NewFileWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
require.NoError(t, err)
syms := []string{}

Loading…
Cancel
Save