diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 53dc4de1b6..de41f6a5c6 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -7,7 +7,6 @@ import ( "path/filepath" "reflect" "strconv" - "strings" "sync" "time" @@ -18,19 +17,16 @@ import ( "github.com/prometheus/common/model" loki_storage "github.com/grafana/loki/pkg/storage" - "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/storage" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" - "github.com/grafana/loki/pkg/storage/stores/util" ) -const delimiter = "/" - type Config struct { WorkingDirectory string `yaml:"working_directory"` SharedStoreType string `yaml:"shared_store"` @@ -73,7 +69,7 @@ type Compactor struct { services.Service cfg Config - objectClient chunk.ObjectClient + indexStorageClient shipper_storage.Client tableMarker retention.TableMarker sweeper *retention.Sweeper deleteRequestsStore deletion.DeleteRequestsStore @@ -110,7 +106,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage if err != nil { return err } - c.objectClient = util.NewPrefixedObjectClient(objectClient, c.cfg.SharedStoreKeyPrefix) + c.indexStorageClient = shipper_storage.NewIndexStorageClient(objectClient, c.cfg.SharedStoreKeyPrefix) c.metrics = newMetrics(r) if c.cfg.RetentionEnabled { @@ -129,7 +125,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") - c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.objectClient) + c.deleteRequestsStore, err = deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) if err != nil { return err } @@ -196,7 +192,7 @@ func (c *Compactor) loop(ctx context.Context) error { } func (c *Compactor) CompactTable(ctx context.Context, tableName string) error { - table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.objectClient, c.cfg.RetentionEnabled, c.tableMarker) + table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker) if err != nil { level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err) return err @@ -240,17 +236,12 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { } }() - _, dirs, err := c.objectClient.List(ctx, "", delimiter) + tables, err := c.indexStorageClient.ListTables(ctx) if err != nil { status = statusFailure return err } - tables := make([]string, len(dirs)) - for i, dir := range dirs { - tables[i] = strings.TrimSuffix(string(dir), delimiter) - } - compactTablesChan := make(chan string) errChan := make(chan error) diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go index 845bdbe601..b14bdb3bae 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" ) type ( @@ -53,8 +54,8 @@ type deleteRequestsStore struct { } // NewDeleteStore creates a store for managing delete requests. -func NewDeleteStore(workingDirectory string, objectClient chunk.ObjectClient) (DeleteRequestsStore, error) { - indexClient, err := newDeleteRequestsTable(workingDirectory, objectClient) +func NewDeleteStore(workingDirectory string, indexStorageClient storage.Client) (DeleteRequestsStore, error) { + indexClient, err := newDeleteRequestsTable(workingDirectory, indexStorageClient) if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go index 0502bb950c..59e2423118 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_store_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -57,7 +59,7 @@ func TestDeleteRequestsStore(t *testing.T) { Directory: objectStorePath, }) require.NoError(t, err) - testDeleteRequestsStore, err := NewDeleteStore(workingDir, objectClient) + testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, "")) require.NoError(t, err) defer testDeleteRequestsStore.Stop() diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go index 61fbe18d22..9d3f36155d 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go @@ -17,12 +17,13 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) type deleteRequestsTable struct { - objectClient chunk.ObjectClient - dbPath string + indexStorageClient storage.Client + dbPath string boltdbIndexClient *local.BoltIndexClient db *bbolt.DB @@ -30,9 +31,9 @@ type deleteRequestsTable struct { wg sync.WaitGroup } -const objectPathInStorage = DeleteRequestsTableName + "/" + DeleteRequestsTableName + ".gz" +const deleteRequestsIndexFileName = DeleteRequestsTableName + ".gz" -func newDeleteRequestsTable(workingDirectory string, objectClient chunk.ObjectClient) (chunk.IndexClient, error) { +func newDeleteRequestsTable(workingDirectory string, indexStorageClient storage.Client) (chunk.IndexClient, error) { dbPath := filepath.Join(workingDirectory, DeleteRequestsTableName, DeleteRequestsTableName) boltdbIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: filepath.Dir(dbPath)}) if err != nil { @@ -40,10 +41,10 @@ func newDeleteRequestsTable(workingDirectory string, objectClient chunk.ObjectCl } table := &deleteRequestsTable{ - objectClient: objectClient, - dbPath: dbPath, - boltdbIndexClient: boltdbIndexClient, - done: make(chan struct{}), + indexStorageClient: indexStorageClient, + dbPath: dbPath, + boltdbIndexClient: boltdbIndexClient, + done: make(chan struct{}), } err = table.init() @@ -64,8 +65,8 @@ func (t *deleteRequestsTable) init() error { _, err := os.Stat(t.dbPath) if err != nil { - err = shipper_util.GetFileFromStorage(context.Background(), t.objectClient, objectPathInStorage, t.dbPath, true) - if err != nil && !t.objectClient.IsObjectNotFoundErr(err) { + err = shipper_util.GetFileFromStorage(context.Background(), t.indexStorageClient, DeleteRequestsTableName, deleteRequestsIndexFileName, t.dbPath, true) + if err != nil && !t.indexStorageClient.IsFileNotFoundErr(err) { return err } } @@ -139,7 +140,7 @@ func (t *deleteRequestsTable) uploadFile() error { return err } - return t.objectClient.PutObject(context.Background(), objectPathInStorage, f) + return t.indexStorageClient.PutFile(context.Background(), DeleteRequestsTableName, deleteRequestsIndexFileName, f) } func (t *deleteRequestsTable) Stop() { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go index dba3b08222..3e9e2e97ca 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go @@ -11,7 +11,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" - + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) @@ -31,7 +31,7 @@ func TestDeleteRequestsTable(t *testing.T) { Directory: objectStorePath, }) require.NoError(t, err) - indexClient, err := newDeleteRequestsTable(workingDir, objectClient) + indexClient, err := newDeleteRequestsTable(workingDir, storage.NewIndexStorageClient(objectClient, "")) require.NoError(t, err) // see if delete requests db was created @@ -49,7 +49,7 @@ func TestDeleteRequestsTable(t *testing.T) { // upload the file to the storage require.NoError(t, testDeleteRequestsTable.uploadFile()) - storageFilePath := filepath.Join(objectStorePath, objectPathInStorage) + storageFilePath := filepath.Join(objectStorePath, DeleteRequestsTableName+"/"+DeleteRequestsTableName+".gz") require.FileExists(t, storageFilePath) // validate records in the storage db @@ -70,7 +70,7 @@ func TestDeleteRequestsTable(t *testing.T) { require.NoError(t, err) // re-create table to see if the db gets downloaded locally since it does not exist anymore - indexClient, err = newDeleteRequestsTable(workingDir, objectClient) + indexClient, err = newDeleteRequestsTable(workingDir, storage.NewIndexStorageClient(objectClient, "")) require.NoError(t, err) defer indexClient.Stop() diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 9bc7a21a60..2555ed8432 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -15,10 +15,9 @@ import ( "github.com/go-kit/kit/log/level" "go.etcd.io/bbolt" - "github.com/grafana/loki/pkg/storage/chunk" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" - "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) @@ -37,11 +36,11 @@ type indexEntry struct { } type table struct { - name string - workingDirectory string - storageClient chunk.ObjectClient - applyRetention bool - tableMarker retention.TableMarker + name string + workingDirectory string + indexStorageClient storage.Client + applyRetention bool + tableMarker retention.TableMarker compactedDB *bbolt.DB logger log.Logger @@ -50,20 +49,20 @@ type table struct { quit chan struct{} } -func newTable(ctx context.Context, workingDirectory string, objectClient chunk.ObjectClient, applyRetention bool, tableMarker retention.TableMarker) (*table, error) { +func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client, applyRetention bool, tableMarker retention.TableMarker) (*table, error) { err := chunk_util.EnsureDirectory(workingDirectory) if err != nil { return nil, err } table := table{ - ctx: ctx, - name: filepath.Base(workingDirectory), - workingDirectory: workingDirectory, - storageClient: objectClient, - quit: make(chan struct{}), - applyRetention: applyRetention, - tableMarker: tableMarker, + ctx: ctx, + name: filepath.Base(workingDirectory), + workingDirectory: workingDirectory, + indexStorageClient: indexStorageClient, + quit: make(chan struct{}), + applyRetention: applyRetention, + tableMarker: tableMarker, } table.logger = log.With(util_log.Logger, "table-name", table.name) @@ -71,12 +70,12 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O } func (t *table) compact(tableHasExpiredStreams bool) error { - objects, err := util.ListDirectory(t.ctx, t.name, t.storageClient) + indexFiles, err := t.indexStorageClient.ListFiles(t.ctx, t.name) if err != nil { return err } - level.Info(t.logger).Log("msg", "listed files", "count", len(objects)) + level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles)) defer func() { err := t.cleanup() @@ -88,11 +87,11 @@ func (t *table) compact(tableHasExpiredStreams bool) error { applyRetention := t.applyRetention && tableHasExpiredStreams if !applyRetention { - if len(objects) < compactMinDBs { - level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) + if len(indexFiles) < compactMinDBs { + level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(indexFiles))) return nil } - if err := t.compactFiles(objects); err != nil { + if err := t.compactFiles(indexFiles); err != nil { return err } // upload the compacted db @@ -102,7 +101,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { } // remove source files from storage which were compacted - err = t.removeObjectsFromStorage(objects) + err = t.removeFilesFromStorage(indexFiles) if err != nil { return err } @@ -110,17 +109,17 @@ func (t *table) compact(tableHasExpiredStreams bool) error { } var compacted bool - if len(objects) > 1 { - if err := t.compactFiles(objects); err != nil { + if len(indexFiles) > 1 { + if err := t.compactFiles(indexFiles); err != nil { return err } compacted = true } - if len(objects) == 1 { + if len(indexFiles) == 1 { // download the db downloadAt := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) - err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[0].Key, downloadAt, false) + err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, indexFiles[0].Name, downloadAt, false) if err != nil { return err } @@ -141,7 +140,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { } if empty { - return t.removeObjectsFromStorage(objects) + return t.removeFilesFromStorage(indexFiles) } if markCount == 0 && !compacted { @@ -154,22 +153,19 @@ func (t *table) compact(tableHasExpiredStreams bool) error { return err } - return t.removeObjectsFromStorage(objects) + return t.removeFilesFromStorage(indexFiles) } -func (t *table) compactFiles(objects []chunk.StorageObject) error { +func (t *table) compactFiles(files []storage.IndexFile) error { var err error level.Info(t.logger).Log("msg", "starting compaction of dbs") compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) - seedFileIdx, err := findSeedObjectIdx(objects) - if err != nil { - return err - } + seedFileIdx := findSeedFileIdx(files) - level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", objects[seedFileIdx].Key)) + level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[seedFileIdx].Name)) - err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[seedFileIdx].Key, compactedDBName, false) + err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, files[seedFileIdx].Name, compactedDBName, false) if err != nil { return err } @@ -180,8 +176,8 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { } errChan := make(chan error) - readObjectChan := make(chan string) - n := util_math.Min(len(objects), readDBsParallelism) + readFileChan := make(chan string) + n := util_math.Min(len(files), readDBsParallelism) // read files in parallel for i := 0; i < n; i++ { @@ -193,32 +189,21 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { for { select { - case objectKey, ok := <-readObjectChan: + case fileName, ok := <-readFileChan: if !ok { return } - // The s3 client can also return the directory itself in the ListObjects. - if shipper_util.IsDirectory(objectKey) { - continue - } - - var dbName string - dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey) - if err != nil { - return - } - - downloadAt := filepath.Join(t.workingDirectory, dbName) + downloadAt := filepath.Join(t.workingDirectory, fileName) - err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objectKey, downloadAt, false) + err = shipper_util.GetFileFromStorage(t.ctx, t.indexStorageClient, t.name, fileName, downloadAt, false) if err != nil { return } err = t.readFile(downloadAt) if err != nil { - level.Error(t.logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err) + level.Error(t.logger).Log("msg", fmt.Sprintf("error reading file %s", fileName), "err", err) return } case <-t.quit: @@ -230,15 +215,15 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { }() } - // send all files to readObjectChan + // send all files to readFileChan go func() { - for i, object := range objects { + for i, file := range files { // skip seed file if i == seedFileIdx { continue } select { - case readObjectChan <- object.Key: + case readFileChan <- file.Name: case <-t.quit: break case <-t.ctx.Done(): @@ -246,9 +231,9 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { } } - level.Debug(t.logger).Log("msg", "closing readObjectChan") + level.Debug(t.logger).Log("msg", "closing readFileChan") - close(readObjectChan) + close(readFileChan) }() var firstErr error @@ -403,18 +388,18 @@ func (t *table) upload() error { } }() - objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) - level.Info(t.logger).Log("msg", "uploading the compacted file", "objectKey", objectKey) + fileName := fmt.Sprintf("%s.gz", shipper_util.BuildIndexFileName(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) + level.Info(t.logger).Log("msg", "uploading the compacted file", "fileName", fileName) - return t.storageClient.PutObject(t.ctx, objectKey, compressedDB) + return t.indexStorageClient.PutFile(t.ctx, t.name, fileName, compressedDB) } -// removeObjectsFromStorage deletes objects from storage. -func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error { - level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(objects)) +// removeFilesFromStorage deletes index files from storage. +func (t *table) removeFilesFromStorage(files []storage.IndexFile) error { + level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(files)) - for _, object := range objects { - err := t.storageClient.DeleteObject(t.ctx, object.Key) + for _, file := range files { + err := t.indexStorageClient.DeleteFile(t.ctx, t.name, file.Name) if err != nil { return err } @@ -437,21 +422,16 @@ func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) { return boltdb, nil } -// findSeedObjectIdx returns index of object to use as seed which would then get index from all the files written to. +// findSeedFileIdx returns index of file to use as seed which would then get index from all the files written to. // It tries to find previously compacted file(which has uploaderName) which would be the biggest file. // In a large cluster, using previously compacted file as seed would significantly reduce compaction time. // If it can't find a previously compacted file, it would just use the first file from the list of files. -func findSeedObjectIdx(objects []chunk.StorageObject) (int, error) { - for i, object := range objects { - dbName, err := shipper_util.GetDBNameFromObjectKey(object.Key) - if err != nil { - return 0, err - } - - if strings.HasPrefix(dbName, uploaderName) { - return i, nil +func findSeedFileIdx(files []storage.IndexFile) int { + for i, file := range files { + if strings.HasPrefix(file.Name, uploaderName) { + return i } } - return 0, nil + return 0 } diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 0006f7155e..e732483339 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) @@ -78,7 +79,7 @@ func TestTable_Compaction(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), false, nil) require.NoError(t, err) require.NoError(t, table.compact(false)) @@ -185,7 +186,7 @@ func TestTable_CompactionRetention(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, true, tt.tableMarker) + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), true, tt.tableMarker) require.NoError(t, err) require.NoError(t, table.compact(true)) @@ -228,7 +229,7 @@ func TestTable_CompactionFailure(t *testing.T) { objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) + table, err := newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), false, nil) require.NoError(t, err) // compaction should fail due to a non-boltdb file. @@ -245,7 +246,7 @@ func TestTable_CompactionFailure(t *testing.T) { // remove the non-boltdb file and ensure that compaction succeeds now. require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, "fail.txt"))) - table, err = newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) + table, err = newTable(context.Background(), tableWorkingDirectory, storage.NewIndexStorageClient(objectClient, ""), false, nil) require.NoError(t, err) require.NoError(t, table.compact(true)) diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 756df33d9c..e2dbe3f2e2 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -8,7 +8,6 @@ import ( "os" "path" "path/filepath" - "strings" "sync" "time" @@ -21,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" ) @@ -28,7 +28,6 @@ import ( const ( downloadTimeout = 5 * time.Minute downloadParallelism = 50 - delimiter = "/" ) var bucketName = []byte("index") @@ -38,9 +37,10 @@ type BoltDBIndexClient interface { } type StorageClient interface { - GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) - List(ctx context.Context, prefix, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) - IsObjectNotFoundErr(err error) bool + ListTables(ctx context.Context) ([]string, error) + ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, error) + GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) + IsFileNotFoundErr(err error) bool } // Table is a collection of multiple files created for a same table by various ingesters. @@ -196,14 +196,12 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { startTime := time.Now() totalFilesSize := int64(0) - // The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores. - // This is due to the object stores not having a concept of directories. - objects, _, err := t.storageClient.List(ctx, t.name+delimiter, delimiter) + files, err := t.storageClient.ListFiles(ctx, t.name) if err != nil { return } - level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", t.name, objects)) + level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", t.name, files)) folderPath, err := t.folderPathForTable(true) if err != nil { @@ -211,23 +209,16 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { } // download the dbs parallelly - err = t.doParallelDownload(ctx, objects, folderPath) + err = t.doParallelDownload(ctx, files, folderPath) if err != nil { return err } - level.Debug(spanLogger).Log("total-files-downloaded", len(objects)) - - objects = shipper_util.RemoveDirectories(objects) + level.Debug(spanLogger).Log("total-files-downloaded", len(files)) // open all the downloaded dbs - for _, object := range objects { - dbName, err := getDBNameFromObjectKey(object.Key) - if err != nil { - return err - } - - filePath := path.Join(folderPath, dbName) + for _, file := range files { + filePath := path.Join(folderPath, file.Name) if _, err := os.Stat(filePath); os.IsNotExist(err) { level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping opening of non-existent file %s, possibly not downloaded due to it being removed during compaction.", filePath)) continue @@ -245,7 +236,7 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) { totalFilesSize += stat.Size() - t.dbs[dbName] = boltdb + t.dbs[file.Name] = boltdb } duration := time.Since(startTime).Seconds() @@ -402,37 +393,28 @@ func (t *Table) Sync(ctx context.Context) error { } // checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache -func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.StorageObject, toDelete []string, err error) { +func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []storage.IndexFile, toDelete []string, err error) { // listing tables from store - var objects []chunk.StorageObject + var files []storage.IndexFile - // The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores. - // This is due to the object stores not having a concept of directories. - objects, _, err = t.storageClient.List(ctx, t.name+delimiter, delimiter) + files, err = t.storageClient.ListFiles(ctx, t.name) if err != nil { return } - listedDBs := make(map[string]struct{}, len(objects)) + listedDBs := make(map[string]struct{}, len(files)) t.dbsMtx.RLock() defer t.dbsMtx.RUnlock() - objects = shipper_util.RemoveDirectories(objects) - - for _, object := range objects { - - dbName, err := getDBNameFromObjectKey(object.Key) - if err != nil { - return nil, nil, err - } - listedDBs[dbName] = struct{}{} + for _, file := range files { + listedDBs[file.Name] = struct{}{} // Checking whether file was already downloaded, if not, download it. // We do not ever upload files in the object store with the same name but different contents so we do not consider downloading modified files again. - _, ok := t.dbs[dbName] + _, ok := t.dbs[file.Name] if !ok { - toDownload = append(toDownload, object) + toDownload = append(toDownload, file) } } @@ -446,20 +428,16 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk. } // downloadFile first downloads file to a temp location so that we can close the existing db(if already exists), replace it with new one and then reopen it. -func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading object from storage with key %s", storageObject.Key)) +func (t *Table) downloadFile(ctx context.Context, file storage.IndexFile) error { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading object from storage with key %s", file.Name)) - dbName, err := getDBNameFromObjectKey(storageObject.Key) - if err != nil { - return err - } folderPath, _ := t.folderPathForTable(false) - filePath := path.Join(folderPath, dbName) + filePath := path.Join(folderPath, file.Name) - err = shipper_util.GetFileFromStorage(ctx, t.storageClient, storageObject.Key, filePath, true) + err := shipper_util.GetFileFromStorage(ctx, t.storageClient, t.name, file.Name, filePath, true) if err != nil { - if t.storageClient.IsObjectNotFoundErr(err) { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", storageObject.Key)) + if t.storageClient.IsFileNotFoundErr(err) { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", file.Name)) return nil } return err @@ -473,7 +451,7 @@ func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObj return err } - t.dbs[dbName] = boltdb + t.dbs[file.Name] = boltdb return nil } @@ -491,54 +469,31 @@ func (t *Table) folderPathForTable(ensureExists bool) (string, error) { return folderPath, nil } -func getDBNameFromObjectKey(objectKey string) (string, error) { - ss := strings.Split(objectKey, delimiter) - - if len(ss) != 2 { - return "", fmt.Errorf("invalid object key: %v", objectKey) - } - if ss[1] == "" { - return "", fmt.Errorf("empty db name, object key: %v", objectKey) - } - return ss[1], nil -} - // doParallelDownload downloads objects(dbs) parallelly. It is upto the caller to open the dbs after the download finishes successfully. -func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageObject, folderPathForTable string) error { +func (t *Table) doParallelDownload(ctx context.Context, files []storage.IndexFile, folderPathForTable string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - queue := make(chan chunk.StorageObject) - n := util_math.Min(len(objects), downloadParallelism) + queue := make(chan storage.IndexFile) + n := util_math.Min(len(files), downloadParallelism) incomingErrors := make(chan error) - // Run n parallel goroutines fetching objects to download from the queue + // Run n parallel goroutines fetching files to download from the queue for i := 0; i < n; i++ { go func() { // when there is an error, break the loop and send the error to the channel to stop the operation. var err error for { - object, ok := <-queue + file, ok := <-queue if !ok { break } - // The s3 client can also return the directory itself in the ListObjects. - if shipper_util.IsDirectory(object.Key) { - continue - } - - var dbName string - dbName, err = getDBNameFromObjectKey(object.Key) - if err != nil { - break - } - - filePath := path.Join(folderPathForTable, dbName) - err = shipper_util.GetFileFromStorage(ctx, t.storageClient, object.Key, filePath, true) + filePath := path.Join(folderPathForTable, file.Name) + err = shipper_util.GetFileFromStorage(ctx, t.storageClient, t.name, file.Name, filePath, true) if err != nil { - if t.storageClient.IsObjectNotFoundErr(err) { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing object %s, possibly removed during compaction", object.Key)) + if t.storageClient.IsFileNotFoundErr(err) { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("ignoring missing file %s, possibly removed during compaction", file.Name)) err = nil } else { break @@ -550,11 +505,11 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO }() } - // Send all the objects to download into the queue + // Send all the files to download into the queue go func() { - for _, object := range objects { + for _, file := range files { select { - case queue <- object: + case queue <- file: case <-ctx.Done(): break } diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index bc0f5377cf..282c95a223 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -8,7 +8,6 @@ import ( "path" "regexp" "strconv" - "strings" "sync" "time" @@ -35,9 +34,9 @@ type Config struct { } type TableManager struct { - cfg Config - boltIndexClient BoltDBIndexClient - storageClient StorageClient + cfg Config + boltIndexClient BoltDBIndexClient + indexStorageClient StorageClient tables map[string]*Table tablesMtx sync.RWMutex @@ -48,20 +47,20 @@ type TableManager struct { wg sync.WaitGroup } -func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, storageClient StorageClient, registerer prometheus.Registerer) (*TableManager, error) { +func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient StorageClient, registerer prometheus.Registerer) (*TableManager, error) { if err := chunk_util.EnsureDirectory(cfg.CacheDir); err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) tm := &TableManager{ - cfg: cfg, - boltIndexClient: boltIndexClient, - storageClient: storageClient, - tables: make(map[string]*Table), - metrics: newMetrics(registerer), - ctx: ctx, - cancel: cancel, + cfg: cfg, + boltIndexClient: boltIndexClient, + indexStorageClient: indexStorageClient, + tables: make(map[string]*Table), + metrics: newMetrics(registerer), + ctx: ctx, + cancel: cancel, } // load the existing tables first. @@ -181,7 +180,7 @@ func (tm *TableManager) getOrCreateTable(spanCtx context.Context, tableName stri // table not found, creating one. level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName)) - table = NewTable(spanCtx, tableName, tm.cfg.CacheDir, tm.storageClient, tm.boltIndexClient, tm.metrics) + table = NewTable(spanCtx, tableName, tm.cfg.CacheDir, tm.indexStorageClient, tm.boltIndexClient, tm.metrics) tm.tables[tableName] = table } tm.tablesMtx.Unlock() @@ -251,13 +250,13 @@ func (tm *TableManager) ensureQueryReadiness() error { return nil } - _, tablesInStorage, err := tm.storageClient.List(context.Background(), "", delimiter) + tableNames, err := tm.indexStorageClient.ListTables(context.Background()) if err != nil { return err } // get the names of tables required for being query ready. - tableNames, err := tm.tablesRequiredForQueryReadiness(tablesInStorage) + tableNames, err = tm.tablesRequiredForQueryReadiness(tableNames) if err != nil { return err } @@ -276,7 +275,7 @@ func (tm *TableManager) ensureQueryReadiness() error { level.Info(util_log.Logger).Log("msg", "table required for query readiness does not exist locally, downloading it", "table-name", tableName) // table doesn't exist, download it. - table, err := LoadTable(tm.ctx, tableName, tm.cfg.CacheDir, tm.storageClient, tm.boltIndexClient, tm.metrics) + table, err := LoadTable(tm.ctx, tableName, tm.cfg.CacheDir, tm.indexStorageClient, tm.boltIndexClient, tm.metrics) if err != nil { return err } @@ -298,7 +297,7 @@ func (tm *TableManager) queryReadyTableNumbersRange() (int64, int64) { // tablesRequiredForQueryReadiness returns the names of tables required to be downloaded for being query ready as per configured QueryReadyNumDays. // It only considers daily tables for simplicity and we anyways have made it mandatory to have daily tables with boltdb-shipper. -func (tm *TableManager) tablesRequiredForQueryReadiness(tablesInStorage []chunk.StorageCommonPrefix) ([]string, error) { +func (tm *TableManager) tablesRequiredForQueryReadiness(tablesInStorage []string) ([]string, error) { // regex for finding daily tables which have a 5 digit number at the end. re, err := regexp.Compile(`.+[0-9]{5}$`) if err != nil { @@ -308,9 +307,7 @@ func (tm *TableManager) tablesRequiredForQueryReadiness(tablesInStorage []chunk. minTableNumber, maxTableNumber := tm.queryReadyTableNumbersRange() var requiredTableNames []string - for _, tableNameWithSep := range tablesInStorage { - // tableNames come with a delimiter(separator) because they are directories not objects so removing the delimiter. - tableName := strings.TrimSuffix(string(tableNameWithSep), delimiter) + for _, tableName := range tablesInStorage { if !re.MatchString(tableName) { continue } @@ -342,7 +339,7 @@ func (tm *TableManager) loadLocalTables() error { level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading local table %s", fileInfo.Name())) - table, err := LoadTable(tm.ctx, fileInfo.Name(), tm.cfg.CacheDir, tm.storageClient, tm.boltIndexClient, tm.metrics) + table, err := LoadTable(tm.ctx, fileInfo.Name(), tm.cfg.CacheDir, tm.indexStorageClient, tm.boltIndexClient, tm.metrics) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/table_manager_test.go b/pkg/storage/stores/shipper/downloads/table_manager_test.go index abdeb7c6c8..25c90d9a98 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/shipper/downloads/table_manager_test.go @@ -18,7 +18,7 @@ import ( ) func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) { - boltDBIndexClient, fsObjectClient := buildTestClients(t, path) + boltDBIndexClient, indexStorageClient := buildTestClients(t, path) cachePath := filepath.Join(path, cacheDirName) cfg := Config{ @@ -26,7 +26,7 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) SyncInterval: time.Hour, CacheTTL: time.Hour, } - tableManager, err := NewTableManager(cfg, boltDBIndexClient, fsObjectClient, nil) + tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil) require.NoError(t, err) return tableManager, func() { @@ -177,7 +177,7 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs, true) } - boltDBIndexClient, fsObjectClient := buildTestClients(t, tempDir) + boltDBIndexClient, indexStorageClient := buildTestClients(t, tempDir) cachePath := filepath.Join(tempDir, cacheDirName) require.NoError(t, util.EnsureDirectory(cachePath)) @@ -188,13 +188,13 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { QueryReadyNumDays: tc.queryReadyNumDaysCfg, } tableManager := &TableManager{ - cfg: cfg, - boltIndexClient: boltDBIndexClient, - storageClient: fsObjectClient, - tables: make(map[string]*Table), - metrics: newMetrics(nil), - ctx: context.Background(), - cancel: func() {}, + cfg: cfg, + boltIndexClient: boltDBIndexClient, + indexStorageClient: indexStorageClient, + tables: make(map[string]*Table), + metrics: newMetrics(nil), + ctx: context.Background(), + cancel: func() {}, } defer func() { @@ -215,21 +215,21 @@ func TestTableManager_ensureQueryReadiness(t *testing.T) { func TestTableManager_tablesRequiredForQueryReadiness(t *testing.T) { numDailyTablesInStorage := 10 - var tablesInStorage []chunk.StorageCommonPrefix + var tablesInStorage []string // tables with daily table number activeDailyTableNumber := getActiveTableNumber() for i := 0; i < numDailyTablesInStorage; i++ { - tablesInStorage = append(tablesInStorage, chunk.StorageCommonPrefix(fmt.Sprintf("table_%d/", activeDailyTableNumber-int64(i)))) + tablesInStorage = append(tablesInStorage, fmt.Sprintf("table_%d", activeDailyTableNumber-int64(i))) } // tables with weekly table number activeWeeklyTableNumber := time.Now().Unix() / int64((durationDay*7)/time.Second) for i := 0; i < 10; i++ { - tablesInStorage = append(tablesInStorage, chunk.StorageCommonPrefix(fmt.Sprintf("table_%d/", activeWeeklyTableNumber-int64(i)))) + tablesInStorage = append(tablesInStorage, fmt.Sprintf("table_%d", activeWeeklyTableNumber-int64(i))) } // tables without a table number - tablesInStorage = append(tablesInStorage, "foo/", "bar/") + tablesInStorage = append(tablesInStorage, "foo", "bar") for i, tc := range []int{ 0, 5, 10, 20, diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index bff6a55fcf..b8625ab5f0 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) @@ -32,18 +33,18 @@ func newStorageClientWithFakeObjectsInList(storageClient StorageClient) StorageC return storageClientWithFakeObjectsInList{storageClient} } -func (o storageClientWithFakeObjectsInList) List(ctx context.Context, prefix string, delimiter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { - objects, commonPrefixes, err := o.StorageClient.List(ctx, prefix, delimiter) +func (o storageClientWithFakeObjectsInList) ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, error) { + files, err := o.StorageClient.ListFiles(ctx, tableName) if err != nil { - return nil, nil, err + return nil, err } - objects = append(objects, chunk.StorageObject{ - Key: fmt.Sprintf(prefix, "fake-object"), + files = append(files, storage.IndexFile{ + Name: "fake-object", ModifiedAt: time.Now(), }) - return objects, commonPrefixes, nil + return files, nil } type stopFunc func() @@ -58,7 +59,7 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, Storag fsObjectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - return boltDBIndexClient, fsObjectClient + return boltDBIndexClient, storage.NewIndexStorageClient(fsObjectClient, "") } func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIndexClient, stopFunc) { diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 0043a86738..67573132d2 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -21,9 +21,9 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/local" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" - "github.com/grafana/loki/pkg/storage/stores/util" ) const ( @@ -124,7 +124,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R return err } - prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, s.cfg.SharedStoreKeyPrefix) + indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix) if s.cfg.Mode != ModeReadOnly { uploader, err := s.getUploaderName() @@ -138,7 +138,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R UploadInterval: UploadInterval, DBRetainPeriod: s.cfg.IngesterDBRetainPeriod, } - uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer) + uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) if err != nil { return err } @@ -153,7 +153,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R CacheTTL: s.cfg.CacheTTL, QueryReadyNumDays: s.cfg.QueryReadyNumDays, } - downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer) + downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/storage/client.go b/pkg/storage/stores/shipper/storage/client.go new file mode 100644 index 0000000000..5ac65e4a79 --- /dev/null +++ b/pkg/storage/stores/shipper/storage/client.go @@ -0,0 +1,96 @@ +package storage + +import ( + "context" + "io" + "path" + "strings" + "time" + + "github.com/grafana/loki/pkg/storage/chunk" +) + +const delimiter = "/" + +// Client is used to manage boltdb index files in object storage, when using boltdb-shipper. +type Client interface { + ListTables(ctx context.Context) ([]string, error) + ListFiles(ctx context.Context, tableName string) ([]IndexFile, error) + GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) + PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error + DeleteFile(ctx context.Context, tableName, fileName string) error + IsFileNotFoundErr(err error) bool + Stop() +} + +type indexStorageClient struct { + objectClient chunk.ObjectClient + storagePrefix string +} + +type IndexFile struct { + Name string + ModifiedAt time.Time +} + +func NewIndexStorageClient(objectClient chunk.ObjectClient, storagePrefix string) Client { + return &indexStorageClient{objectClient: objectClient, storagePrefix: storagePrefix} +} + +func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) { + _, tables, err := s.objectClient.List(ctx, s.storagePrefix, delimiter) + if err != nil { + return nil, err + } + + tableNames := make([]string, 0, len(tables)) + for _, table := range tables { + tableNames = append(tableNames, path.Base(string(table))) + } + + return tableNames, nil +} + +func (s *indexStorageClient) ListFiles(ctx context.Context, tableName string) ([]IndexFile, error) { + // The forward slash here needs to stay because we are trying to list contents of a directory without which + // we will get the name of the same directory back with hosted object stores. + // This is due to the object stores not having a concept of directories. + objects, _, err := s.objectClient.List(ctx, s.storagePrefix+tableName+delimiter, delimiter) + if err != nil { + return nil, err + } + + files := make([]IndexFile, 0, len(objects)) + for _, object := range objects { + // The s3 client can also return the directory itself in the ListObjects. + if strings.HasSuffix(object.Key, delimiter) { + continue + } + files = append(files, IndexFile{ + Name: path.Base(object.Key), + ModifiedAt: object.ModifiedAt, + }) + } + + return files, nil +} + +func (s *indexStorageClient) GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) { + return s.objectClient.GetObject(ctx, s.storagePrefix+path.Join(tableName, fileName)) +} + +func (s *indexStorageClient) PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error { + return s.objectClient.PutObject(ctx, s.storagePrefix+path.Join(tableName, fileName), file) +} + +func (s *indexStorageClient) DeleteFile(ctx context.Context, tableName, fileName string) error { + return s.objectClient.DeleteObject(ctx, s.storagePrefix+path.Join(tableName, fileName)) +} + +func (s *indexStorageClient) IsFileNotFoundErr(err error) bool { + return s.objectClient.IsObjectNotFoundErr(err) +} + +func (s *indexStorageClient) Stop() { + s.objectClient.Stop() +} diff --git a/pkg/storage/stores/shipper/storage/client_test.go b/pkg/storage/stores/shipper/storage/client_test.go new file mode 100644 index 0000000000..206f0cbe51 --- /dev/null +++ b/pkg/storage/stores/shipper/storage/client_test.go @@ -0,0 +1,81 @@ +package storage + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/util" +) + +func TestIndexStorageClient(t *testing.T) { + tempDir, err := ioutil.TempDir("", "test-index-storage-client") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + storageKeyPrefix := "prefix/" + tablesToSetup := map[string][]string{ + "table1": {"a", "b"}, + "table2": {"b", "c", "d"}, + } + + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir}) + require.NoError(t, err) + + for tableName, files := range tablesToSetup { + require.NoError(t, util.EnsureDirectory(filepath.Join(tempDir, storageKeyPrefix, tableName))) + for _, file := range files { + err := ioutil.WriteFile(filepath.Join(tempDir, storageKeyPrefix, tableName, file), []byte(tableName+file), 0666) + require.NoError(t, err) + } + } + + indexStorageClient := NewIndexStorageClient(objectClient, storageKeyPrefix) + + verifyFiles := func() { + tables, err := indexStorageClient.ListTables(context.Background()) + require.NoError(t, err) + require.Len(t, tables, len(tablesToSetup)) + for _, table := range tables { + expectedFiles, ok := tablesToSetup[table] + require.True(t, ok) + + filesInStorage, err := indexStorageClient.ListFiles(context.Background(), table) + require.NoError(t, err) + require.Len(t, filesInStorage, len(expectedFiles)) + + for i, fileInStorage := range filesInStorage { + require.Equal(t, expectedFiles[i], fileInStorage.Name) + readCloser, err := indexStorageClient.GetFile(context.Background(), table, fileInStorage.Name) + require.NoError(t, err) + + b, err := ioutil.ReadAll(readCloser) + require.NoError(t, readCloser.Close()) + require.NoError(t, err) + require.EqualValues(t, []byte(table+fileInStorage.Name), b) + } + } + } + + // verify the files using indexStorageClient + verifyFiles() + + // delete a file and verify them again + require.NoError(t, indexStorageClient.DeleteFile(context.Background(), "table2", "d")) + tablesToSetup["table2"] = tablesToSetup["table2"][:2] + verifyFiles() + + // add a file and verify them again + require.NoError(t, indexStorageClient.PutFile(context.Background(), "table2", "e", bytes.NewReader([]byte("table2"+"e")))) + tablesToSetup["table2"] = append(tablesToSetup["table2"], "e") + verifyFiles() +} diff --git a/pkg/storage/stores/shipper/table_client.go b/pkg/storage/stores/shipper/table_client.go index 4a82121644..a52cbd708e 100644 --- a/pkg/storage/stores/shipper/table_client.go +++ b/pkg/storage/stores/shipper/table_client.go @@ -2,41 +2,22 @@ package shipper import ( "context" - "fmt" - "strings" - "github.com/go-kit/kit/log/level" - - util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/stores/util" -) - -const ( - delimiter = "/" ) type boltDBShipperTableClient struct { - objectClient chunk.ObjectClient + indexStorageClient storage.Client } func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient, storageKeyPrefix string) chunk.TableClient { - return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, storageKeyPrefix)} + return &boltDBShipperTableClient{storage.NewIndexStorageClient(objectClient, storageKeyPrefix)} } func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) { - _, dirs, err := b.objectClient.List(ctx, "", delimiter) - if err != nil { - return nil, err - } - - tables := make([]string, len(dirs)) - for i, dir := range dirs { - tables[i] = strings.TrimSuffix(string(dir), delimiter) - } - - return tables, nil + return b.indexStorageClient.ListTables(ctx) } func (b *boltDBShipperTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { @@ -44,21 +25,17 @@ func (b *boltDBShipperTableClient) CreateTable(ctx context.Context, desc chunk.T } func (b *boltDBShipperTableClient) Stop() { - b.objectClient.Stop() + b.indexStorageClient.Stop() } -func (b *boltDBShipperTableClient) DeleteTable(ctx context.Context, name string) error { - objects, dirs, err := b.objectClient.List(ctx, name+delimiter, delimiter) +func (b *boltDBShipperTableClient) DeleteTable(ctx context.Context, tableName string) error { + files, err := b.indexStorageClient.ListFiles(ctx, tableName) if err != nil { return err } - if len(dirs) != 0 { - level.Error(util_log.Logger).Log("msg", fmt.Sprintf("unexpected directories in %s folder, not touching them", name), "directories", fmt.Sprint(dirs)) - } - - for _, object := range objects { - err := b.objectClient.DeleteObject(ctx, object.Key) + for _, file := range files { + err := b.indexStorageClient.DeleteFile(ctx, tableName, file.Name) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/table_client_test.go b/pkg/storage/stores/shipper/table_client_test.go index bdc8f8d821..c3861e5400 100644 --- a/pkg/storage/stores/shipper/table_client_test.go +++ b/pkg/storage/stores/shipper/table_client_test.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/storage" - "github.com/grafana/loki/pkg/storage/stores/util" ) func TestBoltDBShipperTableClient(t *testing.T) { @@ -34,12 +33,10 @@ func TestBoltDBShipperTableClient(t *testing.T) { "table3": {"file5", "file6"}, } - // we need to use prefixed object client while creating files/folder - prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, "index/") - for folder, files := range foldersWithFiles { for _, fileName := range files { - err := prefixedObjectClient.PutObject(context.Background(), path.Join(folder, fileName), bytes.NewReader([]byte{})) + // we will use "index/" prefix for all the objects + err := objectClient.PutObject(context.Background(), path.Join("index", folder, fileName), bytes.NewReader([]byte{})) require.NoError(t, err) } } diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index be8ca655ef..13bb7246bb 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -43,7 +43,7 @@ type BoltDBIndexClient interface { } type StorageClient interface { - PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error + PutFile(ctx context.Context, tableName, fileName string, file io.ReadSeeker) error } type dbSnapshot struct { @@ -410,8 +410,8 @@ func (lt *Table) uploadDB(ctx context.Context, name string, db *bbolt.DB) error return err } - objectKey := lt.buildObjectKey(name) - return lt.storageClient.PutObject(ctx, objectKey, f) + fileName := lt.buildFileName(name) + return lt.storageClient.PutFile(ctx, lt.name, fileName, f) } // Cleanup removes dbs which are already uploaded and have not been modified for period longer than dbRetainPeriod. @@ -452,16 +452,16 @@ func (lt *Table) Cleanup(dbRetainPeriod time.Duration) error { return nil } -func (lt *Table) buildObjectKey(dbName string) string { - // Files are stored with /- - objectKey := fmt.Sprintf("%s/%s-%s", lt.name, lt.uploader, dbName) +func (lt *Table) buildFileName(dbName string) string { + // Files are stored with - + fileName := fmt.Sprintf("%s-%s", lt.uploader, dbName) // if the file is a migrated one then don't add its name to the object key otherwise we would re-upload them again here with a different name. if lt.name == dbName { - objectKey = fmt.Sprintf("%s/%s", lt.name, lt.uploader) + fileName = lt.uploader } - return fmt.Sprintf("%s.gz", objectKey) + return fmt.Sprintf("%s.gz", fileName) } func loadBoltDBsFromDir(dir string, metrics *metrics) (map[string]*bbolt.DB, error) { diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index f5828e0720..ea64528c08 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) @@ -25,7 +26,7 @@ const ( objectsStorageDirName = "objects" ) -func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) { +func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, StorageClient) { indexPath := filepath.Join(path, indexDirName) boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath}) @@ -35,7 +36,7 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local fsObjectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) require.NoError(t, err) - return boltDBIndexClient, fsObjectClient + return boltDBIndexClient, storage.NewIndexStorageClient(fsObjectClient, "") } type stopFunc func() @@ -222,10 +223,10 @@ func compareTableWithStorage(t *testing.T, table *Table, storageDir string) { }() for name, db := range table.dbs { - objectKey := table.buildObjectKey(name) + fileName := table.buildFileName(name) // open compressed file from storage - compressedFile, err := os.Open(filepath.Join(storageDir, objectKey)) + compressedFile, err := os.Open(filepath.Join(storageDir, table.name, fileName)) require.NoError(t, err) // get a compressed reader @@ -233,7 +234,7 @@ func compareTableWithStorage(t *testing.T, table *Table, storageDir string) { require.NoError(t, err) // create a temp file for writing decompressed file - decompressedFilePath := filepath.Join(tempDir, filepath.Base(objectKey)) + decompressedFilePath := filepath.Join(tempDir, filepath.Base(fileName)) decompressedFile, err := os.Create(decompressedFilePath) require.NoError(t, err) @@ -417,7 +418,8 @@ func TestTable_ImmutableUploads(t *testing.T) { } // setup some dbs for a table at a path. - tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, dbs, false) + tableName := "test-table" + tablePath := testutil.SetupDBTablesAtPath(t, tableName, indexPath, dbs, false) table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient, newMetrics(nil)) require.NoError(t, err) @@ -440,7 +442,7 @@ func TestTable_ImmutableUploads(t *testing.T) { require.Len(t, uploadedDBs, len(expectedDBsToUpload)) for _, expectedDB := range expectedDBsToUpload { - require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB)))) + require.FileExists(t, filepath.Join(objectStorageDir, tableName, table.buildFileName(fmt.Sprint(expectedDB)))) } // force upload of dbs @@ -453,7 +455,7 @@ func TestTable_ImmutableUploads(t *testing.T) { require.Len(t, uploadedDBs, len(expectedDBsToUpload)) for _, expectedDB := range expectedDBsToUpload { - require.FileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB)))) + require.FileExists(t, filepath.Join(objectStorageDir, tableName, table.buildFileName(fmt.Sprint(expectedDB)))) } // delete everything uploaded @@ -468,7 +470,7 @@ func TestTable_ImmutableUploads(t *testing.T) { // make sure nothing was re-uploaded for _, expectedDB := range expectedDBsToUpload { - require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB)))) + require.NoFileExists(t, filepath.Join(objectStorageDir, tableName, table.buildFileName(fmt.Sprint(expectedDB)))) } } diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index adc2f764ae..b9b19b5ea5 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -66,13 +66,13 @@ func putGzipWriter(writer io.WriteCloser) { gzipWriter.Put(writer) } -type StorageClient interface { - GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) +type IndexStorageClient interface { + GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) } // GetFileFromStorage downloads a file from storage to given location. -func GetFileFromStorage(ctx context.Context, storageClient StorageClient, objectKey, destination string, sync bool) error { - readCloser, err := storageClient.GetObject(ctx, objectKey) +func GetFileFromStorage(ctx context.Context, storageClient IndexStorageClient, tableName, fileName, destination string, sync bool) error { + readCloser, err := storageClient.GetFile(ctx, tableName, fileName) if err != nil { return err } @@ -94,7 +94,7 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object } }() var objectReader io.Reader = readCloser - if strings.HasSuffix(objectKey, ".gz") { + if strings.HasSuffix(fileName, ".gz") { decompressedReader := getGzipReader(readCloser) defer putGzipReader(decompressedReader) @@ -106,32 +106,20 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object return err } - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey)) + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("downloaded file %s from table %s", fileName, tableName)) if sync { return f.Sync() } return nil } -func GetDBNameFromObjectKey(objectKey string) (string, error) { - ss := strings.Split(objectKey, "/") - - if len(ss) != 2 { - return "", fmt.Errorf("invalid object key: %v", objectKey) - } - if ss[1] == "" { - return "", fmt.Errorf("empty db name, object key: %v", objectKey) - } - return ss[1], nil -} - -func BuildObjectKey(tableName, uploader, dbName string) string { - // Files are stored with /- - objectKey := fmt.Sprintf("%s/%s-%s", tableName, uploader, dbName) +func BuildIndexFileName(tableName, uploader, dbName string) string { + // Files are stored with - + objectKey := fmt.Sprintf("%s-%s", uploader, dbName) // if the file is a migrated one then don't add its name to the object key otherwise we would re-upload them again here with a different name. if tableName == dbName { - objectKey = fmt.Sprintf("%s/%s", tableName, uploader) + objectKey = uploader } return objectKey @@ -214,23 +202,6 @@ func safeOpenBoltDbFile(path string, ret chan *result) { res.err = err } -// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed. -func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject { - outgoing := make([]chunk.StorageObject, 0, len(incoming)) - for _, o := range incoming { - if IsDirectory(o.Key) { - continue - } - outgoing = append(outgoing, o) - } - return outgoing -} - -// IsDirectory will return true if the string ends in a forward slash -func IsDirectory(key string) bool { - return strings.HasSuffix(key, "/") -} - func ValidateSharedStoreKeyPrefix(prefix string) error { if prefix == "" { return errors.New("shared store key prefix must be set") @@ -247,16 +218,6 @@ func ValidateSharedStoreKeyPrefix(prefix string) error { return nil } -func ListDirectory(ctx context.Context, dirName string, objectClient chunk.ObjectClient) ([]chunk.StorageObject, error) { - // The forward slash here needs to stay because we are trying to list contents of a directory without it we will get the name of the same directory back with hosted object stores. - // This is due to the object stores not having a concept of directories. - objects, _, err := objectClient.List(ctx, dirName+delimiter, delimiter) - if err != nil { - return nil, err - } - return objects, nil -} - func QueryKey(q chunk.IndexQuery) string { ret := q.TableName + sep + q.HashValue diff --git a/pkg/storage/stores/shipper/util/util_test.go b/pkg/storage/stores/shipper/util/util_test.go index a10845c5e2..0a31dc9e8b 100644 --- a/pkg/storage/stores/shipper/util/util_test.go +++ b/pkg/storage/stores/shipper/util/util_test.go @@ -7,11 +7,11 @@ import ( "path/filepath" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/util" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) @@ -25,13 +25,17 @@ func Test_GetFileFromStorage(t *testing.T) { // write a file to storage. testData := []byte("test-data") - require.NoError(t, ioutil.WriteFile(filepath.Join(tempDir, "src"), testData, 0666)) + tableName := "test-table" + require.NoError(t, util.EnsureDirectory(filepath.Join(tempDir, tableName))) + require.NoError(t, ioutil.WriteFile(filepath.Join(tempDir, tableName, "src"), testData, 0666)) // try downloading the file from the storage. objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir}) require.NoError(t, err) - require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src", filepath.Join(tempDir, "dest"), false)) + indexStorageClient := storage.NewIndexStorageClient(objectClient, "") + + require.NoError(t, GetFileFromStorage(context.Background(), indexStorageClient, tableName, "src", filepath.Join(tempDir, "dest"), false)) // verify the contents of the downloaded file. b, err := ioutil.ReadFile(filepath.Join(tempDir, "dest")) @@ -40,11 +44,11 @@ func Test_GetFileFromStorage(t *testing.T) { require.Equal(t, testData, b) // compress the file in storage - err = CompressFile(filepath.Join(tempDir, "src"), filepath.Join(tempDir, "src.gz"), true) + err = CompressFile(filepath.Join(tempDir, tableName, "src"), filepath.Join(tempDir, tableName, "src.gz"), true) require.NoError(t, err) // get the compressed file from storage - require.NoError(t, GetFileFromStorage(context.Background(), objectClient, "src.gz", filepath.Join(tempDir, "dest.gz"), false)) + require.NoError(t, GetFileFromStorage(context.Background(), indexStorageClient, tableName, "src.gz", filepath.Join(tempDir, "dest.gz"), false)) // verify the contents of the downloaded gz file. b, err = ioutil.ReadFile(filepath.Join(tempDir, "dest.gz")) @@ -78,76 +82,3 @@ func Test_CompressFile(t *testing.T) { require.Equal(t, testData, b) } - -func TestRemoveDirectories(t *testing.T) { - tests := []struct { - name string - incoming []chunk.StorageObject - expected []chunk.StorageObject - }{ - { - name: "no trailing slash", - incoming: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj2"}, - {Key: "obj3"}, - }, - expected: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj2"}, - {Key: "obj3"}, - }, - }, - { - name: "one trailing slash", - incoming: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj2/"}, - {Key: "obj3"}, - }, - expected: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj3"}, - }, - }, - { - name: "only trailing slash", - incoming: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj2"}, - {Key: "/"}, - }, - expected: []chunk.StorageObject{ - {Key: "obj1"}, - {Key: "obj2"}, - }, - }, - { - name: "all trailing slash", - incoming: []chunk.StorageObject{ - {Key: "/"}, - {Key: "/"}, - {Key: "/"}, - }, - expected: []chunk.StorageObject{}, - }, - { - name: "internal slash", - incoming: []chunk.StorageObject{ - {Key: "test/test1"}, - {Key: "te/st"}, - {Key: "/sted"}, - }, - expected: []chunk.StorageObject{ - {Key: "test/test1"}, - {Key: "te/st"}, - {Key: "/sted"}, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - assert.Equal(t, test.expected, RemoveDirectories(test.incoming)) - }) - } -} diff --git a/pkg/storage/stores/util/object_client.go b/pkg/storage/stores/util/object_client.go deleted file mode 100644 index 0dec53266c..0000000000 --- a/pkg/storage/stores/util/object_client.go +++ /dev/null @@ -1,55 +0,0 @@ -package util - -import ( - "context" - "io" - "strings" - - "github.com/grafana/loki/pkg/storage/chunk" -) - -type PrefixedObjectClient struct { - downstreamClient chunk.ObjectClient - prefix string -} - -func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { - return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object) -} - -func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { - return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) -} - -func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimeter string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { - objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimeter) - if err != nil { - return nil, nil, err - } - - for i := range objects { - objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix) - } - - for i := range commonPrefixes { - commonPrefixes[i] = chunk.StorageCommonPrefix(strings.TrimPrefix(string(commonPrefixes[i]), p.prefix)) - } - - return objects, commonPrefixes, nil -} - -func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error { - return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey) -} - -func (p PrefixedObjectClient) IsObjectNotFoundErr(err error) bool { - return p.downstreamClient.IsObjectNotFoundErr(err) -} - -func (p PrefixedObjectClient) Stop() { - p.downstreamClient.Stop() -} - -func NewPrefixedObjectClient(downstreamClient chunk.ObjectClient, prefix string) chunk.ObjectClient { - return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix} -}