From af177034edc0a69b8982a79bf55593fa579fbf66 Mon Sep 17 00:00:00 2001
From: Paul Rogers <129207811+paul1r@users.noreply.github.com>
Date: Wed, 22 Nov 2023 08:07:43 -0500
Subject: [PATCH] Compression of bloom blocks (#11267)

**What this PR does / why we need it**:
Compress bloom blocks when writing them to object storage, and decompress them
when reading them back from object storage.

**Which issue(s) this PR fixes**:
Fixes #

**Special notes for your reviewer**:
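The archive layout is easiest to review with the whole round trip in view. The
sketch below is illustrative only (it uses the stdlib `compress/gzip` instead of
the `chunkenc` writer pool used by `TarGzMemory`, and the `tarGz`/`unTarGz`
names are hypothetical), but it mirrors the shape of the code in this patch:
each block becomes a gzipped tarball with two entries, `series` and `bloom`.

```go
package main

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

// tarGz writes named byte slices into a gzipped tarball.
func tarGz(dst io.Writer, files map[string][]byte) error {
	gw := gzip.NewWriter(dst)
	defer gw.Close()
	tw := tar.NewWriter(gw)
	defer tw.Close() // runs before gw.Close, flushing the tar footer, then gzip

	for name, data := range files {
		// Every entry needs a header carrying its name and exact size.
		if err := tw.WriteHeader(&tar.Header{Name: name, Size: int64(len(data)), Mode: 0o755}); err != nil {
			return err
		}
		if _, err := tw.Write(data); err != nil {
			return err
		}
	}
	return nil
}

// unTarGz reads every entry back into memory.
func unTarGz(src io.Reader) (map[string][]byte, error) {
	gr, err := gzip.NewReader(src)
	if err != nil {
		return nil, err
	}
	defer gr.Close()

	out := map[string][]byte{}
	tr := tar.NewReader(gr)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		data, err := io.ReadAll(tr)
		if err != nil {
			return nil, err
		}
		out[hdr.Name] = data
	}
}

func main() {
	var buf bytes.Buffer
	if err := tarGz(&buf, map[string][]byte{"series": []byte("index bytes"), "bloom": []byte("bloom bytes")}); err != nil {
		log.Fatal(err)
	}
	files, err := unTarGz(&buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%q\n", files["bloom"]) // "bloom bytes"
}
```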
**Checklist**
- [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15)
---
 pkg/bloomcompactor/bloomcompactor.go          | 14 +++-
 pkg/storage/bloom/v1/archive.go               | 42 ++++++++++-
 pkg/storage/bloom/v1/block_writer.go          |  8 +-
 pkg/storage/bloom/v1/reader.go                |  4 +-
 .../stores/shipper/bloomshipper/client.go     | 75 ++++++++++++++++---
 .../shipper/bloomshipper/client_test.go       | 71 +++++++++++++++---
 .../stores/shipper/bloomshipper/shipper.go    |  5 +-
 .../shipper/bloomshipper/shipper_test.go      |  5 +-
 8 files changed, 190 insertions(+), 34 deletions(-)

diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 71dbb08380..f004936e10 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -61,9 +61,11 @@ import (
     "github.com/grafana/loki/pkg/util"
 )
 
+// TODO: Make a constants file somewhere
 const (
-    fpRate        = 0.01
-    bloomFileName = "bloom"
+    fpRate         = 0.01
+    bloomFileName  = "bloom"
+    seriesFileName = "series"
 )
 
 type Compactor struct {
@@ -485,6 +487,11 @@ func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.Ser
         level.Error(logger).Log("reading bloomBlock", err)
     }
 
+    indexFile, err := os.Open(filepath.Join(localDst, seriesFileName))
+    if err != nil {
+        level.Error(logger).Log("reading series file", err)
+    }
+
     blocks := bloomshipper.Block{
         BlockRef: bloomshipper.BlockRef{
             Ref: bloomshipper.Ref{
@@ -498,7 +505,8 @@ func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.Ser
             },
             IndexPath: job.IndexPath(),
         },
-        Data: blockFile,
+        BloomData: blockFile,
+        IndexData: indexFile,
     }
 
     return blocks, nil
diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go
index 4c0b124a05..7f252e3bde 100644
--- a/pkg/storage/bloom/v1/archive.go
+++ b/pkg/storage/bloom/v1/archive.go
@@ -5,12 +5,46 @@ import (
     "io"
     "os"
     "path/filepath"
+    "strings"
 
     "github.com/pkg/errors"
 
     "github.com/grafana/loki/pkg/chunkenc"
 )
 
+func TarGzMemory(dst io.Writer, src *ByteReader) error {
+    gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
+    defer gzipper.Close()
+
+    tarballer := tar.NewWriter(gzipper)
+    defer tarballer.Close()
+
+    header := &tar.Header{
+        Name: SeriesFileName,
+        Size: int64(src.index.Len()),
+    }
+    // Write the header
+    if err := tarballer.WriteHeader(header); err != nil {
+        return errors.Wrapf(err, "error writing tar header for index file")
+    }
+    // Write the file contents
+    if _, err := tarballer.Write(src.index.Bytes()); err != nil {
+        return errors.Wrapf(err, "error writing file contents for index file")
+    }
+
+    header = &tar.Header{
+        Name: BloomFileName,
+        Size: int64(src.blooms.Len()),
+    }
+    if err := tarballer.WriteHeader(header); err != nil {
+        return errors.Wrapf(err, "error writing tar header for bloom file")
+    }
+    if _, err := tarballer.Write(src.blooms.Bytes()); err != nil {
+        return errors.Wrapf(err, "error writing file contents for bloom file")
+    }
+    return nil
+}
+
 func TarGz(dst io.Writer, src *DirectoryBlockReader) error {
     if err := src.Init(); err != nil {
         return errors.Wrap(err, "error initializing directory block reader")
@@ -77,7 +111,13 @@ func UnTarGz(dst string, r io.Reader) error {
 
         // if it's a file create it
         case tar.TypeReg:
-            f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode))
+            err := os.MkdirAll(target[:strings.LastIndex(target, "/")], 0755)
+            if err != nil {
+                return errors.Wrapf(err, "error creating directory %s", target)
+            }
+            // TODO: We need to settle on how best to handle file permissions and ownership
+            // This may be utilizing a zip file instead of tar.gz
+            f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
             if err != nil {
                 return errors.Wrapf(err, "error creating file %s", target)
             }
diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go
index 317d1e5984..99ab65ef9c 100644
--- a/pkg/storage/bloom/v1/block_writer.go
+++ b/pkg/storage/bloom/v1/block_writer.go
@@ -12,8 +12,8 @@ import (
 )
 
 const (
-    bloomFileName  = "bloom"
-    seriesFileName = "series"
+    BloomFileName  = "bloom"
+    SeriesFileName = "series"
 )
 
 type BlockWriter interface {
@@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error {
             return errors.Wrap(err, "creating bloom block dir")
         }
 
-        b.index, err = os.Create(filepath.Join(b.dir, seriesFileName))
+        b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName))
         if err != nil {
             return errors.Wrap(err, "creating series file")
         }
 
-        b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName))
+        b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName))
         if err != nil {
             return errors.Wrap(err, "creating bloom file")
         }
diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go
index e4de9609b9..d5c70a2b64 100644
--- a/pkg/storage/bloom/v1/reader.go
+++ b/pkg/storage/bloom/v1/reader.go
@@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
 func (r *DirectoryBlockReader) Init() error {
     if !r.initialized {
         var err error
-        r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
+        r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName))
         if err != nil {
             return errors.Wrap(err, "opening series file")
         }
 
-        r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
+        r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName))
         if err != nil {
             return errors.Wrap(err, "opening bloom file")
         }
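With the `series`/`bloom` file names now exported, the archive helpers compose
with the in-memory block representation. A minimal sketch of producing an
archive, assuming `v1.NewByteReader(index, blooms *bytes.Buffer)` as it is used
in the client and test changes below (`archiveBlock` is a hypothetical helper,
not part of this patch):

```go
import (
	"bytes"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// archiveBlock pairs a serialized "series" section with a "bloom" section and
// produces the gzipped tarball that PutBlocks uploads.
func archiveBlock(indexBytes, bloomBytes []byte) (*bytes.Buffer, error) {
	br := v1.NewByteReader(bytes.NewBuffer(indexBytes), bytes.NewBuffer(bloomBytes))

	// TarGzMemory writes both sections into a single gzipped tarball, with
	// entry names v1.SeriesFileName and v1.BloomFileName.
	archive := new(bytes.Buffer)
	if err := v1.TarGzMemory(archive, br); err != nil {
		return nil, err
	}
	return archive, nil
}
```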
"encoding/json" "fmt" "io" + "os" "path/filepath" "strconv" "strings" "time" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/prometheus/common/model" "github.com/grafana/dskit/concurrency" @@ -75,7 +79,8 @@ type MetaClient interface { type Block struct { BlockRef - Data io.ReadCloser + IndexData io.ReadCloser + BloomData io.ReadCloser } type BlockClient interface { @@ -205,13 +210,35 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha return fmt.Errorf("error while period lookup: %w", err) } objectClient := b.periodicObjectClients[period] - readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) + compressedObjectReadCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) if err != nil { return fmt.Errorf("error while fetching object from storage: %w", err) } + defer func() { + compressedObjectReadCloser.Close() + }() + + workingDirectoryPath := filepath.Join(b.storageConfig.BloomShipperConfig.WorkingDirectory, reference.BlockPath, strconv.FormatInt(time.Now().UTC().UnixMilli(), 10)) + err = v1.UnTarGz(workingDirectoryPath, compressedObjectReadCloser) + if err != nil { + return fmt.Errorf("error while untarring: %w", err) + } + + indexFile, err := os.Open(filepath.Join(workingDirectoryPath, v1.SeriesFileName)) + if err != nil { + return fmt.Errorf("error while opening index file: %w", err) + } + indexReader := bufio.NewReader(indexFile) + + bloomFile, err := os.Open(filepath.Join(workingDirectoryPath, v1.BloomFileName)) + if err != nil { + return fmt.Errorf("error while opening bloom file: %w", err) + } + bloomReader := bufio.NewReader(bloomFile) blocksChannel <- Block{ - BlockRef: reference, - Data: readCloser, + BlockRef: reference, + BloomData: io.NopCloser(bloomReader), + IndexData: io.NopCloser(indexReader), } return nil }) @@ -225,7 +252,25 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha return blocksChannel, errChannel } -// TODO zip (archive) blocks before uploading to storage +func readCloserToBuffer(rc io.ReadCloser) *bytes.Buffer { + defer rc.Close() + + // Read the data from io.ReadCloser + data, err := io.ReadAll(rc) + if err != nil { + return nil + } + + // Write the data into a bytes.Buffer + var buf bytes.Buffer + _, err = buf.Write(data) + if err != nil { + return nil + } + + return &buf +} + func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) { results := make([]Block, len(blocks)) //todo move concurrency to the config @@ -233,7 +278,11 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e block := blocks[idx] defer func(Data io.ReadCloser) { _ = Data.Close() - }(block.Data) + }(block.BloomData) + + defer func(Data io.ReadCloser) { + _ = Data.Close() + }(block.IndexData) period, err := findPeriod(b.periodicConfigs, block.StartTimestamp) if err != nil { @@ -241,11 +290,19 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e } key := createBlockObjectKey(block.Ref) objectClient := b.periodicObjectClients[period] - data, err := io.ReadAll(block.Data) + byteReader := v1.NewByteReader(readCloserToBuffer(block.IndexData), readCloserToBuffer(block.BloomData)) + + // TODO: Right now, this is asymetrical with the GetBlocks path. We have all the pieces + // in memory now, so it doesn't necessarily make sense to write the files to disk. 
diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go
index 4c4b6f855a..6031e8bb06 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -1,7 +1,9 @@
 package bloomshipper
 
 import (
+    "archive/tar"
     "bytes"
+    "compress/gzip"
     "context"
     "encoding/json"
     "fmt"
@@ -13,6 +15,8 @@ import (
     "testing"
     "time"
 
+    v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+
     aws_io "github.com/aws/smithy-go/io"
     "github.com/google/uuid"
     "github.com/prometheus/common/model"
@@ -183,6 +187,8 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
     secondBlockData := createBlockFile(t, secondBlockFullPath)
     require.FileExists(t, firstBlockFullPath)
     require.FileExists(t, secondBlockFullPath)
+    rootDir := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom")
+    defer os.RemoveAll(rootDir)
 
     firstBlockRef := BlockRef{
         Ref: Ref{
@@ -212,7 +218,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
 
     blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef}
     blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload)
-    blocks := make(map[string]string)
+    blocks := make(map[string][]byte)
     func() {
         timout := time.After(5 * time.Second)
         for {
@@ -226,13 +232,14 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
                 if !ok {
                     return
                 }
-                blockData, err := io.ReadAll(block.Data)
+                blockData, err := io.ReadAll(block.BloomData)
                 require.NoError(t, err)
-                blocks[block.BlockRef.BlockPath] = string(blockData)
+                blocks[block.BlockRef.BlockPath] = blockData
             }
         }
     }()
+    defer os.RemoveAll("./bloom")
 
     firstBlockActualData, exists := blocks[firstBlockRef.BlockPath]
     require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks)
@@ -245,9 +252,42 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
     require.Len(t, blocks, 2)
 }
 
+func extractFileFromTGZ(tarGzData []byte, targetFileName string) []byte {
+    gzReader, err := gzip.NewReader(bytes.NewReader(tarGzData))
+    if err != nil {
+        return nil
+    }
+    defer gzReader.Close()
+
+    tarReader := tar.NewReader(gzReader)
+
+    for {
+        header, err := tarReader.Next()
+
+        if err == io.EOF {
+            break
+        }
+
+        if err != nil {
+            return nil
+        }
+
+        if header.Name == targetFileName {
+            buffer := new(bytes.Buffer)
+            if _, err := io.Copy(buffer, tarReader); err != nil {
+                return nil
+            }
+            return buffer.Bytes()
+        }
+    }
+
+    return nil
+}
+
 func Test_BloomClient_PutBlocks(t *testing.T) {
     shipper := createShipper(t)
     blockForFirstFolderData := "data1"
+    indexForFirstFolderData := "index1"
     blockForFirstFolder := Block{
         BlockRef: BlockRef{
             Ref: Ref{
@@ -261,10 +301,12 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
             },
             IndexPath: uuid.New().String(),
         },
-        Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))},
+        BloomData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))},
+        IndexData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(indexForFirstFolderData))},
     }
 
     blockForSecondFolderData := "data2"
+    indexForSecondFolderData := "index2"
     blockForSecondFolder := Block{
         BlockRef: BlockRef{
             Ref: Ref{
@@ -278,7 +320,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
             },
             IndexPath: uuid.New().String(),
         },
-        Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))},
+        BloomData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))},
+        IndexData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(indexForSecondFolderData))},
     }
 
     results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder})
@@ -300,7 +343,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
     require.FileExists(t, savedFilePath)
     savedData, err := os.ReadFile(savedFilePath)
     require.NoError(t, err)
-    require.Equal(t, blockForFirstFolderData, string(savedData))
+    require.Equal(t, blockForFirstFolderData, string(extractFileFromTGZ(savedData, "bloom")))
 
     secondResultBlock := results[1]
     path = secondResultBlock.BlockPath
@@ -319,7 +362,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
     require.FileExists(t, savedFilePath)
     savedData, err = os.ReadFile(savedFilePath)
     require.NoError(t, err)
-    require.Equal(t, blockForSecondFolderData, string(savedData))
+    require.Equal(t, blockForSecondFolderData, string(extractFileFromTGZ(savedData, "bloom")))
 }
 
 func Test_BloomClient_DeleteBlocks(t *testing.T) {
@@ -364,13 +407,19 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
     require.NoFileExists(t, block2Path)
 }
 
-func createBlockFile(t *testing.T, path string) string {
+func createBlockFile(t *testing.T, path string) []byte {
     err := os.MkdirAll(path[:strings.LastIndex(path, "/")], 0755)
     require.NoError(t, err)
-    fileContent := uuid.NewString()
-    err = os.WriteFile(path, []byte(fileContent), 0700)
+    bloomContent := []byte(uuid.NewString())
+    indexContent := []byte(uuid.NewString())
+    outputFile, err := os.Create(path)
+    require.NoError(t, err)
+    byteReader := v1.NewByteReader(bytes.NewBuffer(indexContent), bytes.NewBuffer(bloomContent))
+    err = v1.TarGzMemory(outputFile, byteReader)
+    require.NoError(t, err)
+    err = outputFile.Close()
     require.NoError(t, err)
-    return fileContent
+    return bloomContent
 }
 
 func Test_TablesByPeriod(t *testing.T) {
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go
index 2df1f41cd4..0272a8e4f7 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go
@@ -207,7 +207,8 @@ func (s *Shipper) createBlockQuerier(directory string) *v1.BlockQuerier {
 }
 
 func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
-    defer block.Data.Close()
+    defer block.BloomData.Close()
+    defer block.IndexData.Close()
 
     archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])
     archiveFile, err := os.Create(archivePath)
@@ -215,7 +216,7 @@ func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, err
         return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
     }
     defer archiveFile.Close()
-    _, err = io.Copy(archiveFile, block.Data)
+    _, err = io.Copy(archiveFile, block.BloomData)
     if err != nil {
         return "", fmt.Errorf("error writing data to archive file: %w", err)
     }
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
index 45450c0e38..2f662b2b79 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
@@ -245,8 +245,9 @@ func Test_Shipper_extractBlock(t *testing.T) {
     shipper := Shipper{config: config.Config{WorkingDirectory: workingDir}}
     ts := time.Now().UTC()
     block := Block{
-        BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
-        Data:     blockFile,
+        BlockRef:  BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
+        BloomData: blockFile,
+        IndexData: seriesFile,
     }
 
     actualPath, err := shipper.extractBlock(&block, ts)
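For completeness, a round-trip check that ties the pieces together. This test
does not exist in the patch (the name and the use of `t.TempDir` are
suggestions), but it exercises exactly the `TarGzMemory`/`UnTarGz` pairing
introduced above:

```go
func Test_TarGzMemory_UnTarGz_RoundTrip(t *testing.T) {
	index := bytes.NewBufferString("index bytes")
	blooms := bytes.NewBufferString("bloom bytes")

	// Compress both sections into a single in-memory archive.
	var archive bytes.Buffer
	require.NoError(t, v1.TarGzMemory(&archive, v1.NewByteReader(index, blooms)))

	// Extract into a scratch directory, as the GetBlocks read path does.
	dir := t.TempDir()
	require.NoError(t, v1.UnTarGz(dir, &archive))

	got, err := os.ReadFile(filepath.Join(dir, v1.BloomFileName))
	require.NoError(t, err)
	require.Equal(t, "bloom bytes", string(got))
}
```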