make prefix for keys of objects created by boltdb-shipper configurable (#3491)

* make prefix for keys of objects created by boltdb-shipper configurable

* shared store key prefix should have a path separator
pull/3505/head
Sandeep Sukhani 5 years ago committed by GitHub
parent 6148794b9b
commit 835f710bc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/loki/loki.go
  2. 2
      pkg/storage/store.go
  3. 16
      pkg/storage/stores/shipper/compactor/compactor.go
  4. 3
      pkg/storage/stores/shipper/compactor/compactor_test.go
  5. 11
      pkg/storage/stores/shipper/shipper_index_client.go
  6. 4
      pkg/storage/stores/shipper/table_client.go
  7. 4
      pkg/storage/stores/shipper/table_client_test.go
  8. 19
      pkg/storage/stores/shipper/util/util.go

@ -131,6 +131,12 @@ func (c *Config) Validate() error {
if err := c.Ingester.Validate(); err != nil {
return errors.Wrap(err, "invalid ingester config")
}
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid boltdb-shipper config")
}
if err := c.CompactorConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid compactor config")
}
return nil
}

@ -362,7 +362,7 @@ func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {
return nil, err
}
return shipper.NewBoltDBShipperTableClient(objectClient), nil
return shipper.NewBoltDBShipperTableClient(objectClient, cfg.BoltDBShipperConfig.SharedStoreKeyPrefix), nil
})
}

@ -17,22 +17,24 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/storage/stores/shipper"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)
const delimiter = "/"
type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
}
// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.")
}
@ -42,6 +44,10 @@ func (cfg *Config) IsDefaults() bool {
return reflect.DeepEqual(cfg, cpy)
}
func (cfg *Config) Validate() error {
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
type Compactor struct {
services.Service
@ -68,7 +74,7 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe
compactor := Compactor{
cfg: cfg,
objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix),
objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix),
metrics: newMetrics(r),
}

@ -18,7 +18,8 @@ func TestIsDefaults(t *testing.T) {
}, false},
{&Config{}, false},
{&Config{
CompactionInterval: 2 * time.Hour,
SharedStoreKeyPrefix: "index/",
CompactionInterval: 2 * time.Hour,
}, true},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {

@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)
@ -39,8 +40,6 @@ const (
// FilesystemObjectStoreType holds the periodic config type for the filesystem store
FilesystemObjectStoreType = "filesystem"
StorageKeyPrefix = "index/"
// UploadInterval defines interval for when we check if there are new index files to upload.
// It's also used to snapshot the currently written index tables so the snapshots can be used for reads.
UploadInterval = 1 * time.Minute
@ -56,6 +55,7 @@ type boltDBIndexClient interface {
type Config struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
@ -69,12 +69,17 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.")
}
func (cfg *Config) Validate() error {
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}
type Shipper struct {
cfg Config
boltDBIndexClient boltDBIndexClient
@ -116,7 +121,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
return err
}
prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix)
prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, s.cfg.SharedStoreKeyPrefix)
if s.cfg.Mode != ModeReadOnly {
uploader, err := s.getUploaderName()

@ -21,8 +21,8 @@ type boltDBShipperTableClient struct {
objectClient chunk.ObjectClient
}
func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient) chunk.TableClient {
return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)}
func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient, storageKeyPrefix string) chunk.TableClient {
return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, storageKeyPrefix)}
}
func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) {

@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) {
}
// we need to use prefixed object client while creating files/folder
prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)
prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, "index/")
for folder, files := range foldersWithFiles {
for _, fileName := range files {
@ -45,7 +45,7 @@ func TestBoltDBShipperTableClient(t *testing.T) {
}
}
tableClient := NewBoltDBShipperTableClient(objectClient)
tableClient := NewBoltDBShipperTableClient(objectClient, "index/")
// check list of tables returns all the folders/tables created above
checkExpectedTables(t, tableClient, foldersWithFiles)

@ -2,6 +2,7 @@ package util
import (
"context"
"errors"
"fmt"
"io"
"os"
@ -17,6 +18,8 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
)
const delimiter = "/"
type StorageClient interface {
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
}
@ -172,3 +175,19 @@ func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject {
func IsDirectory(key string) bool {
return strings.HasSuffix(key, "/")
}
func ValidateSharedStoreKeyPrefix(prefix string) error {
if prefix == "" {
return errors.New("shared store key prefix must be set")
} else if strings.Contains(prefix, "\\") {
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
// We just need to always use `/` as a path separator.
return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter)
} else if strings.HasPrefix(prefix, delimiter) {
return errors.New("shared store key prefix should never start with a path separator i.e '/'")
} else if !strings.HasSuffix(prefix, delimiter) {
return errors.New("shared store key prefix should end with a path separator i.e '/'")
}
return nil
}

Loading…
Cancel
Save