helpers for tar+gz over bloom blocks (#10849)

Simple start, but helpful for building & extracting tar+gz archives over
bloom block directories.
pull/10798/head^2
Owen Diehl 2 years ago committed by GitHub
parent c071447900
commit 7f02ef3fa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/storage/bloom/v1/TODO.md
  2. 100
      pkg/storage/bloom/v1/archive.go
  3. 77
      pkg/storage/bloom/v1/archive_test.go

@ -4,4 +4,7 @@
* implement streaming encoding.Decbuf over io.ReadSeeker
* Build & load from directories
* Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places
* more sophisticated querying methods
* more sophisticated querying methods
* queue access to blooms
* io.reader based decoder
* tar support

@ -0,0 +1,100 @@
package v1
import (
	"archive/tar"
	"io"
	"os"
	"path/filepath"
	"strings"

	"github.com/pkg/errors"

	"github.com/grafana/loki/pkg/chunkenc"
)
// TarGz writes the index and bloom files of src as a gzip-compressed tar
// archive to dst.
//
// The original implementation discarded the errors from the deferred
// tarballer.Close() and gzipper.Close() calls. Both closes flush buffered
// data and write trailers, so a failed close yields a silently corrupt
// archive. Here both writers are closed exactly once on every path and any
// close error is propagated to the caller.
func TarGz(dst io.Writer, src *DirectoryBlockReader) error {
	if err := src.Init(); err != nil {
		return errors.Wrap(err, "error initializing directory block reader")
	}

	gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
	tarballer := tar.NewWriter(gzipper)

	// Run the archiving in a closure so the writers below are always closed,
	// whether or not an entry fails.
	err := func() error {
		for _, f := range []*os.File{src.index, src.blooms} {
			info, err := f.Stat()
			if err != nil {
				return errors.Wrapf(err, "error stat'ing file %s", f.Name())
			}

			// The second argument is only consulted for symlinks; header.Name
			// ends up as the file's base name.
			header, err := tar.FileInfoHeader(info, f.Name())
			if err != nil {
				return errors.Wrapf(err, "error creating tar header for file %s", f.Name())
			}

			if err := tarballer.WriteHeader(header); err != nil {
				return errors.Wrapf(err, "error writing tar header for file %s", f.Name())
			}

			if _, err := io.Copy(tarballer, f); err != nil {
				return errors.Wrapf(err, "error writing file %s to tarball", f.Name())
			}
		}
		return nil
	}()

	// Close tar before gzip (tar writes its trailer into the gzip stream).
	// Keep the first error encountered; close errors must not be dropped.
	if cerr := tarballer.Close(); err == nil && cerr != nil {
		err = errors.Wrap(cerr, "error closing tar writer")
	}
	if cerr := gzipper.Close(); err == nil && cerr != nil {
		err = errors.Wrap(cerr, "error closing gzip writer")
	}
	return err
}
// UnTarGz extracts a gzip-compressed tar archive read from r into the
// directory dst.
//
// Fixes over the original:
//   - zip-slip: entry names are validated so a crafted name such as
//     "../../etc/passwd" cannot escape dst.
//   - fd leak: the destination file is closed on the io.Copy error path,
//     not just on success.
func UnTarGz(dst string, r io.Reader) error {
	gzipper, err := chunkenc.GetReaderPool(chunkenc.EncGZIP).GetReader(r)
	if err != nil {
		return errors.Wrap(err, "error getting gzip reader")
	}

	tarballer := tar.NewReader(gzipper)

	// Any entry must resolve to dst itself or to a path strictly inside it.
	cleanDst := filepath.Clean(dst)
	for {
		header, err := tarballer.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return errors.Wrap(err, "error reading tarball header")
		}

		target := filepath.Join(dst, header.Name)
		// Guard against zip-slip: filepath.Join cleans "..", so an escaping
		// entry resolves outside cleanDst and is rejected here.
		if target != cleanDst && !strings.HasPrefix(target, cleanDst+string(os.PathSeparator)) {
			return errors.Errorf("tar entry %s escapes destination directory", header.Name)
		}

		// check the file type
		switch header.Typeflag {
		// if its a dir and it doesn't exist create it
		case tar.TypeDir:
			if _, err := os.Stat(target); err != nil {
				if err := os.MkdirAll(target, 0755); err != nil {
					return err
				}
			}
		// if it's a file create it
		case tar.TypeReg:
			f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode))
			if err != nil {
				return errors.Wrapf(err, "error creating file %s", target)
			}

			// copy over contents; close the file on the error path too so we
			// never leak a descriptor.
			if _, err := io.Copy(f, tarballer); err != nil {
				_ = f.Close()
				return errors.Wrapf(err, "error copying contents of file %s", target)
			}

			// manually close here after each file operation; deferring would
			// hold every file open until all entries have been extracted.
			if err := f.Close(); err != nil {
				return errors.Wrapf(err, "error closing file %s", target)
			}
		}
	}
	return nil
}

@ -0,0 +1,77 @@
package v1
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
)
// TestArchive builds a bloom block in one temp directory, round-trips it
// through a tar.gz archive into a second temp directory, and asserts the
// index and bloom files come back byte-for-byte identical.
func TestArchive(t *testing.T) {
	// for writing files to two dirs for comparison and ensuring they're equal
	srcDir, dstDir := t.TempDir(), t.TempDir()

	numSeries := 100
	data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)

	builder, err := NewBlockBuilder(
		BlockOptions{
			schema: Schema{
				version:  DefaultSchemaVersion,
				encoding: chunkenc.EncSnappy,
			},
			SeriesPageSize: 100,
			BloomPageSize:  10 << 10,
		},
		NewDirectoryBlockWriter(srcDir),
	)
	require.Nil(t, err)
	require.Nil(t, builder.BuildFrom(NewSliceIter[SeriesWithBloom](data)))

	src := NewDirectoryBlockReader(srcDir)

	archive := bytes.NewBuffer(nil)
	require.Nil(t, TarGz(archive, src))
	require.Nil(t, UnTarGz(dstDir, archive))

	dst := NewDirectoryBlockReader(dstDir)

	// readAll rewinds rs and returns its full contents, failing the test on
	// any seek/read error.
	readAll := func(rs io.ReadSeeker) []byte {
		_, err := rs.Seek(0, io.SeekStart)
		require.Nil(t, err)
		b, err := io.ReadAll(rs)
		require.Nil(t, err)
		return b
	}

	// Check Index is byte for byte equivalent
	srcIndex, err := src.Index()
	require.Nil(t, err)
	dstIndex, err := dst.Index()
	require.Nil(t, err)
	require.Equal(t, readAll(srcIndex), readAll(dstIndex))

	// Check Blooms is byte for byte equivalent
	srcBlooms, err := src.Blooms()
	require.Nil(t, err)
	dstBlooms, err := dst.Blooms()
	require.Nil(t, err)
	require.Equal(t, readAll(srcBlooms), readAll(dstBlooms))
}
Loading…
Cancel
Save