bloom blocks downloading queue (#11201)

Implemented a bloom blocks downloading queue to control the concurrency of
downloading blocks from storage.

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
parent 10fe48b815
commit 75cfe59596
Changed files (lines changed):

  1. docs/sources/configure/_index.md (14)
  2. pkg/bloomgateway/bloomgateway.go (4)
  3. pkg/bloomgateway/bloomgateway_test.go (26)
  4. pkg/bloomgateway/sharding.go (1)
  5. pkg/loki/modules.go (2)
  6. pkg/storage/bloom/v1/block_writer.go (8)
  7. pkg/storage/bloom/v1/reader.go (4)
  8. pkg/storage/stores/shipper/bloomshipper/block_downloader.go (230)
  9. pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go (168)
  10. pkg/storage/stores/shipper/bloomshipper/client.go (51)
  11. pkg/storage/stores/shipper/bloomshipper/client_test.go (81)
  12. pkg/storage/stores/shipper/bloomshipper/config/config.go (14)
  13. pkg/storage/stores/shipper/bloomshipper/shipper.go (100)
  14. pkg/storage/stores/shipper/bloomshipper/shipper_test.go (68)
  15. pkg/validation/limits.go (20)
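For reference, a sketch of the new configuration surface, assembled from the docs diff below (all values are the defaults; the queue settings live under `bloom_shipper`, the parallelism limit under `limits_config`):

```yaml
bloom_shipper:
  working_directory: "bloom-shipper"
  blocks_downloading_queue:
    workers_count: 100
    max_tasks_enqueued_per_tenant: 10000

limits_config:
  bloom_gateway_blocks_downloading_parallelism: 50
```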

@@ -2248,6 +2248,16 @@ bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]
blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 100]
# Maximum number of tasks in the queue per tenant per bloom-gateway. Enqueuing
# tasks above this limit will fail with an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]
```
### chunk_store_config
@@ -2990,6 +3000,10 @@ shard_streams:
# CLI flag: -bloom-compactor.false-positive-rate
[bloom_false_positive_rate: <float> | default = 0.01]
# Maximum number of blocks that will be downloaded in parallel by the Bloom Gateway.
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]
# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
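Because `bloom_gateway_blocks_downloading_parallelism` is registered as a per-tenant limit (see `pkg/validation/limits.go` below), it should also be settable for individual tenants through the usual runtime overrides file; a sketch, with a hypothetical tenant name:

```yaml
overrides:
  "tenant-a":
    bloom_gateway_blocks_downloading_parallelism: 100
```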

@@ -187,7 +187,7 @@ type Gateway struct {
}
// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
@@ -205,7 +205,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
return nil, err
}
bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, logger)
bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, overrides, logger, reg)
if err != nil {
return nil, err
}

@@ -82,7 +82,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {
},
}
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -142,7 +142,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -188,7 +188,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)
ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
@@ -212,7 +212,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("gateway tracks active users", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -248,3 +248,21 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.ElementsMatch(t, tenants, gw.activeUsers.ActiveUsers())
})
}
type fakeLimits struct {
}
func (f fakeLimits) BloomGatewayShardSize(_ string) int {
//TODO implement me
panic("implement me")
}
func (f fakeLimits) BloomGatewayEnabled(_ string) bool {
//TODO implement me
panic("implement me")
}
func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int {
//TODO implement me
panic("implement me")
}

@@ -38,6 +38,7 @@ var (
type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(userID string) int
}
type ShardingStrategy interface {

@@ -1263,7 +1263,7 @@ func (t *Loki) initBloomGateway() (services.Service, error) {
shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)
gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

@@ -12,8 +12,8 @@ import (
)
const (
bloomFileName = "bloom"
seriesFileName = "series"
BloomFileName = "bloom"
SeriesFileName = "series"
)
type BlockWriter interface {
@@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error {
return errors.Wrap(err, "creating bloom block dir")
}
b.index, err = os.Create(filepath.Join(b.dir, seriesFileName))
b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "creating series file")
}
b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName))
b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "creating bloom file")
}

@@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
func (r *DirectoryBlockReader) Init() error {
if !r.initialized {
var err error
r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "opening series file")
}
r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "opening bloom file")
}

@@ -0,0 +1,230 @@
package bloomshipper
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)
type blockDownloader struct {
logger log.Logger
workingDirectory string
queueMetrics *queue.Metrics
queue *queue.RequestQueue
blockClient BlockClient
limits Limits
activeUsersService *util.ActiveUsersCleanupService
ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
}
func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
queueMetrics := queue.NewMetrics(reg, constants.Loki, "bloom_blocks_downloader")
//add cleanup service
downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, queueMetrics)
activeUsersService := util.NewActiveUsersCleanupWithDefaultValues(queueMetrics.Cleanup)
ctx := context.Background()
manager, err := services.NewManager(downloadingQueue, activeUsersService)
if err != nil {
return nil, fmt.Errorf("error creating service manager: %w", err)
}
err = services.StartManagerAndAwaitHealthy(ctx, manager)
if err != nil {
return nil, fmt.Errorf("error starting service manager: %w", err)
}
b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
}
for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
go b.serveDownloadingTasks(fmt.Sprintf("worker-%d", i))
}
return b, nil
}
type BlockDownloadingTask struct {
ctx context.Context
block BlockRef
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResultsCh is a send-only channel to return the block querier for the downloaded block
ResultsCh chan<- blockWithQuerier
}
func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- blockWithQuerier, errCh chan<- error) *BlockDownloadingTask {
return &BlockDownloadingTask{
ctx: ctx,
block: block,
ErrCh: errCh,
ResultsCh: resCh,
}
}
// noop implementation
var onWorkerStopNoopCallback = func() {}
func (d *blockDownloader) serveDownloadingTasks(workerID string) {
logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")
d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
//this callback is used only in tests to assert that the worker has stopped
defer d.onWorkerStopCallback()
idx := queue.StartIndexWithLocalQueue
for {
item, newIdx, err := d.queue.Dequeue(d.ctx, idx, workerID)
if err != nil {
if !errors.Is(err, queue.ErrStopped) && !errors.Is(err, context.Canceled) {
level.Error(logger).Log("msg", "failed to dequeue task", "err", err)
continue
}
level.Info(logger).Log("msg", "stopping worker")
return
}
task, ok := item.(*BlockDownloadingTask)
if !ok {
level.Error(logger).Log("msg", "failed to cast to BlockDownloadingTask", "item", fmt.Sprintf("%+v", item), "type", fmt.Sprintf("%T", item))
continue
}
idx = newIdx
blockPath := task.block.BlockPath
//todo add cache before downloading
level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath)
block, err := d.blockClient.GetBlock(task.ctx, task.block)
if err != nil {
level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error downloading the block %s : %w", blockPath, err)
continue
}
directory, err := d.extractBlock(&block, time.Now())
if err != nil {
level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error extracting the block %s : %w", blockPath, err)
continue
}
level.Debug(d.logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory)
blockQuerier := d.createBlockQuerier(directory)
task.ResultsCh <- blockWithQuerier{
BlockRef: task.block,
BlockQuerier: blockQuerier,
}
}
}
func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) {
d.activeUsersService.UpdateUserTimestamp(tenantID, time.Now())
// errCh must be buffered with capacity for one error per reference: a queue worker
// may report an error to this channel before the current goroutine starts reading it,
// and with an unbuffered channel this goroutine could then deadlock when reporting
// an enqueue error, because nothing is draining the channel at that moment.
errCh := make(chan error, len(references))
blocksCh := make(chan blockWithQuerier, len(references))
downloadingParallelism := d.limits.BloomGatewayBlocksDownloadingParallelism(tenantID)
for _, reference := range references {
task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh)
level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath)
err := d.queue.Enqueue(tenantID, nil, task, downloadingParallelism, nil)
if err != nil {
errCh <- fmt.Errorf("error enqueuing downloading task for block %s : %w", reference.BlockPath, err)
return blocksCh, errCh
}
}
return blocksCh, errCh
}
type blockWithQuerier struct {
BlockRef
*v1.BlockQuerier
}
// extractBlock extracts the files into a directory and returns the absolute path of this directory.
func (d *blockDownloader) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
os.Remove(archivePath)
// todo log err
}()
err = extractArchive(archivePath, workingDirectoryPath)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
}
func (d *blockDownloader) createBlockQuerier(directory string) *v1.BlockQuerier {
reader := v1.NewDirectoryBlockReader(directory)
block := v1.NewBlock(reader)
return v1.NewBlockQuerier(block)
}
func (d *blockDownloader) stop() {
_ = services.StopManagerAndAwaitStopped(d.ctx, d.manager)
}
func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])
archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}
func extractArchive(archivePath string, workingDirectoryPath string) error {
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive file %s: %w", file.Name(), err)
}
return v1.UnTarGz(workingDirectoryPath, file)
}
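The channel contract of `downloadBlocks` — a buffered error channel, one result per reference, and the caller selecting over both — is what `ForEachBlock` in `shipper.go` relies on below. A minimal sketch of a consumer, using a hypothetical in-package helper:

```go
// drainDownloads illustrates how a caller is expected to consume the channels
// returned by blockDownloader.downloadBlocks: read until one result per
// reference has arrived, treating any error as fatal for the whole batch.
// Hypothetical helper, not part of this commit.
func drainDownloads(ctx context.Context, blocksCh <-chan blockWithQuerier, errCh <-chan error, expected int, fn func(blockWithQuerier) error) error {
	for i := 0; i < expected; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errCh:
			// errCh is buffered with one slot per reference, so workers never
			// block on it; the first error aborts the batch.
			return err
		case block := <-blocksCh:
			if err := fn(block); err != nil {
				return err
			}
		}
	}
	return nil
}
```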

@@ -0,0 +1,168 @@
package bloomshipper
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/pkg/validation"
)
func Test_blockDownloader_downloadBlocks(t *testing.T) {
overrides, err := validation.NewOverrides(validation.Limits{BloomGatewayBlocksDownloadingParallelism: 20}, nil)
require.NoError(t, err)
workingDirectory := t.TempDir()
blockReferences, blockClient := createFakeBlocks(t, 20)
blockClient.responseDelay = 100 * time.Millisecond
workersCount := 10
downloader, err := newBlockDownloader(config.Config{
WorkingDirectory: workingDirectory,
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: workersCount,
MaxTasksEnqueuedPerTenant: 20,
},
}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
stoppedWorkersCount := atomic.NewInt32(0)
downloader.onWorkerStopCallback = func() {
stoppedWorkersCount.Inc()
}
require.NoError(t, err)
blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
downloadedBlocks := make(map[string]any, len(blockReferences))
done := make(chan bool)
go func() {
for i := 0; i < 20; i++ {
block := <-blocksCh
downloadedBlocks[block.BlockPath] = nil
}
done <- true
}()
select {
//20 blocks, 10 workers, fixed delay 100ms per block: two waves of 10 parallel downloads, so the total downloading time should be ~200ms.
case <-time.After(2 * time.Second):
t.Fatalf("test must complete before the timeout")
case err := <-errorsCh:
require.NoError(t, err)
case <-done:
}
require.Len(t, downloadedBlocks, 20, "all 20 blocks must be downloaded")
downloader.stop()
require.Eventuallyf(t, func() bool {
return stoppedWorkersCount.Load() == int32(workersCount)
}, 1*time.Second, 10*time.Millisecond, "expected all %d workers to be stopped", workersCount)
}
// createFakeBlocks creates fake blocks and returns their BlockRefs together with a mockBlockClient that serves them
func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) {
mockData := make(map[string]Block, count)
refs := make([]BlockRef, 0, count)
for i := 0; i < count; i++ {
archive, _, _ := createBlockArchive(t)
block := Block{
BlockRef: BlockRef{
BlockPath: fmt.Sprintf("block-path-%d", i),
},
Data: archive,
}
mockData[block.BlockPath] = block
refs = append(refs, block.BlockRef)
}
return refs, &mockBlockClient{mockData: mockData}
}
type mockBlockClient struct {
responseDelay time.Duration
mockData map[string]Block
}
func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (Block, error) {
time.Sleep(m.responseDelay)
block, exists := m.mockData[reference.BlockPath]
if exists {
return block, nil
}
return block, fmt.Errorf("block %s is not found in mockData", reference.BlockPath)
}
func (m *mockBlockClient) PutBlocks(_ context.Context, _ []Block) ([]Block, error) {
panic("implement me")
}
func (m *mockBlockClient) DeleteBlocks(_ context.Context, _ []BlockRef) error {
panic("implement me")
}
func Test_blockDownloader_extractBlock(t *testing.T) {
blockFile, bloomFileContent, seriesFileContent := createBlockArchive(t)
workingDir := t.TempDir()
downloader := &blockDownloader{workingDirectory: workingDir}
ts := time.Now().UTC()
block := Block{
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile,
}
actualPath, err := downloader.extractBlock(&block, ts)
require.NoError(t, err)
expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
require.Equal(t, expectedPath, actualPath,
"expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix")
require.FileExists(t, filepath.Join(expectedPath, v1.BloomFileName))
require.FileExists(t, filepath.Join(expectedPath, v1.SeriesFileName))
actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.BloomFileName))
require.NoError(t, err)
require.Equal(t, bloomFileContent, string(actualBloomFileContent))
actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.SeriesFileName))
require.NoError(t, err)
require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
}
func createBlockArchive(t *testing.T) (*os.File, string, string) {
dir := t.TempDir()
mockBlockDir := filepath.Join(dir, "mock-block-dir")
err := os.MkdirAll(mockBlockDir, 0777)
require.NoError(t, err)
bloomFile, err := os.Create(filepath.Join(mockBlockDir, v1.BloomFileName))
require.NoError(t, err)
bloomFileContent := uuid.NewString()
_, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent)))
require.NoError(t, err)
seriesFile, err := os.Create(filepath.Join(mockBlockDir, v1.SeriesFileName))
require.NoError(t, err)
seriesFileContent := uuid.NewString()
_, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent)))
require.NoError(t, err)
blockFilePath := filepath.Join(dir, "test-block-archive")
file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700)
require.NoError(t, err)
err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
require.NoError(t, err)
blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700)
require.NoError(t, err)
return blockFile, bloomFileContent, seriesFileContent
}

@@ -79,7 +79,7 @@ type Block struct {
}
type BlockClient interface {
GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error)
GetBlock(ctx context.Context, reference BlockRef) (Block, error)
PutBlocks(ctx context.Context, blocks []Block) ([]Block, error)
DeleteBlocks(ctx context.Context, blocks []BlockRef) error
}
@@ -190,42 +190,23 @@ func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error {
return b.periodicObjectClients[periodFrom].DeleteObject(ctx, key)
}
// GetBlocks downloads all the blocks from objectStorage in parallel and sends the downloaded blocks
// via the channel Block that is closed only if all the blocks are downloaded without errors.
// If an error happens, the error will be sent via error channel.
func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error) {
blocksChannel := make(chan Block, len(references))
errChannel := make(chan error)
go func() {
//todo move concurrency to the config
err := concurrency.ForEachJob(ctx, len(references), 100, func(ctx context.Context, idx int) error {
reference := references[idx]
period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp)
if err != nil {
return fmt.Errorf("error while period lookup: %w", err)
}
objectClient := b.periodicObjectClients[period]
readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref))
if err != nil {
return fmt.Errorf("error while fetching object from storage: %w", err)
}
blocksChannel <- Block{
BlockRef: reference,
Data: readCloser,
}
return nil
})
if err != nil {
errChannel <- fmt.Errorf("error downloading block file: %w", err)
return
}
//close blocks channel only if there is no error
close(blocksChannel)
}()
return blocksChannel, errChannel
// GetBlock downloads a block from object storage and returns the downloaded block
func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (Block, error) {
period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp)
if err != nil {
return Block{}, fmt.Errorf("error while period lookup: %w", err)
}
objectClient := b.periodicObjectClients[period]
readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref))
if err != nil {
return Block{}, fmt.Errorf("error while fetching object from storage: %w", err)
}
return Block{
BlockRef: reference,
Data: readCloser,
}, nil
}
// TODO zip (archive) blocks before uploading to storage
func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) {
results := make([]Block, len(blocks))
//todo move concurrency to the config

@@ -32,7 +32,7 @@ var (
)
func Test_BloomClient_GetMetas(t *testing.T) {
shipper := createShipper(t)
shipper := createClient(t)
var expected []Meta
folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory
@@ -99,12 +99,12 @@ func Test_BloomClient_PutMeta(t *testing.T) {
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
shipper := createShipper(t)
bloomClient := createClient(t)
err := shipper.PutMeta(context.Background(), data.source)
err := bloomClient.PutMeta(context.Background(), data.source)
require.NoError(t, err)
directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
filePath := filepath.Join(directory, data.expectedFilePath)
require.FileExists(t, filePath)
content, err := os.ReadFile(filePath)
@@ -155,15 +155,15 @@ func Test_BloomClient_DeleteMeta(t *testing.T) {
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
shipper := createShipper(t)
directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
bloomClient := createClient(t)
directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
file := filepath.Join(directory, data.expectedFilePath)
err := os.MkdirAll(file[:strings.LastIndex(file, delimiter)], 0755)
require.NoError(t, err)
err = os.WriteFile(file, []byte("dummy content"), 0700)
require.NoError(t, err)
err = shipper.DeleteMeta(context.Background(), data.source)
err = bloomClient.DeleteMeta(context.Background(), data.source)
require.NoError(t, err)
require.NoFileExists(t, file)
@@ -173,8 +173,8 @@ }
}
func Test_BloomClient_GetBlocks(t *testing.T) {
shipper := createShipper(t)
fsNamedStores := shipper.storageConfig.NamedStores.Filesystem
bloomClient := createClient(t)
fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
firstBlockPath := "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1"
firstBlockFullPath := filepath.Join(fsNamedStores["folder-1"].Directory, firstBlockPath)
firstBlockData := createBlockFile(t, firstBlockFullPath)
@@ -209,44 +209,21 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
BlockPath: secondBlockPath,
}
blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef}
blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload)
blocks := make(map[string]string)
func() {
timout := time.After(5 * time.Second)
for {
select {
case <-timout:
t.Fatalf("the test had to be completed before the timeout")
return
case err := <-errorsCh:
require.NoError(t, err)
case block, ok := <-blocksCh:
if !ok {
return
}
blockData, err := io.ReadAll(block.Data)
require.NoError(t, err)
blocks[block.BlockRef.BlockPath] = string(blockData)
}
}
}()
firstBlockActualData, exists := blocks[firstBlockRef.BlockPath]
require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks)
require.Equal(t, firstBlockData, firstBlockActualData)
secondBlockActualData, exists := blocks[secondBlockRef.BlockPath]
require.True(t, exists, "data for the second block must be present in the results: %+v", blocks)
require.Equal(t, secondBlockData, secondBlockActualData)
downloadedFirstBlock, err := bloomClient.GetBlock(context.Background(), firstBlockRef)
require.NoError(t, err)
firstBlockActualData, err := io.ReadAll(downloadedFirstBlock.Data)
require.NoError(t, err)
require.Equal(t, firstBlockData, string(firstBlockActualData))
require.Len(t, blocks, 2)
downloadedSecondBlock, err := bloomClient.GetBlock(context.Background(), secondBlockRef)
require.NoError(t, err)
secondBlockActualData, err := io.ReadAll(downloadedSecondBlock.Data)
require.NoError(t, err)
require.Equal(t, secondBlockData, string(secondBlockActualData))
}
func Test_BloomClient_PutBlocks(t *testing.T) {
shipper := createShipper(t)
bloomClient := createClient(t)
blockForFirstFolderData := "data1"
blockForFirstFolder := Block{
BlockRef: BlockRef{
@@ -281,7 +258,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))},
}
results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder})
results, err := bloomClient.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder})
require.NoError(t, err)
require.Len(t, results, 2)
firstResultBlock := results[0]
@@ -295,7 +272,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp)
require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum)
require.Equal(t, blockForFirstFolder.IndexPath, firstResultBlock.IndexPath)
folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory
folder1 := bloomClient.storageConfig.NamedStores.Filesystem["folder-1"].Directory
savedFilePath := filepath.Join(folder1, path)
require.FileExists(t, savedFilePath)
savedData, err := os.ReadFile(savedFilePath)
@@ -313,7 +290,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp)
require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum)
require.Equal(t, blockForSecondFolder.IndexPath, secondResultBlock.IndexPath)
folder2 := shipper.storageConfig.NamedStores.Filesystem["folder-2"].Directory
folder2 := bloomClient.storageConfig.NamedStores.Filesystem["folder-2"].Directory
savedFilePath = filepath.Join(folder2, path)
require.FileExists(t, savedFilePath)
@@ -323,8 +300,8 @@ }
}
func Test_BloomClient_DeleteBlocks(t *testing.T) {
shipper := createShipper(t)
fsNamedStores := shipper.storageConfig.NamedStores.Filesystem
bloomClient := createClient(t)
fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1")
createBlockFile(t, block1Path)
block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2")
@@ -358,7 +335,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) {
IndexPath: uuid.New().String(),
},
}
err := shipper.DeleteBlocks(context.Background(), blocksToDelete)
err := bloomClient.DeleteBlocks(context.Background(), blocksToDelete)
require.NoError(t, err)
require.NoFileExists(t, block1Path)
require.NoFileExists(t, block2Path)
@@ -500,7 +477,7 @@ func Test_createMetaRef(t *testing.T) {
}
}
func createShipper(t *testing.T) *BloomClient {
func createClient(t *testing.T) *BloomClient {
periodicConfigs := createPeriodConfigs()
namedStores := storage.NamedStores{
Filesystem: map[string]storage.NamedFSConfig{
@@ -513,9 +490,9 @@ func createShipper(t *testing.T) *BloomClient {
metrics := storage.NewClientMetrics()
t.Cleanup(metrics.Unregister)
bshipper, err := NewBloomClient(periodicConfigs, storageConfig, metrics)
bloomClient, err := NewBloomClient(periodicConfigs, storageConfig, metrics)
require.NoError(t, err)
return bshipper
return bloomClient
}
func createPeriodConfigs() []config.PeriodConfig {

@@ -8,11 +8,23 @@ import (
)
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
WorkingDirectory string `yaml:"working_directory"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
}
type DownloadingQueueConfig struct {
WorkersCount int `yaml:"workers_count"`
MaxTasksEnqueuedPerTenant int `yaml:"max_tasks_enqueued_per_tenant"`
}
func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 100, "The count of parallel workers that download Bloom Blocks.")
f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of tasks in the queue per tenant per bloom-gateway. Enqueuing tasks above this limit will fail with an error.")
}
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
}
func (c *Config) Validate() error {
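With the `bloom.` prefix that the shipper config receives, the registrations above produce the flags documented earlier (note that the last one keeps its underscore-style name):

```
-bloom.shipper.working-directory="bloom-shipper"
-bloom.shipper.blocks-downloading-queue.workers-count=100
-bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant=10000
```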

@@ -4,32 +4,38 @@ import (
"cmp"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)
type Shipper struct {
client Client
config config.Config
logger log.Logger
client Client
config config.Config
logger log.Logger
blockDownloader *blockDownloader
}
func NewShipper(client Client, config config.Config, logger log.Logger) (*Shipper, error) {
type Limits interface {
BloomGatewayBlocksDownloadingParallelism(tenantID string) int
}
func NewShipper(client Client, config config.Config, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Shipper, error) {
logger = log.With(logger, "component", "bloom-shipper")
downloader, err := newBlockDownloader(config, client, limits, logger, reg)
if err != nil {
return nil, fmt.Errorf("error creating block downloader: %w", err)
}
return &Shipper{
client: client,
config: config,
logger: log.With(logger, "component", "bloom-shipper"),
client: client,
config: config,
logger: logger,
blockDownloader: downloader,
}, nil
}
@@ -47,21 +53,18 @@ func (s *Shipper) ForEachBlock(
return fmt.Errorf("error fetching active block references : %w", err)
}
blocksChannel, errorsChannel := s.client.GetBlocks(ctx, blockRefs)
cancelContext, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
blocksChannel, errorsChannel := s.blockDownloader.downloadBlocks(cancelContext, tenantID, blockRefs)
for {
select {
case block, ok := <-blocksChannel:
case result, ok := <-blocksChannel:
if !ok {
return nil
}
directory, err := s.extractBlock(&block, time.Now().UTC())
err = callback(result.BlockQuerier)
if err != nil {
return fmt.Errorf("error unarchiving block %s err: %w", block.BlockPath, err)
}
blockQuerier := s.createBlockQuerier(directory)
err = callback(blockQuerier)
if err != nil {
return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err)
return fmt.Errorf("error running callback function for block %s err: %w", result.BlockPath, err)
}
case err := <-errorsChannel:
if err != nil {
@@ -73,6 +76,7 @@ func (s *Shipper) ForEachBlock(
func (s *Shipper) Stop() {
s.client.Stop()
s.blockDownloader.stop()
}
// getFromThrough returns the first and list item of a fingerprint slice
@@ -177,55 +181,3 @@ func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp int64, fingerprint
}
return b.MaxFingerprint < fingerprints[idx]
}
// extract the files into directory and returns absolute path to this directory.
func (s *Shipper) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(s.config.WorkingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
os.Remove(archivePath)
// todo log err
}()
err = extractArchive(archivePath, workingDirectoryPath)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
}
func (s *Shipper) createBlockQuerier(directory string) *v1.BlockQuerier {
reader := v1.NewDirectoryBlockReader(directory)
block := v1.NewBlock(reader)
return v1.NewBlockQuerier(block)
}
func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])
archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}
func extractArchive(archivePath string, workingDirectoryPath string) error {
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive file %s: %w", file.Name(), err)
}
return v1.UnTarGz(workingDirectoryPath, file)
}

@@ -1,21 +1,11 @@
package bloomshipper
import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)
func Test_Shipper_findBlocks(t *testing.T) {
@@ -208,61 +198,3 @@ func createBlockRef(
BlockPath: blockPath,
}
}
const (
bloomFileName = "bloom"
seriesFileName = "series"
)
func Test_Shipper_extractBlock(t *testing.T) {
dir := t.TempDir()
mockBlockDir := filepath.Join(dir, "mock-block-dir")
err := os.MkdirAll(mockBlockDir, 0777)
require.NoError(t, err)
bloomFile, err := os.Create(filepath.Join(mockBlockDir, bloomFileName))
require.NoError(t, err)
bloomFileContent := uuid.NewString()
_, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent)))
require.NoError(t, err)
seriesFile, err := os.Create(filepath.Join(mockBlockDir, seriesFileName))
require.NoError(t, err)
seriesFileContent := uuid.NewString()
_, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent)))
require.NoError(t, err)
blockFilePath := filepath.Join(dir, "test-block-archive")
file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700)
require.NoError(t, err)
err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir))
require.NoError(t, err)
blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700)
require.NoError(t, err)
workingDir := t.TempDir()
shipper := Shipper{config: config.Config{WorkingDirectory: workingDir}}
ts := time.Now().UTC()
block := Block{
BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
Data: blockFile,
}
actualPath, err := shipper.extractBlock(&block, ts)
require.NoError(t, err)
expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
require.Equal(t, expectedPath, actualPath,
"expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix")
require.FileExists(t, filepath.Join(expectedPath, bloomFileName))
require.FileExists(t, filepath.Join(expectedPath, seriesFileName))
actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, bloomFileName))
require.NoError(t, err)
require.Equal(t, bloomFileContent, string(actualBloomFileContent))
actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, seriesFileName))
require.NoError(t, err)
require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
}

@@ -182,13 +182,14 @@ type Limits struct {
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -309,6 +310,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks that will be downloaded in parallel by the Bloom Gateway.")
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
@@ -788,6 +790,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}
func (o *Overrides) BloomGatewayBlocksDownloadingParallelism(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayBlocksDownloadingParallelism
}
func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}
