feat(bloomstore): Support for sharding blocks across multiple different directories (#12375)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
commit 15dc2bac04 (parent c1edb8220c)
Christian Haudum, 2 years ago, committed via GitHub
Files changed:
 1. docs/sources/configure/_index.md (5 lines changed)
 2. pkg/bloomgateway/bloomgateway_test.go (2)
 3. pkg/loki/config_wrapper.go (7)
 4. pkg/loki/config_wrapper_test.go (5)
 5. pkg/loki/modules_test.go (2)
 6. pkg/storage/stores/shipper/bloomshipper/cache.go (13)
 7. pkg/storage/stores/shipper/bloomshipper/cache_test.go (2)
 8. pkg/storage/stores/shipper/bloomshipper/client.go (7)
 9. pkg/storage/stores/shipper/bloomshipper/client_test.go (4)
10. pkg/storage/stores/shipper/bloomshipper/config/config.go (10)
11. pkg/storage/stores/shipper/bloomshipper/fetcher.go (6)
12. pkg/storage/stores/shipper/bloomshipper/fetcher_test.go (12)
13. pkg/storage/stores/shipper/bloomshipper/resolver.go (46)
14. pkg/storage/stores/shipper/bloomshipper/resolver_test.go (46)
15. pkg/storage/stores/shipper/bloomshipper/store.go (10)
16. pkg/storage/stores/shipper/bloomshipper/store_test.go (2)

@@ -2353,9 +2353,10 @@ tsdb_shipper:
 # Configures Bloom Shipper.
 bloom_shipper:
-    # Working directory to store downloaded Bloom Blocks.
+    # Working directory to store downloaded bloom blocks. Supports multiple
+    # directories, separated by comma.
     # CLI flag: -bloom.shipper.working-directory
-    [working_directory: <string> | default = "bloom-shipper"]
+    [working_directory: <string> | default = "/data/blooms"]

     # Maximum size of bloom pages that should be queried. Larger pages than this
     # limit are skipped when querying blooms to limit memory usage.
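With the comma-separated form, a single configuration value can spread downloaded blocks across several disks. A hypothetical example (the mount paths are illustrative, not defaults):

storage_config:
  bloom_shipper:
    working_directory: /mnt/disk1/blooms,/mnt/disk2/blooms

Because the value is parsed as CSV rather than as a YAML list, the directories are given in one string.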

@@ -72,7 +72,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
     }
     storageCfg := storage.Config{
         BloomShipperConfig: bloomshipperconfig.Config{
-            WorkingDirectory: t.TempDir(),
+            WorkingDirectory: []string{t.TempDir()},
             BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
                 WorkersCount: 1,
             },

@@ -409,8 +409,11 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
     if r.CompactorConfig.WorkingDirectory == defaults.CompactorConfig.WorkingDirectory {
         r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix)
     }
-    if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory {
-        r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/blooms", prefix)
+    if len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == 1 &&
+        len(r.StorageConfig.BloomShipperConfig.WorkingDirectory) == len(defaults.StorageConfig.BloomShipperConfig.WorkingDirectory) &&
+        r.StorageConfig.BloomShipperConfig.WorkingDirectory[0] == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory[0] {
+        _ = r.StorageConfig.BloomShipperConfig.WorkingDirectory.Set(fmt.Sprintf("%s/blooms", prefix))
     }
 }

@@ -9,6 +9,7 @@ import (
     "testing"
     "time"

+    "github.com/grafana/dskit/flagext"
     "github.com/grafana/dskit/netutil"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -100,7 +101,7 @@ common:
         assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath)
         assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir)
         assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory)
-        assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory)
+        assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory)
     })

     t.Run("accepts paths both with and without trailing slash", func(t *testing.T) {
@@ -112,7 +113,7 @@ common:
         assert.EqualValues(t, "/opt/loki/rules-temp", config.Ruler.RulePath)
         assert.EqualValues(t, "/opt/loki/wal", config.Ingester.WAL.Dir)
         assert.EqualValues(t, "/opt/loki/compactor", config.CompactorConfig.WorkingDirectory)
-        assert.EqualValues(t, "/opt/loki/blooms", config.StorageConfig.BloomShipperConfig.WorkingDirectory)
+        assert.EqualValues(t, flagext.StringSliceCSV{"/opt/loki/blooms"}, config.StorageConfig.BloomShipperConfig.WorkingDirectory)
     })

     t.Run("does not rewrite custom (non-default) paths passed via config file", func(t *testing.T) {

@@ -367,7 +367,7 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
     cfg.StorageConfig = storage.Config{
         FSConfig: local.FSConfig{Directory: dir},
         BloomShipperConfig: bloomshipperconfig.Config{
-            WorkingDirectory: filepath.Join(dir, "blooms"),
+            WorkingDirectory: []string{filepath.Join(dir, "blooms")},
             BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
                 WorkersCount: 1,
             },

@@ -12,6 +12,7 @@ import (
     v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
     "github.com/grafana/loki/pkg/storage/chunk/cache"
+    "github.com/grafana/loki/pkg/util"
 )

 type CloseableBlockQuerier struct {
@@ -34,10 +35,14 @@ func (c *CloseableBlockQuerier) SeriesIter() (v1.PeekingIterator[*v1.SeriesWithB
     return v1.NewPeekingIter[*v1.SeriesWithBloom](c.BlockQuerier), nil
 }

-func LoadBlocksDirIntoCache(path string, c Cache, logger log.Logger) error {
-    level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", path)
-    keys, values := loadBlockDirectories(path, logger)
-    return c.PutMany(context.Background(), keys, values)
+func LoadBlocksDirIntoCache(paths []string, c Cache, logger log.Logger) error {
+    var err util.MultiError
+    for _, path := range paths {
+        level.Debug(logger).Log("msg", "load bloomshipper working directory into cache", "path", path)
+        keys, values := loadBlockDirectories(path, logger)
+        err.Add(c.PutMany(context.Background(), keys, values))
+    }
+    return err.Err()
 }

 func loadBlockDirectories(root string, logger log.Logger) (keys []string, values []BlockDirectory) {
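Note that the loop does not bail out on the first failing directory: per-directory errors are collected in a util.MultiError and returned together once every configured path has been scanned, so one bad disk cannot prevent the remaining caches from being warmed.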

@@ -88,7 +88,7 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
     }
     c := NewFsBlocksCache(cfg, nil, log.NewNopLogger())

-    err := LoadBlocksDirIntoCache(wd, c, logger)
+    err := LoadBlocksDirIntoCache([]string{wd, t.TempDir()}, c, logger)
     require.NoError(t, err)

     require.Equal(t, 1, len(c.entries))

@@ -256,9 +256,14 @@ type BloomClient struct {
 }

 func NewBloomClient(cfg bloomStoreConfig, client client.ObjectClient, logger log.Logger) (*BloomClient, error) {
+    fsResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{})
+    if err != nil {
+        return nil, errors.Wrap(err, "creating fs resolver")
+    }
     return &BloomClient{
         KeyResolver: defaultKeyResolver{}, // TODO(owen-d): hook into schema, similar to `{,Parse}ExternalKey`
-        fsResolver:  NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}),
+        fsResolver:  fsResolver,
         concurrency: cfg.numWorkers,
         client:      client,
         logger:      logger,

@@ -41,8 +41,8 @@ func newMockBloomClient(t *testing.T) (*BloomClient, string) {
     dir := t.TempDir()
     logger := log.NewLogfmtLogger(os.Stderr)
     cfg := bloomStoreConfig{
-        workingDir: dir,
-        numWorkers: 3,
+        workingDirs: []string{dir},
+        numWorkers:  3,
     }
     client, err := NewBloomClient(cfg, oc, logger)
     require.NoError(t, err)

@@ -4,7 +4,6 @@ package config
 import (
     "errors"
     "flag"
-    "strings"
     "time"

     "github.com/grafana/dskit/flagext"
@@ -13,7 +12,7 @@ import (
 )

 type Config struct {
-    WorkingDirectory       string                 `yaml:"working_directory"`
+    WorkingDirectory       flagext.StringSliceCSV `yaml:"working_directory"`
     MaxQueryPageSize       flagext.Bytes          `yaml:"max_query_page_size"`
     BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
     BlocksCache            BlocksCacheConfig      `yaml:"blocks_cache"`
@@ -31,7 +30,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla
 }

 func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
-    f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
+    c.WorkingDirectory = []string{"/data/blooms"}
+    f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.")
     _ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
     f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.")
     c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
@@ -40,8 +40,8 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 }

 func (c *Config) Validate() error {
-    if strings.TrimSpace(c.WorkingDirectory) == "" {
-        return errors.New("working directory must be specified")
+    if len(c.WorkingDirectory) == 0 {
+        return errors.New("at least one working directory must be specified")
     }
     return nil
 }
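The switch from flag.StringVar to flag.Var with flagext.StringSliceCSV is what enables the comma-separated syntax: the flag value is split on commas into a []string. A minimal standalone sketch of that behavior, assuming the github.com/grafana/dskit module is on the module path:

package main

import (
	"fmt"

	"github.com/grafana/dskit/flagext"
)

func main() {
	// StringSliceCSV implements flag.Value; Set splits the input on commas.
	var dirs flagext.StringSliceCSV
	_ = dirs.Set("/mnt/disk1/blooms,/mnt/disk2/blooms")
	fmt.Println([]string(dirs)) // [/mnt/disk1/blooms /mnt/disk2/blooms]
}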

@@ -91,12 +91,16 @@ func NewFetcher(
     logger log.Logger,
     bloomMetrics *v1.Metrics,
 ) (*Fetcher, error) {
+    localFSResolver, err := NewShardedPrefixedResolver(cfg.workingDirs, defaultKeyResolver{})
+    if err != nil {
+        return nil, errors.Wrap(err, "creating fs resolver")
+    }
     fetcher := &Fetcher{
         cfg:             cfg,
         client:          client,
         metasCache:      metasCache,
         blocksCache:     blocksCache,
-        localFSResolver: NewPrefixedResolver(cfg.workingDir, defaultKeyResolver{}),
+        localFSResolver: localFSResolver,
         metrics:         newFetcherMetrics(reg, constants.Loki, "bloom_store"),
         bloomMetrics:    bloomMetrics,
         logger:          logger,

@@ -100,7 +100,7 @@ func TestMetasFetcher(t *testing.T) {
         t.Run(test.name, func(t *testing.T) {
             ctx := context.Background()
             metasCache := cache.NewMockCache()
-            cfg := bloomStoreConfig{workingDir: t.TempDir(), numWorkers: 1}
+            cfg := bloomStoreConfig{workingDirs: []string{t.TempDir()}, numWorkers: 1}

             oc, err := local.NewFSObjectClient(local.FSConfig{Directory: dir})
             require.NoError(t, err)
@@ -259,7 +259,7 @@ func TestFetcher_DownloadQueue(t *testing.T) {
 func TestFetcher_LoadBlocksFromFS(t *testing.T) {
     base := t.TempDir()
-    cfg := bloomStoreConfig{workingDir: base, numWorkers: 1}
+    cfg := bloomStoreConfig{workingDirs: []string{base}, numWorkers: 1}
     resolver := NewPrefixedResolver(base, defaultKeyResolver{})

     refs := []BlockRef{
@@ -312,9 +312,13 @@ func createBlockDir(t *testing.T, path string) {
 }

 func TestFetcher_IsBlockDir(t *testing.T) {
-    cfg := bloomStoreConfig{numWorkers: 1}
+    cfg := bloomStoreConfig{
+        numWorkers:  1,
+        workingDirs: []string{t.TempDir()},
+    }

-    fetcher, _ := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil))
+    fetcher, err := NewFetcher(cfg, nil, nil, nil, nil, log.NewNopLogger(), v1.NewMetrics(nil))
+    require.NoError(t, err)

     t.Run("path does not exist", func(t *testing.T) {
         base := t.TempDir()

@@ -2,6 +2,8 @@ package bloomshipper

 import (
     "fmt"
+    "hash"
+    "hash/fnv"
     "path"
     "path/filepath"
     "strconv"
@@ -150,6 +152,50 @@ func (p PrefixedResolver) Block(ref BlockRef) Location {
     }
 }

+type hashable interface {
+    Hash(hash.Hash32) error
+}
+
+type ShardedPrefixedResolver struct {
+    prefixes []string
+    KeyResolver
+}
+
+func NewShardedPrefixedResolver(prefixes []string, resolver KeyResolver) (KeyResolver, error) {
+    n := len(prefixes)
+    switch n {
+    case 0:
+        return nil, fmt.Errorf("requires at least 1 prefix")
+    case 1:
+        return NewPrefixedResolver(prefixes[0], resolver), nil
+    default:
+        return ShardedPrefixedResolver{
+            prefixes:    prefixes,
+            KeyResolver: resolver,
+        }, nil
+    }
+}
+
+func (r ShardedPrefixedResolver) prefix(ref hashable) key {
+    h := fnv.New32()
+    _ = ref.Hash(h)
+    return key(r.prefixes[h.Sum32()%uint32(len(r.prefixes))])
+}
+
+func (r ShardedPrefixedResolver) Meta(ref MetaRef) Location {
+    return locations{
+        r.prefix(ref),
+        r.KeyResolver.Meta(ref),
+    }
+}
+
+func (r ShardedPrefixedResolver) Block(ref BlockRef) Location {
+    return locations{
+        r.prefix(ref),
+        r.KeyResolver.Block(ref),
+    }
+}
+
 type Location interface {
     Addr() string      // object storage location
     LocalPath() string // local path version
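The sharding itself is a single FNV-32 hash of the ref modulo the number of configured prefixes, so a given block or meta always resolves to the same local directory as long as the directory list is unchanged. A self-contained sketch of the same scheme; the pickDir helper and the plain string key are illustrative only, since the real resolver hashes the ref through its Hash(hash.Hash32) method:

package main

import (
	"fmt"
	"hash/fnv"
)

// pickDir mirrors ShardedPrefixedResolver.prefix: hash the key with
// FNV-32 and index into the directory list with the sum modulo its length.
func pickDir(dirs []string, key string) string {
	h := fnv.New32()
	_, _ = h.Write([]byte(key))
	return dirs[h.Sum32()%uint32(len(dirs))]
}

func main() {
	dirs := []string{"/mnt/disk1", "/mnt/disk2", "/mnt/disk3"}
	for _, key := range []string{"block-a", "block-b", "block-c"} {
		// Deterministic: the same key always maps to the same directory.
		fmt.Println(key, "->", pickDir(dirs, key))
	}
}

Note this is plain modulo hashing, not consistent hashing: adding or removing a directory reshuffles most keys. That is presumably acceptable here because the working directories only cache blocks that can be re-downloaded from object storage.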

@@ -53,3 +53,49 @@ func TestResolver_ParseBlockKey(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, ref, parsed)
 }
+
+func TestResolver_ShardedPrefixedResolver(t *testing.T) {
+    blockRef := BlockRef{
+        Ref: Ref{
+            TenantID:       "tenant",
+            TableName:      "table_1",
+            Bounds:         v1.NewBounds(0x0000, 0xffff),
+            StartTimestamp: 0,
+            EndTimestamp:   3600000,
+            Checksum:       48350,
+        },
+    }
+    metaRef := MetaRef{
+        Ref: Ref{
+            TenantID:  "tenant",
+            TableName: "table_1",
+            Bounds:    v1.NewBounds(0x0000, 0xffff),
+            Checksum:  43981,
+        },
+    }
+
+    t.Run("empty prefixes cause error", func(t *testing.T) {
+        _, err := NewShardedPrefixedResolver([]string{}, defaultKeyResolver{})
+        require.ErrorContains(t, err, "requires at least 1 prefix")
+    })
+
+    t.Run("single prefix", func(t *testing.T) {
+        r, err := NewShardedPrefixedResolver([]string{"prefix"}, defaultKeyResolver{})
+        require.NoError(t, err)
+        loc := r.Meta(metaRef)
+        require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath())
+        loc = r.Block(blockRef)
+        require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath())
+    })
+
+    t.Run("multiple prefixes", func(t *testing.T) {
+        r, err := NewShardedPrefixedResolver([]string{"a", "b", "c", "d"}, defaultKeyResolver{})
+        require.NoError(t, err)
+        loc := r.Meta(metaRef)
+        require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath())
+        loc = r.Block(blockRef)
+        require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath())
+    })
+}
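In the multiple-prefixes case the expected directories ("b" for the meta, "d" for the block) are simply whatever the FNV-32 hash of each ref works out to modulo the four prefixes; pinning the concrete values in the test guards against accidental changes to the hash input, which would silently re-home previously cached files.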

@@ -41,7 +41,7 @@ type StoreWithMetrics interface {
 }

 type bloomStoreConfig struct {
-    workingDir       string
+    workingDirs      []string
     numWorkers       int
     maxBloomPageSize int
 }
@@ -193,13 +193,15 @@ func NewBloomStore(
     // TODO(chaudum): Remove wrapper
     cfg := bloomStoreConfig{
-        workingDir:       storageConfig.BloomShipperConfig.WorkingDirectory,
+        workingDirs:      storageConfig.BloomShipperConfig.WorkingDirectory,
         numWorkers:       storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
         maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
     }

-    if err := util.EnsureDirectory(cfg.workingDir); err != nil {
-        return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", cfg.workingDir)
+    for _, wd := range cfg.workingDirs {
+        if err := util.EnsureDirectory(wd); err != nil {
+            return nil, errors.Wrapf(err, "failed to create working directory for bloom store: '%s'", wd)
+        }
     }

     for _, periodicConfig := range periodicConfigs {
for _, periodicConfig := range periodicConfigs {

@@ -51,7 +51,7 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*BloomStore, st
     storageConfig := storage.Config{
         BloomShipperConfig: config.Config{
-            WorkingDirectory: workDir,
+            WorkingDirectory: []string{workDir},
             BlocksDownloadingQueue: config.DownloadingQueueConfig{
                 WorkersCount: 1,
             },
