diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md
index 984cb02c43..d7d2e49e9c 100644
--- a/pkg/storage/bloom/v1/TODO.md
+++ b/pkg/storage/bloom/v1/TODO.md
@@ -4,4 +4,7 @@
 * implement streaming encoding.Decbuf over io.ReadSeeker
 * Build & load from directories
 * Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places
-* more sophisticated querying methods
\ No newline at end of file
+* more sophisticated querying methods
+* queue access to blooms
+* io.Reader-based decoder
+* tar support
\ No newline at end of file
diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go
new file mode 100644
index 0000000000..4c0b124a05
--- /dev/null
+++ b/pkg/storage/bloom/v1/archive.go
@@ -0,0 +1,100 @@
+package v1
+
+import (
+	"archive/tar"
+	"io"
+	"os"
+	"path/filepath"
+
+	"github.com/pkg/errors"
+
+	"github.com/grafana/loki/pkg/chunkenc"
+)
+
+func TarGz(dst io.Writer, src *DirectoryBlockReader) error {
+	if err := src.Init(); err != nil {
+		return errors.Wrap(err, "error initializing directory block reader")
+	}
+
+	gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
+	defer gzipper.Close()
+
+	tarballer := tar.NewWriter(gzipper)
+	defer tarballer.Close()
+
+	for _, f := range []*os.File{src.index, src.blooms} {
+		info, err := f.Stat()
+		if err != nil {
+			return errors.Wrapf(err, "error stat'ing file %s", f.Name())
+		}
+
+		header, err := tar.FileInfoHeader(info, f.Name())
+		if err != nil {
+			return errors.Wrapf(err, "error creating tar header for file %s", f.Name())
+		}
+
+		if err := tarballer.WriteHeader(header); err != nil {
+			return errors.Wrapf(err, "error writing tar header for file %s", f.Name())
+		}
+
+		if _, err := io.Copy(tarballer, f); err != nil {
+			return errors.Wrapf(err, "error writing file %s to tarball", f.Name())
+		}
+
+	}
+	return nil
+}
+
+func UnTarGz(dst string, r io.Reader) error {
+	gzipper, err := chunkenc.GetReaderPool(chunkenc.EncGZIP).GetReader(r)
+	if err != nil {
+		return errors.Wrap(err, "error getting gzip reader")
+	}
+
+	tarballer := tar.NewReader(gzipper)
+
+	for {
+		header, err := tarballer.Next()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return errors.Wrap(err, "error reading tarball header")
+		}
+
+		target := filepath.Join(dst, header.Name)
+
+		// check the file type
+		switch header.Typeflag {
+
+		// if it's a dir and it doesn't exist, create it
+		case tar.TypeDir:
+			if _, err := os.Stat(target); err != nil {
+				if err := os.MkdirAll(target, 0755); err != nil {
+					return err
+				}
+			}
+
+		// if it's a file, create it
+		case tar.TypeReg:
+			f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode))
+			if err != nil {
+				return errors.Wrapf(err, "error creating file %s", target)
+			}
+
+			// copy over contents
+			if _, err := io.Copy(f, tarballer); err != nil {
+				return errors.Wrapf(err, "error copying contents of file %s", target)
+			}
+
+			// manually close here after each file operation; deferring would cause each file close
+			// to wait until all operations have completed.
+			if err := f.Close(); err != nil {
+				return errors.Wrapf(err, "error closing file %s", target)
+			}
+		}
+
+	}
+
+	return nil
+}
diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go
new file mode 100644
index 0000000000..3d49118051
--- /dev/null
+++ b/pkg/storage/bloom/v1/archive_test.go
@@ -0,0 +1,77 @@
+package v1
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/chunkenc"
+)
+
+func TestArchive(t *testing.T) {
+	// two dirs: build the block in one, untar the archive into the other, then ensure they're equal
+	dir1 := t.TempDir()
+	dir2 := t.TempDir()
+
+	numSeries := 100
+	data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
+
+	builder, err := NewBlockBuilder(
+		BlockOptions{
+			schema: Schema{
+				version:  DefaultSchemaVersion,
+				encoding: chunkenc.EncSnappy,
+			},
+			SeriesPageSize: 100,
+			BloomPageSize:  10 << 10,
+		},
+		NewDirectoryBlockWriter(dir1),
+	)
+
+	require.Nil(t, err)
+	itr := NewSliceIter[SeriesWithBloom](data)
+	require.Nil(t, builder.BuildFrom(itr))
+
+	reader := NewDirectoryBlockReader(dir1)
+
+	w := bytes.NewBuffer(nil)
+	require.Nil(t, TarGz(w, reader))
+
+	require.Nil(t, UnTarGz(dir2, w))
+
+	reader2 := NewDirectoryBlockReader(dir2)
+
+	// Check the index is byte-for-byte equivalent
+	srcIndex, err := reader.Index()
+	require.Nil(t, err)
+	_, err = srcIndex.Seek(0, io.SeekStart)
+	require.Nil(t, err)
+	dstIndex, err := reader2.Index()
+	require.Nil(t, err)
+	_, err = dstIndex.Seek(0, io.SeekStart)
+	require.Nil(t, err)
+
+	srcIndexBytes, err := io.ReadAll(srcIndex)
+	require.Nil(t, err)
+	dstIndexBytes, err := io.ReadAll(dstIndex)
+	require.Nil(t, err)
+	require.Equal(t, srcIndexBytes, dstIndexBytes)
+
+	// Check the blooms are byte-for-byte equivalent
+	srcBlooms, err := reader.Blooms()
+	require.Nil(t, err)
+	_, err = srcBlooms.Seek(0, io.SeekStart)
+	require.Nil(t, err)
+	dstBlooms, err := reader2.Blooms()
+	require.Nil(t, err)
+	_, err = dstBlooms.Seek(0, io.SeekStart)
+	require.Nil(t, err)
+
+	srcBloomsBytes, err := io.ReadAll(srcBlooms)
+	require.Nil(t, err)
+	dstBloomsBytes, err := io.ReadAll(dstBlooms)
+	require.Nil(t, err)
+	require.Equal(t, srcBloomsBytes, dstBloomsBytes)
+}
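
Usage sketch (not part of the diff above): a minimal illustration of how the new TarGz/UnTarGz helpers compose with the existing DirectoryBlockReader, mirroring what TestArchive exercises. The archiveBlock wrapper, its parameters, and the in-memory buffer are illustrative assumptions only.

package v1

import "bytes"

// archiveBlock is a hypothetical helper: it packs a block directory into a
// gzipped tarball and then expands that tarball into another directory.
func archiveBlock(srcDir, dstDir string) error {
	reader := NewDirectoryBlockReader(srcDir)

	// TarGz initializes the reader and streams the block's index and bloom
	// files into a single gzipped tarball written to the buffer.
	var buf bytes.Buffer
	if err := TarGz(&buf, reader); err != nil {
		return err
	}

	// UnTarGz expands the tarball back into a directory, e.g. after the
	// archive has been shipped through object storage.
	return UnTarGz(dstDir, &buf)
}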