[BloomShipper] updated bloom shipper to extract the block using the library (#10876)

Initial implementation was done based on the assumption that we would
use ZIP to archive the blocks.
However, we will use `.tar.gz` archives, so, the implementation has been
adjusted to use helpers from the library that allow to archive/unarchive
`.tar.gz` files.
re: https://github.com/grafana/loki/pull/10849

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
pull/10875/head^2
Vladyslav Diachenko 2 years ago committed by GitHub
parent a9b2dee0b7
commit 127a8e27e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      pkg/storage/stores/shipper/bloomshipper/shipper.go
  2. 25
      pkg/storage/stores/shipper/bloomshipper/shipper_test.go

@ -1,7 +1,6 @@
package bloomshipper
import (
"archive/zip"
"context"
"fmt"
"io"
@ -137,7 +136,7 @@ func isOutsideRange(
b.EndTimestamp < startTimestamp || b.StartTimestamp > endTimestamp
}
// unzip the bytes into directory and returns absolute path to this directory.
// extract the files into directory and returns absolute path to this directory.
func (s *Shipper) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(s.config.WorkingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
@ -182,34 +181,9 @@ func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, err
}
func extractArchive(archivePath string, workingDirectoryPath string) error {
reader, err := zip.OpenReader(archivePath)
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive: %w", err)
return fmt.Errorf("error opening archive file %s: %w", file.Name(), err)
}
defer reader.Close()
for _, file := range reader.File {
err := extractInnerFile(file, workingDirectoryPath)
if err != nil {
return fmt.Errorf("error extracting %s file from archive: %w", file.Name, err)
}
}
return nil
}
func extractInnerFile(file *zip.File, workingDirectoryPath string) error {
innerFile, err := file.Open()
if err != nil {
return fmt.Errorf("error opening file: %w", err)
}
defer innerFile.Close()
extractedInnerFile, err := os.Create(filepath.Join(workingDirectoryPath, file.Name))
if err != nil {
return fmt.Errorf("error creating empty file: %w", err)
}
defer extractedInnerFile.Close()
_, err = io.Copy(extractedInnerFile, innerFile)
if err != nil {
return fmt.Errorf("error writing data: %w", err)
}
return nil
return v1.UnTarGz(workingDirectoryPath, file)
}

@ -1,7 +1,6 @@
package bloomshipper
import (
"archive/zip"
"bytes"
"io"
"math"
@ -14,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig"
)
@ -146,24 +146,26 @@ const (
func Test_Shipper_extractBlock(t *testing.T) {
dir := t.TempDir()
blockFilePath := filepath.Join(dir, "test-block.zip")
file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700)
require.NoError(t, err)
writer := zip.NewWriter(file)
bloomFileWriter, err := writer.Create(bloomFileName)
mockBlockDir := filepath.Join(dir, "mock-block-dir")
err := os.MkdirAll(mockBlockDir, 0777)
require.NoError(t, err)
bloomFile, err := os.Create(filepath.Join(mockBlockDir, bloomFileName))
require.NoError(t, err)
bloomFileContent := uuid.NewString()
_, err = io.Copy(bloomFileWriter, bytes.NewReader([]byte(bloomFileContent)))
_, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent)))
require.NoError(t, err)
seriesFileWriter, err := writer.Create(seriesFileName)
seriesFile, err := os.Create(filepath.Join(mockBlockDir, seriesFileName))
require.NoError(t, err)
seriesFileContent := uuid.NewString()
_, err = io.Copy(seriesFileWriter, bytes.NewReader([]byte(seriesFileContent)))
_, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent)))
require.NoError(t, err)
err = writer.Close()
blockFilePath := filepath.Join(dir, "test-block-archive")
file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700)
require.NoError(t, err)
err = file.Close()
err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
require.NoError(t, err)
blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700)
@ -176,6 +178,7 @@ func Test_Shipper_extractBlock(t *testing.T) {
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile,
}
actualPath, err := shipper.extractBlock(&block, ts)
require.NoError(t, err)

Loading…
Cancel
Save