Compression of bloom blocks (#11267)

**What this PR does / why we need it**:
Compress bloom blocks when writing to object storage, and uncompress
when reading from object storage.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
pull/11131/head^2
Paul Rogers 2 years ago committed by GitHub
parent e6940691c9
commit af177034ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/bloomcompactor/bloomcompactor.go
  2. 42
      pkg/storage/bloom/v1/archive.go
  3. 8
      pkg/storage/bloom/v1/block_writer.go
  4. 4
      pkg/storage/bloom/v1/reader.go
  5. 75
      pkg/storage/stores/shipper/bloomshipper/client.go
  6. 71
      pkg/storage/stores/shipper/bloomshipper/client_test.go
  7. 5
      pkg/storage/stores/shipper/bloomshipper/shipper.go
  8. 5
      pkg/storage/stores/shipper/bloomshipper/shipper_test.go

@ -61,9 +61,11 @@ import (
"github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util"
) )
// TODO: Make a constants file somewhere
const ( const (
fpRate = 0.01 fpRate = 0.01
bloomFileName = "bloom" bloomFileName = "bloom"
seriesFileName = "series"
) )
type Compactor struct { type Compactor struct {
@ -485,6 +487,11 @@ func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.Ser
level.Error(logger).Log("reading bloomBlock", err) level.Error(logger).Log("reading bloomBlock", err)
} }
indexFile, err := os.Open(filepath.Join(localDst, seriesFileName))
if err != nil {
level.Error(logger).Log("reading bloomBlock", err)
}
blocks := bloomshipper.Block{ blocks := bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{ BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{ Ref: bloomshipper.Ref{
@ -498,7 +505,8 @@ func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.Ser
}, },
IndexPath: job.IndexPath(), IndexPath: job.IndexPath(),
}, },
Data: blockFile, BloomData: blockFile,
IndexData: indexFile,
} }
return blocks, nil return blocks, nil

@ -5,12 +5,46 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/chunkenc"
) )
// TarGzMemory writes an in-memory bloom block held by src to dst as a
// gzip-compressed tarball with two entries: SeriesFileName (the index)
// and BloomFileName (the bloom filter data).
//
// Close errors are propagated explicitly rather than dropped in a defer:
// the tar footer and gzip trailer are only written on Close, so ignoring
// those errors could silently produce a truncated archive.
func TarGzMemory(dst io.Writer, src *ByteReader) error {
	gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
	tarballer := tar.NewWriter(gzipper)

	// Index (series) entry.
	header := &tar.Header{
		Name: SeriesFileName,
		Size: int64(src.index.Len()),
	}
	if err := tarballer.WriteHeader(header); err != nil {
		return errors.Wrap(err, "error writing tar header for index file")
	}
	if _, err := tarballer.Write(src.index.Bytes()); err != nil {
		return errors.Wrap(err, "error writing file contents for index file")
	}

	// Bloom filter entry.
	header = &tar.Header{
		Name: BloomFileName,
		Size: int64(src.blooms.Len()),
	}
	if err := tarballer.WriteHeader(header); err != nil {
		return errors.Wrap(err, "error writing tar header for bloom file")
	}
	if _, err := tarballer.Write(src.blooms.Bytes()); err != nil {
		return errors.Wrap(err, "error writing file contents for bloom file")
	}

	// Close in dependency order: the tar footer must reach the gzip
	// writer before the gzip stream itself is finalized.
	if err := tarballer.Close(); err != nil {
		return errors.Wrap(err, "error closing tar writer")
	}
	if err := gzipper.Close(); err != nil {
		return errors.Wrap(err, "error closing gzip writer")
	}
	return nil
}
func TarGz(dst io.Writer, src *DirectoryBlockReader) error { func TarGz(dst io.Writer, src *DirectoryBlockReader) error {
if err := src.Init(); err != nil { if err := src.Init(); err != nil {
return errors.Wrap(err, "error initializing directory block reader") return errors.Wrap(err, "error initializing directory block reader")
@ -77,7 +111,13 @@ func UnTarGz(dst string, r io.Reader) error {
// if it's a file create it // if it's a file create it
case tar.TypeReg: case tar.TypeReg:
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode)) err := os.MkdirAll(target[:strings.LastIndex(target, "/")], 0755)
if err != nil {
return errors.Wrapf(err, "error creating directory %s", target)
}
// TODO: We need to settle on how best to handle file permissions and ownership
// This may be utilizing a zip file instead of tar.gz
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0755)
if err != nil { if err != nil {
return errors.Wrapf(err, "error creating file %s", target) return errors.Wrapf(err, "error creating file %s", target)
} }

@ -12,8 +12,8 @@ import (
) )
const ( const (
bloomFileName = "bloom" BloomFileName = "bloom"
seriesFileName = "series" SeriesFileName = "series"
) )
type BlockWriter interface { type BlockWriter interface {
@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error {
return errors.Wrap(err, "creating bloom block dir") return errors.Wrap(err, "creating bloom block dir")
} }
b.index, err = os.Create(filepath.Join(b.dir, seriesFileName)) b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName))
if err != nil { if err != nil {
return errors.Wrap(err, "creating series file") return errors.Wrap(err, "creating series file")
} }
b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName)) b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName))
if err != nil { if err != nil {
return errors.Wrap(err, "creating bloom file") return errors.Wrap(err, "creating bloom file")
} }

@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
func (r *DirectoryBlockReader) Init() error { func (r *DirectoryBlockReader) Init() error {
if !r.initialized { if !r.initialized {
var err error var err error
r.index, err = os.Open(filepath.Join(r.dir, seriesFileName)) r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName))
if err != nil { if err != nil {
return errors.Wrap(err, "opening series file") return errors.Wrap(err, "opening series file")
} }
r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName)) r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName))
if err != nil { if err != nil {
return errors.Wrap(err, "opening bloom file") return errors.Wrap(err, "opening bloom file")
} }

@ -1,16 +1,20 @@
package bloomshipper package bloomshipper
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"time" "time"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/concurrency"
@ -75,7 +79,8 @@ type MetaClient interface {
type Block struct { type Block struct {
BlockRef BlockRef
Data io.ReadCloser IndexData io.ReadCloser
BloomData io.ReadCloser
} }
type BlockClient interface { type BlockClient interface {
@ -205,13 +210,35 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha
return fmt.Errorf("error while period lookup: %w", err) return fmt.Errorf("error while period lookup: %w", err)
} }
objectClient := b.periodicObjectClients[period] objectClient := b.periodicObjectClients[period]
readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) compressedObjectReadCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref))
if err != nil { if err != nil {
return fmt.Errorf("error while fetching object from storage: %w", err) return fmt.Errorf("error while fetching object from storage: %w", err)
} }
defer func() {
compressedObjectReadCloser.Close()
}()
workingDirectoryPath := filepath.Join(b.storageConfig.BloomShipperConfig.WorkingDirectory, reference.BlockPath, strconv.FormatInt(time.Now().UTC().UnixMilli(), 10))
err = v1.UnTarGz(workingDirectoryPath, compressedObjectReadCloser)
if err != nil {
return fmt.Errorf("error while untarring: %w", err)
}
indexFile, err := os.Open(filepath.Join(workingDirectoryPath, v1.SeriesFileName))
if err != nil {
return fmt.Errorf("error while opening index file: %w", err)
}
indexReader := bufio.NewReader(indexFile)
bloomFile, err := os.Open(filepath.Join(workingDirectoryPath, v1.BloomFileName))
if err != nil {
return fmt.Errorf("error while opening bloom file: %w", err)
}
bloomReader := bufio.NewReader(bloomFile)
blocksChannel <- Block{ blocksChannel <- Block{
BlockRef: reference, BlockRef: reference,
Data: readCloser, BloomData: io.NopCloser(bloomReader),
IndexData: io.NopCloser(indexReader),
} }
return nil return nil
}) })
@ -225,7 +252,25 @@ func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (cha
return blocksChannel, errChannel return blocksChannel, errChannel
} }
// TODO zip (archive) blocks before uploading to storage func readCloserToBuffer(rc io.ReadCloser) *bytes.Buffer {
defer rc.Close()
// Read the data from io.ReadCloser
data, err := io.ReadAll(rc)
if err != nil {
return nil
}
// Write the data into a bytes.Buffer
var buf bytes.Buffer
_, err = buf.Write(data)
if err != nil {
return nil
}
return &buf
}
func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) { func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) {
results := make([]Block, len(blocks)) results := make([]Block, len(blocks))
//todo move concurrency to the config //todo move concurrency to the config
@ -233,7 +278,11 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
block := blocks[idx] block := blocks[idx]
defer func(Data io.ReadCloser) { defer func(Data io.ReadCloser) {
_ = Data.Close() _ = Data.Close()
}(block.Data) }(block.BloomData)
defer func(Data io.ReadCloser) {
_ = Data.Close()
}(block.IndexData)
period, err := findPeriod(b.periodicConfigs, block.StartTimestamp) period, err := findPeriod(b.periodicConfigs, block.StartTimestamp)
if err != nil { if err != nil {
@ -241,11 +290,19 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
} }
key := createBlockObjectKey(block.Ref) key := createBlockObjectKey(block.Ref)
objectClient := b.periodicObjectClients[period] objectClient := b.periodicObjectClients[period]
data, err := io.ReadAll(block.Data) byteReader := v1.NewByteReader(readCloserToBuffer(block.IndexData), readCloserToBuffer(block.BloomData))
// TODO: Right now, this is asymetrical with the GetBlocks path. We have all the pieces
// in memory now, so it doesn't necessarily make sense to write the files to disk. That may change
// as we finalize on an archive format, and we may want to just house the downloaded files in memory instead.
// Create a buffer to write data
buf := new(bytes.Buffer)
err = v1.TarGzMemory(buf, byteReader)
if err != nil { if err != nil {
return fmt.Errorf("error while reading object data: %w", err) return fmt.Errorf("error while tarring object data: %w", err)
} }
err = objectClient.PutObject(ctx, key, bytes.NewReader(data))
err = objectClient.PutObject(ctx, key, bytes.NewReader(buf.Bytes()))
if err != nil { if err != nil {
return fmt.Errorf("error updloading block file: %w", err) return fmt.Errorf("error updloading block file: %w", err)
} }

@ -1,7 +1,9 @@
package bloomshipper package bloomshipper
import ( import (
"archive/tar"
"bytes" "bytes"
"compress/gzip"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -13,6 +15,8 @@ import (
"testing" "testing"
"time" "time"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
aws_io "github.com/aws/smithy-go/io" aws_io "github.com/aws/smithy-go/io"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
@ -183,6 +187,8 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
secondBlockData := createBlockFile(t, secondBlockFullPath) secondBlockData := createBlockFile(t, secondBlockFullPath)
require.FileExists(t, firstBlockFullPath) require.FileExists(t, firstBlockFullPath)
require.FileExists(t, secondBlockFullPath) require.FileExists(t, secondBlockFullPath)
rootDir := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom")
defer os.RemoveAll(rootDir)
firstBlockRef := BlockRef{ firstBlockRef := BlockRef{
Ref: Ref{ Ref: Ref{
@ -212,7 +218,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef} blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef}
blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload) blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload)
blocks := make(map[string]string) blocks := make(map[string][]byte)
func() { func() {
timout := time.After(5 * time.Second) timout := time.After(5 * time.Second)
for { for {
@ -226,13 +232,14 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
if !ok { if !ok {
return return
} }
blockData, err := io.ReadAll(block.Data) blockData, err := io.ReadAll(block.BloomData)
require.NoError(t, err) require.NoError(t, err)
blocks[block.BlockRef.BlockPath] = string(blockData) blocks[block.BlockRef.BlockPath] = blockData
} }
} }
}() }()
defer os.RemoveAll("./bloom")
firstBlockActualData, exists := blocks[firstBlockRef.BlockPath] firstBlockActualData, exists := blocks[firstBlockRef.BlockPath]
require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks) require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks)
@ -245,9 +252,42 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
require.Len(t, blocks, 2) require.Len(t, blocks, 2)
} }
func extractFileFromTGZ(tarGzData []byte, targetFileName string) []byte {
gzReader, err := gzip.NewReader(bytes.NewReader(tarGzData))
if err != nil {
return nil
}
defer gzReader.Close()
tarReader := tar.NewReader(gzReader)
for {
header, err := tarReader.Next()
if err == io.EOF {
break
}
if err != nil {
return nil
}
if header.Name == targetFileName {
buffer := new(bytes.Buffer)
if _, err := io.Copy(buffer, tarReader); err != nil {
return nil
}
return buffer.Bytes()
}
}
return nil
}
func Test_BloomClient_PutBlocks(t *testing.T) { func Test_BloomClient_PutBlocks(t *testing.T) {
shipper := createShipper(t) shipper := createShipper(t)
blockForFirstFolderData := "data1" blockForFirstFolderData := "data1"
indexForFirstFolderData := "index1"
blockForFirstFolder := Block{ blockForFirstFolder := Block{
BlockRef: BlockRef{ BlockRef: BlockRef{
Ref: Ref{ Ref: Ref{
@ -261,10 +301,12 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
}, },
IndexPath: uuid.New().String(), IndexPath: uuid.New().String(),
}, },
Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))}, BloomData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))},
IndexData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(indexForFirstFolderData))},
} }
blockForSecondFolderData := "data2" blockForSecondFolderData := "data2"
indexForSecondFolderData := "index2"
blockForSecondFolder := Block{ blockForSecondFolder := Block{
BlockRef: BlockRef{ BlockRef: BlockRef{
Ref: Ref{ Ref: Ref{
@ -278,7 +320,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
}, },
IndexPath: uuid.New().String(), IndexPath: uuid.New().String(),
}, },
Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))}, BloomData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))},
IndexData: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(indexForSecondFolderData))},
} }
results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder}) results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder})
@ -300,7 +343,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.FileExists(t, savedFilePath) require.FileExists(t, savedFilePath)
savedData, err := os.ReadFile(savedFilePath) savedData, err := os.ReadFile(savedFilePath)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, blockForFirstFolderData, string(savedData)) require.Equal(t, blockForFirstFolderData, string(extractFileFromTGZ(savedData, "bloom")))
secondResultBlock := results[1] secondResultBlock := results[1]
path = secondResultBlock.BlockPath path = secondResultBlock.BlockPath
@ -319,7 +362,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.FileExists(t, savedFilePath) require.FileExists(t, savedFilePath)
savedData, err = os.ReadFile(savedFilePath) savedData, err = os.ReadFile(savedFilePath)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, blockForSecondFolderData, string(savedData)) require.Equal(t, blockForSecondFolderData, string(extractFileFromTGZ(savedData, "bloom")))
} }
func Test_BloomClient_DeleteBlocks(t *testing.T) { func Test_BloomClient_DeleteBlocks(t *testing.T) {
@ -364,13 +407,19 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
require.NoFileExists(t, block2Path) require.NoFileExists(t, block2Path)
} }
func createBlockFile(t *testing.T, path string) string { func createBlockFile(t *testing.T, path string) []byte {
err := os.MkdirAll(path[:strings.LastIndex(path, "/")], 0755) err := os.MkdirAll(path[:strings.LastIndex(path, "/")], 0755)
require.NoError(t, err) require.NoError(t, err)
fileContent := uuid.NewString() bloomContent := []byte(uuid.NewString())
err = os.WriteFile(path, []byte(fileContent), 0700) indexContent := []byte(uuid.NewString())
outputFile, err := os.Create(path)
require.NoError(t, err)
byteReader := v1.NewByteReader(bytes.NewBuffer(indexContent), bytes.NewBuffer(bloomContent))
err = v1.TarGzMemory(outputFile, byteReader)
require.NoError(t, err)
err = outputFile.Close()
require.NoError(t, err) require.NoError(t, err)
return fileContent return bloomContent
} }
func Test_TablesByPeriod(t *testing.T) { func Test_TablesByPeriod(t *testing.T) {

@ -207,7 +207,8 @@ func (s *Shipper) createBlockQuerier(directory string) *v1.BlockQuerier {
} }
func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) { func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close() defer block.BloomData.Close()
defer block.IndexData.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:]) archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])
archiveFile, err := os.Create(archivePath) archiveFile, err := os.Create(archivePath)
@ -215,7 +216,7 @@ func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, err
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err) return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
} }
defer archiveFile.Close() defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data) _, err = io.Copy(archiveFile, block.BloomData)
if err != nil { if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err) return "", fmt.Errorf("error writing data to archive file: %w", err)
} }

@ -245,8 +245,9 @@ func Test_Shipper_extractBlock(t *testing.T) {
shipper := Shipper{config: config.Config{WorkingDirectory: workingDir}} shipper := Shipper{config: config.Config{WorkingDirectory: workingDir}}
ts := time.Now().UTC() ts := time.Now().UTC()
block := Block{ block := Block{
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"}, BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile, BloomData: blockFile,
IndexData: seriesFile,
} }
actualPath, err := shipper.extractBlock(&block, ts) actualPath, err := shipper.extractBlock(&block, ts)

Loading…
Cancel
Save