From 2ed3c15c224dec896cffda2dc6967f62dcc8cda2 Mon Sep 17 00:00:00 2001
From: Sandeep Sukhani
Date: Sat, 1 Jul 2023 01:03:07 +0530
Subject: [PATCH] index: allow refreshing index file names cache for specific
 tables (#9798)

**What this PR does / why we need it**:
We have a cache for the index files available in object storage, which helps reduce the number of list calls we make to the object storage. Currently, we only support refreshing the whole index cache, which covers all the files for all tables and users. However, in a large cell with many tables and tenants, it is unlikely that all the tables are being referenced. Moreover, most iterative operations only need a fresh listing of a few tables at a time, so always refreshing the whole index cache is wasteful.

This PR adds support for refreshing the index cache of specific tables only. I am doing this as part of some compactor improvements to run compaction and retention in separate goroutines, to avoid blocking compaction while expensive delete requests are being processed. For those changes, I needed a cheap way to refresh the listing of index files in the table being picked up for processing.
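**Example usage** (an illustrative sketch, not code from this change — `processTables` is a hypothetical caller; the real callers are the compactor and the downloads table manager): refresh the cheap table names listing once per round, then refresh only the table that is about to be processed:

```go
package main

import (
	"context"

	"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
)

// processTables is a hypothetical caller of the split refresh APIs.
func processTables(ctx context.Context, indexClient storage.Client) error {
	// One cheap, delimited list call for table names per round.
	indexClient.RefreshIndexTableNamesCache(ctx)
	tables, err := indexClient.ListTables(ctx)
	if err != nil {
		return err
	}

	for _, tableName := range tables {
		// Refresh (and hence re-list) only the table we are about to process.
		indexClient.RefreshIndexTableCache(ctx, tableName)
		indexFiles, usersWithPerUserIndex, err := indexClient.ListFiles(ctx, tableName, false)
		if err != nil {
			return err
		}
		_ = indexFiles
		_ = usersWithPerUserIndex
		// ... compact / apply retention here ...
	}

	return nil
}
```

This keeps the number of list calls proportional to the tables actually touched, rather than to all tables in storage.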
**Checklist**
- [x] Tests updated
---
 .../indexshipper/compactor/compactor.go       |   2 +-
 .../stores/indexshipper/compactor/table.go    |   1 +
 .../indexshipper/downloads/index_set.go       |   2 +-
 .../indexshipper/downloads/index_set_test.go  |   6 +-
 .../indexshipper/downloads/table_manager.go   |   1 +
 .../downloads/table_manager_test.go           |   2 +
 .../indexshipper/downloads/table_test.go      |   4 +-
 .../indexshipper/storage/cached_client.go     | 272 +++++++++++++-----
 .../storage/cached_client_test.go             |  61 ++--
 .../stores/indexshipper/storage/client.go     |  11 +-
 .../indexshipper/storage/client_test.go       |   2 +-
 .../stores/indexshipper/storage/index_set.go  |   6 +-
 .../stores/indexshipper/table_client.go       |   2 +-
 13 files changed, 261 insertions(+), 111 deletions(-)

diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go
index d0cf66db93..14691ded4e 100644
--- a/pkg/storage/stores/indexshipper/compactor/compactor.go
+++ b/pkg/storage/stores/indexshipper/compactor/compactor.go
@@ -631,7 +631,7 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro
 	var tables []string
 	for _, sc := range c.storeContainers {
 		// refresh index list cache since previous compaction would have changed the index files in the object store
-		sc.indexStorageClient.RefreshIndexListCache(ctx)
+		sc.indexStorageClient.RefreshIndexTableNamesCache(ctx)
 		tbls, err := sc.indexStorageClient.ListTables(ctx)
 		if err != nil {
 			status = statusFailure
diff --git a/pkg/storage/stores/indexshipper/compactor/table.go b/pkg/storage/stores/indexshipper/compactor/table.go
index 3e4de7d840..95d6b3f31b 100644
--- a/pkg/storage/stores/indexshipper/compactor/table.go
+++ b/pkg/storage/stores/indexshipper/compactor/table.go
@@ -116,6 +116,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
 }
 
 func (t *table) compact(applyRetention bool) error {
+	t.indexStorageClient.RefreshIndexTableCache(t.ctx, t.name)
 	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
 	if err != nil {
 		return err
diff --git a/pkg/storage/stores/indexshipper/downloads/index_set.go b/pkg/storage/stores/indexshipper/downloads/index_set.go
index 12b0287da1..9f1d6e9efd 100644
--- a/pkg/storage/stores/indexshipper/downloads/index_set.go
+++ b/pkg/storage/stores/indexshipper/downloads/index_set.go
@@ -279,7 +279,7 @@ func (t *indexSet) syncWithRetry(ctx context.Context, lock, bypassListCache bool
 		if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries {
 			level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it before retrying")
-			t.baseIndexSet.RefreshIndexListCache(ctx)
+			t.baseIndexSet.RefreshIndexTableCache(ctx, t.tableName)
 		}
 
 		level.Error(t.logger).Log("msg", "sync failed, retrying it", "err", err)
diff --git a/pkg/storage/stores/indexshipper/downloads/index_set_test.go b/pkg/storage/stores/indexshipper/downloads/index_set_test.go
index 722b9ab1cb..660d0170e8 100644
--- a/pkg/storage/stores/indexshipper/downloads/index_set_test.go
+++ b/pkg/storage/stores/indexshipper/downloads/index_set_test.go
@@ -113,7 +113,7 @@ func TestIndexSet_Sync(t *testing.T) {
 	indexesSetup = buildListOfExpectedIndexes("", 0, 10)
 
 	// sync and verify the indexSet
-	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+	indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName)
 	require.NoError(t, indexSet.Sync(context.Background()))
 
 	// check index set twice; first run to have new files to download, second run to test with no changes in storage.
@@ -126,7 +126,7 @@ func TestIndexSet_Sync(t *testing.T) {
 	indexesSetup = indexesSetup[1:]
 
 	// sync and verify the indexSet
-	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+	indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName)
 	require.NoError(t, indexSet.Sync(context.Background()))
 
 	checkIndexSet()
@@ -135,7 +135,7 @@ func TestIndexSet_Sync(t *testing.T) {
 	// first, let us add a new file and refresh the index list cache
 	oneMoreDB := "one-more-db"
 	require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
-	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+	indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName)
 
 	// now, without syncing the indexset, let us compact the index in storage
 	compactedDBName := "compacted-db"
diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager.go b/pkg/storage/stores/indexshipper/downloads/table_manager.go
index 258a34399d..c2cb7bb6ef 100644
--- a/pkg/storage/stores/indexshipper/downloads/table_manager.go
+++ b/pkg/storage/stores/indexshipper/downloads/table_manager.go
@@ -293,6 +293,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
 		return nil
 	}
 
+	tm.indexStorageClient.RefreshIndexTableNamesCache(ctx)
 	tables, err := tm.indexStorageClient.ListTables(ctx)
 	if err != nil {
 		return err
diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go
index f530a7c5cf..1395369de2 100644
--- a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go
+++ b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go
@@ -465,6 +465,8 @@ func (m *mockIndexStorageClient) ListFiles(_ context.Context, tableName string,
 	return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil
 }
 
+func (m *mockIndexStorageClient) RefreshIndexTableNamesCache(_ context.Context) {}
+
 func buildTableNumber(idx int) int64 {
 	return getActiveTableNumber() - int64(idx)
 }
diff --git a/pkg/storage/stores/indexshipper/downloads/table_test.go b/pkg/storage/stores/indexshipper/downloads/table_test.go
index 75993581d4..02ad12e310 100644
--- a/pkg/storage/stores/indexshipper/downloads/table_test.go
+++ b/pkg/storage/stores/indexshipper/downloads/table_test.go
@@ -329,7 +329,7 @@ func TestTable_Sync(t *testing.T) {
 	require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, newDB), []byte(newDB), 0755))
 
 	// sync the table
-	table.storageClient.RefreshIndexListCache(context.Background())
+	table.storageClient.RefreshIndexTableCache(context.Background(), table.name)
 	require.NoError(t, table.Sync(context.Background()))
 
 	// check that table got the new index and dropped the deleted index
@@ -362,7 +362,7 @@ func TestTable_Sync(t *testing.T) {
 	// first, let us add a new file and refresh the index list cache
 	oneMoreDB := "one-more-db"
 	require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
-	table.storageClient.RefreshIndexListCache(context.Background())
+	table.storageClient.RefreshIndexTableCache(context.Background(), table.name)
 
 	// now, without syncing the table, let us compact the index in storage
 	compactedDBName := "compacted-db"
diff --git a/pkg/storage/stores/indexshipper/storage/cached_client.go b/pkg/storage/stores/indexshipper/storage/cached_client.go
index 2d3c839724..5aa559dad9 100644
--- a/pkg/storage/stores/indexshipper/storage/cached_client.go
+++ b/pkg/storage/stores/indexshipper/storage/cached_client.go
@@ -20,29 +20,46 @@ const (
 )
 
 type table struct {
+	name string
+	mtx           sync.RWMutex
 	commonObjects []client.StorageObject
 	userIDs       []client.StorageCommonPrefix
 	userObjects   map[string][]client.StorageObject
+
+	cacheBuiltAt   time.Time
+	buildCacheChan chan struct{}
+	buildCacheWg   sync.WaitGroup
+	err            error
+}
+
+func newTable(tableName string) *table {
+	return &table{
+		name:           tableName,
+		buildCacheChan: make(chan struct{}, 1),
+		userIDs:        []client.StorageCommonPrefix{},
+		userObjects:    map[string][]client.StorageObject{},
+		commonObjects:  []client.StorageObject{},
+	}
 }
 
 type cachedObjectClient struct {
 	client.ObjectClient
 
-	tables       map[string]*table
-	tableNames   []client.StorageCommonPrefix
-	tablesMtx    sync.RWMutex
-	cacheBuiltAt time.Time
+	tables                 map[string]*table
+	tableNames             []client.StorageCommonPrefix
+	tablesMtx              sync.RWMutex
+	tableNamesCacheBuiltAt time.Time
 
-	buildCacheChan chan struct{}
-	buildCacheWg   sync.WaitGroup
-	err            error
+	buildTableNamesCacheChan chan struct{}
+	buildTableNamesCacheWg   sync.WaitGroup
+	err                      error
 }
 
 func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient {
 	return &cachedObjectClient{
-		ObjectClient:   downstreamClient,
-		tables:         map[string]*table{},
-		buildCacheChan: make(chan struct{}, 1),
+		ObjectClient:             downstreamClient,
+		tables:                   map[string]*table{},
+		buildTableNamesCacheChan: make(chan struct{}, 1),
 	}
 }
 
@@ -50,27 +67,47 @@ func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectCl
 // We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through.
 // We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since
 // we are doing read-through cache, and we do not want to serve stale results.
-func (c *cachedObjectClient) buildCacheOnce(ctx context.Context, forceRefresh bool) {
-	c.buildCacheWg.Add(1)
-	defer c.buildCacheWg.Done()
+func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) {
+	buildCacheWg.Add(1)
+	defer buildCacheWg.Done()
 
 	// when the cache is expired, only one concurrent call must be able to rebuild it
 	// all other calls will wait until the cache is built successfully or failed with an error
 	select {
-	case c.buildCacheChan <- struct{}{}:
-		c.err = nil
-		c.err = c.buildCache(ctx, forceRefresh)
-		<-c.buildCacheChan
-		if c.err != nil {
-			level.Error(util_log.Logger).Log("msg", "failed to build cache", "err", c.err)
-		}
+	case buildCacheChan <- struct{}{}:
+		buildCacheFunc()
+		<-buildCacheChan
 	default:
 	}
 }
 
-func (c *cachedObjectClient) RefreshIndexListCache(ctx context.Context) {
-	c.buildCacheOnce(ctx, true)
-	c.buildCacheWg.Wait()
+func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) {
+	buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
+		c.err = nil
+		c.err = c.buildTableNamesCache(ctx, true)
+	})
+	c.buildTableNamesCacheWg.Wait()
+}
+
+func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
+	tbl := c.getTable(tableName)
+	// if we did not find the table in the cache, let us force refresh the table names cache to see if we can find it.
+	// It would be rare that a non-existent table name is being referred to.
+	// Should happen only when a table got deleted by the compactor due to retention policy or user-issued delete requests.
+	if tbl == nil {
+		c.RefreshIndexTableNamesCache(ctx)
+		tbl = c.getTable(tableName)
+		// still can't find the table, let us return
+		if tbl == nil {
+			return
+		}
+	}
+
+	buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
+		tbl.err = nil
+		tbl.err = tbl.buildCache(ctx, c.ObjectClient, true)
+	})
+	tbl.buildCacheWg.Wait()
 }
 
 func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
@@ -78,70 +115,121 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
 		return c.ObjectClient.List(ctx, prefix, objectDelimiter)
 	}
 
+	// if we have never built the table names cache, let us build it first.
+	if c.tableNamesCacheBuiltAt.IsZero() {
+		c.RefreshIndexTableNamesCache(ctx)
+	}
+
 	prefix = strings.TrimSuffix(prefix, delimiter)
 	ss := strings.Split(prefix, delimiter)
 	if len(ss) > 2 {
 		return nil, nil, fmt.Errorf("invalid prefix %s", prefix)
 	}
 
-	if time.Since(c.cacheBuiltAt) >= cacheTimeout {
-		c.buildCacheOnce(ctx, false)
+	// list of tables were requested
+	if prefix == "" {
+		tableNames, err := c.listTableNames(ctx)
+		return []client.StorageObject{}, tableNames, err
+	}
+
+	// common objects and list of users having objects in a table were requested
+	if len(ss) == 1 {
+		tableName := ss[0]
+		return c.listTable(ctx, tableName)
+	}
+
+	// user objects in a table were requested
+	tableName := ss[0]
+	userID := ss[1]
+
+	userObjects, err := c.listUserIndexInTable(ctx, tableName, userID)
+	return userObjects, []client.StorageCommonPrefix{}, err
+}
+
+func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) {
+	if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout {
+		buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() {
+			c.err = nil
+			c.err = c.buildTableNamesCache(ctx, false)
+		})
 	}
 
 	// wait for cache build operation to finish, if running
-	c.buildCacheWg.Wait()
+	c.buildTableNamesCacheWg.Wait()
 
 	if c.err != nil {
-		return nil, nil, c.err
+		return nil, c.err
 	}
 
 	c.tablesMtx.RLock()
 	defer c.tablesMtx.RUnlock()
 
-	// list of tables were requested
-	if prefix == "" {
-		return []client.StorageObject{}, c.tableNames, nil
+	return c.tableNames, nil
+}
+
+func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
+	tbl := c.getTable(tableName)
+	if tbl == nil {
+		return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
 	}
 
-	// common objects and list of users having objects in a table were requested
-	if len(ss) == 1 {
-		tableName := ss[0]
-		if table, ok := c.tables[tableName]; ok {
-			return table.commonObjects, table.userIDs, nil
-		}
+	if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
+		buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
+			tbl.err = nil
+			tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
+		})
+	}
 
-		return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
+	// wait for cache build operation to finish, if running
+	tbl.buildCacheWg.Wait()
+
+	if tbl.err != nil {
+		return nil, nil, tbl.err
 	}
 
-	// user objects in a table were requested
-	tableName := ss[0]
-	table, ok := c.tables[tableName]
-	if !ok {
-		return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
+	return tbl.commonObjects, tbl.userIDs, nil
+}
+
+func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName, userID string) ([]client.StorageObject, error) {
+	tbl := c.getTable(tableName)
+	if tbl == nil {
+		return []client.StorageObject{}, nil
 	}
 
-	userID := ss[1]
-	if objects, ok := table.userObjects[userID]; ok {
-		return objects, []client.StorageCommonPrefix{}, nil
+	if time.Since(tbl.cacheBuiltAt) >= cacheTimeout {
+		buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() {
+			tbl.err = nil
+			tbl.err = tbl.buildCache(ctx, c.ObjectClient, false)
+		})
+	}
+
+	// wait for cache build operation to finish, if running
+	tbl.buildCacheWg.Wait()
+
+	if tbl.err != nil {
+		return nil, tbl.err
+	}
+
+	if objects, ok := tbl.userObjects[userID]; ok {
+		return objects, nil
 	}
 
-	return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil
+	return []client.StorageObject{}, nil
 }
 
-// buildCache builds the cache if expired
-func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) error {
-	if !forceRefresh && time.Since(c.cacheBuiltAt) < cacheTimeout {
+func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) error {
+	if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout {
 		return nil
 	}
 
 	logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
-	level.Info(logger).Log("msg", "building index list cache")
+	level.Info(logger).Log("msg", "building table names cache")
 	now := time.Now()
 	defer func() {
-		level.Info(logger).Log("msg", "index list cache built", "duration", time.Since(now))
+		level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now))
 	}()
 
-	objects, _, err := c.ObjectClient.List(ctx, "", "")
+	_, tableNames, err := c.ObjectClient.List(ctx, "", delimiter)
 	if err != nil {
 		return err
 	}
@@ -149,8 +237,62 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool)
 	c.tablesMtx.Lock()
 	defer c.tablesMtx.Unlock()
 
-	c.tables = map[string]*table{}
-	c.tableNames = []client.StorageCommonPrefix{}
+	tableNamesMap := make(map[string]struct{}, len(tableNames))
+	tableNamesNormalized := make([]client.StorageCommonPrefix, len(tableNames))
+	for i := range tableNames {
+		tableName := strings.TrimSuffix(string(tableNames[i]), delimiter)
+		tableNamesMap[tableName] = struct{}{}
+		tableNamesNormalized[i] = client.StorageCommonPrefix(tableName)
+		if _, ok := c.tables[tableName]; ok {
+			continue
+		}
+
+		c.tables[tableName] = newTable(tableName)
+	}
+
+	for tableName := range c.tables {
+		if _, ok := tableNamesMap[tableName]; ok {
+			continue
+		}
+
+		delete(c.tables, tableName)
+	}
+
+	c.tableNames = tableNamesNormalized
+	c.tableNamesCacheBuiltAt = time.Now()
+	return nil
+}
+
+func (c *cachedObjectClient) getTable(tableName string) *table {
+	c.tablesMtx.RLock()
+	defer c.tablesMtx.RUnlock()
+
+	return c.tables[tableName]
+}
+
+func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) error {
+	if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout {
+		return nil
+	}
+
+	logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger)
+	level.Info(logger).Log("msg", "building table cache")
+	now := time.Now()
+	defer func() {
+		level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now))
+	}()
+
+	objects, _, err := objectClient.List(ctx, t.name+delimiter, "")
+	if err != nil {
+		return err
+	}
+
+	t.mtx.Lock()
+	defer t.mtx.Unlock()
+
+	t.commonObjects = t.commonObjects[:0]
+	t.userObjects = map[string][]client.StorageObject{}
+	t.userIDs = t.userIDs[:0]
 
 	for _, object := range objects {
 		// The s3 client can also return the directory itself in the ListObjects.
@@ -163,29 +305,17 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool)
 			return fmt.Errorf("invalid key: %s", object.Key)
 		}
 
-		tableName := ss[0]
-		tbl, ok := c.tables[tableName]
-		if !ok {
-			tbl = &table{
-				commonObjects: []client.StorageObject{},
-				userObjects:   map[string][]client.StorageObject{},
-				userIDs:       []client.StorageCommonPrefix{},
-			}
-			c.tables[tableName] = tbl
-			c.tableNames = append(c.tableNames, client.StorageCommonPrefix(tableName))
-		}
-
 		if len(ss) == 2 {
-			tbl.commonObjects = append(tbl.commonObjects, object)
+			t.commonObjects = append(t.commonObjects, object)
 		} else {
 			userID := ss[1]
-			if len(tbl.userObjects[userID]) == 0 {
-				tbl.userIDs = append(tbl.userIDs, client.StorageCommonPrefix(path.Join(tableName, userID)))
+			if len(t.userObjects[userID]) == 0 {
+				t.userIDs = append(t.userIDs, client.StorageCommonPrefix(path.Join(t.name, userID)))
 			}
-			tbl.userObjects[userID] = append(tbl.userObjects[userID], object)
+			t.userObjects[userID] = append(t.userObjects[userID], object)
 		}
 	}
 
-	c.cacheBuiltAt = time.Now()
+	t.cacheBuiltAt = time.Now()
 	return nil
 }
diff --git a/pkg/storage/stores/indexshipper/storage/cached_client_test.go b/pkg/storage/stores/indexshipper/storage/cached_client_test.go
index f7835dd0cd..309256aa25 100644
--- a/pkg/storage/stores/indexshipper/storage/cached_client_test.go
+++ b/pkg/storage/stores/indexshipper/storage/cached_client_test.go
@@ -3,6 +3,8 @@ package storage
 import (
 	"context"
 	"errors"
+	"os"
+	"path/filepath"
 	"sync"
 	"testing"
 	"time"
@@ -10,8 +12,12 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/pkg/storage/chunk/client"
+	"github.com/grafana/loki/pkg/storage/chunk/client/local"
+	"github.com/grafana/loki/pkg/storage/chunk/client/util"
 )
 
+var objectsMtime = time.Now().Local()
+
 type mockObjectClient struct {
 	client.ObjectClient
 	storageObjects []client.StorageObject
 	errResp        error
 	listCallsCount int
 	listDelay      time.Duration
 }
 
-func newMockObjectClient(objects []string) *mockObjectClient {
-	storageObjects := make([]client.StorageObject, 0, len(objects))
+func newMockObjectClient(t *testing.T, objects []string) *mockObjectClient {
+	tempDir := t.TempDir()
 	for _, objectName := range objects {
-		storageObjects = append(storageObjects, client.StorageObject{
-			Key: objectName,
-		})
+		objectFullPath := filepath.Join(tempDir, objectName)
+		parentDir := filepath.Dir(objectFullPath)
+		require.NoError(t, util.EnsureDirectory(parentDir))
+		require.NoError(t, os.WriteFile(objectFullPath, []byte("foo"), 0644))
+		require.NoError(t, os.Chtimes(objectFullPath, objectsMtime, objectsMtime))
 	}
 
+	objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir})
+	require.NoError(t, err)
 	return &mockObjectClient{
-		storageObjects: storageObjects,
+		ObjectClient: objectClient,
 	}
 }
 
-func (m *mockObjectClient) List(_ context.Context, _, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
+func (m *mockObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
 	defer func() {
 		time.Sleep(m.listDelay)
 		m.listCallsCount++
@@ -43,7 +53,7 @@ func (m *mockObjectClient) List(_ context.Context, _, _ string) ([]client.Storag
 	}()
 
 	if m.errResp != nil {
 		return nil, nil, m.errResp
 	}
 
-	return m.storageObjects, []client.StorageCommonPrefix{}, nil
+	return m.ObjectClient.List(ctx, prefix, delimiter)
 }
 
 func TestCachedObjectClient(t *testing.T) {
 	objectsInStorage := []string{
 		"table1/db1.gz",
 		"table1/db2.gz",
 		"table2/db1.gz",
 		"table2/user1/db1.gz",
 		"table3/user1/db1.gz",
 		"table3/user1/db2.gz",
 	}
 
-	objectClient := newMockObjectClient(objectsInStorage)
+	objectClient := newMockObjectClient(t, objectsInStorage)
 	cachedObjectClient := newCachedObjectClient(objectClient)
 
 	// list tables
@@ -74,58 +84,59 @@ func TestCachedObjectClient(t *testing.T) {
 	// list objects in all 3 tables
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table1/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 2, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{
-		{Key: "table1/db1.gz"},
-		{Key: "table1/db2.gz"},
+		{Key: "table1/db1.gz", ModifiedAt: objectsMtime},
+		{Key: "table1/db2.gz", ModifiedAt: objectsMtime},
 	}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes)
 
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 3, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{
-		{Key: "table2/db1.gz"},
+		{Key: "table2/db1.gz", ModifiedAt: objectsMtime},
 	}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{"table2/user1"}, commonPrefixes)
 
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 4, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{"table3/user1"}, commonPrefixes)
 
-	// list user objects from table2 and table3
+	// list user objects from table2 and table3, which should not make any new list calls
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/user1/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 4, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{
 		{
-			Key: "table2/user1/db1.gz",
+			Key:        "table2/user1/db1.gz",
+			ModifiedAt: objectsMtime,
 		},
 	}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes)
 
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user1/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 4, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{
-		{Key: "table3/user1/db1.gz"},
-		{Key: "table3/user1/db2.gz"},
+		{Key: "table3/user1/db1.gz", ModifiedAt: objectsMtime},
+		{Key: "table3/user1/db2.gz", ModifiedAt: objectsMtime},
 	}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes)
 
 	// list non-existent table
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table4/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 4, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes)
 
 	// list non-existent user
 	objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user2/", "", false)
 	require.NoError(t, err)
-	require.Equal(t, 1, objectClient.listCallsCount)
+	require.Equal(t, 4, objectClient.listCallsCount)
 	require.Equal(t, []client.StorageObject{}, objects)
 	require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes)
 }
@@ -137,7 +148,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
 		"table1/db2.gz",
 	}
 
-	objectClient := newMockObjectClient(objectsInStorage)
+	objectClient := newMockObjectClient(t, objectsInStorage)
 	cachedObjectClient := newCachedObjectClient(objectClient)
 
 	// do the initial listing
@@ -150,7 +161,7 @@ func TestCachedObjectClient_errors(t *testing.T) {
 	// timeout the cache and call List concurrently with objectClient throwing an error
 	// objectClient must receive just one request and all the cachedObjectClient.List calls should get an error
 	wg := sync.WaitGroup{}
-	cachedObjectClient.cacheBuiltAt = time.Now().Add(-(cacheTimeout + time.Second))
+	cachedObjectClient.tableNamesCacheBuiltAt = time.Now().Add(-(cacheTimeout + time.Second))
 	objectClient.listDelay = time.Millisecond * 100
 	objectClient.errResp = errors.New("fake error")
 	for i := 0; i < 5; i++ {
diff --git a/pkg/storage/stores/indexshipper/storage/client.go b/pkg/storage/stores/indexshipper/storage/client.go
index 1663891da4..fc6feab38a 100644
--- a/pkg/storage/stores/indexshipper/storage/client.go
+++ b/pkg/storage/stores/indexshipper/storage/client.go
@@ -33,8 +33,9 @@ type Client interface {
 	CommonIndexClient
 	UserIndexClient
-	RefreshIndexListCache(ctx context.Context)
+	RefreshIndexTableNamesCache(ctx context.Context)
 	ListTables(ctx context.Context) ([]string, error)
+	RefreshIndexTableCache(ctx context.Context, tableName string)
 	IsFileNotFoundErr(err error) bool
 	Stop()
 }
@@ -53,8 +54,12 @@ func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix s
 	return &indexStorageClient{objectClient: objectClient}
 }
 
-func (s *indexStorageClient) RefreshIndexListCache(ctx context.Context) {
-	s.objectClient.RefreshIndexListCache(ctx)
+func (s *indexStorageClient) RefreshIndexTableNamesCache(ctx context.Context) {
+	s.objectClient.RefreshIndexTableNamesCache(ctx)
+}
+
+func (s *indexStorageClient) RefreshIndexTableCache(ctx context.Context, tableName string) {
+	s.objectClient.RefreshIndexTableCache(ctx, tableName)
 }
 
 func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) {
diff --git a/pkg/storage/stores/indexshipper/storage/client_test.go b/pkg/storage/stores/indexshipper/storage/client_test.go
index dd080e8e55..c9b7da78b3 100644
--- a/pkg/storage/stores/indexshipper/storage/client_test.go
+++ b/pkg/storage/stores/indexshipper/storage/client_test.go
@@ -37,7 +37,6 @@ func TestIndexStorageClient(t *testing.T) {
 	indexStorageClient := NewIndexStorageClient(objectClient, storageKeyPrefix)
 
 	verifyFiles := func() {
-		indexStorageClient.RefreshIndexListCache(context.Background())
 		tables, err := indexStorageClient.ListTables(context.Background())
 		require.NoError(t, err)
 		require.Len(t, tables, len(tablesToSetup))
@@ -45,6 +44,7 @@ func TestIndexStorageClient(t *testing.T) {
 			expectedFiles, ok := tablesToSetup[table]
 			require.True(t, ok)
 
+			indexStorageClient.RefreshIndexTableCache(context.Background(), table)
 			filesInStorage, _, err := indexStorageClient.ListFiles(context.Background(), table, false)
 			require.NoError(t, err)
 			require.Len(t, filesInStorage, len(expectedFiles))
diff --git a/pkg/storage/stores/indexshipper/storage/index_set.go b/pkg/storage/stores/indexshipper/storage/index_set.go
index 2f05f88c1d..74762a7e3e 100644
--- a/pkg/storage/stores/indexshipper/storage/index_set.go
+++ b/pkg/storage/stores/indexshipper/storage/index_set.go
@@ -13,7 +13,7 @@ var (
 // IndexSet provides storage operations for user or common index tables.
 type IndexSet interface {
-	RefreshIndexListCache(ctx context.Context)
+	RefreshIndexTableCache(ctx context.Context, tableName string)
 	ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error)
 	GetFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error)
 	PutFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error
@@ -45,8 +45,8 @@ func (i indexSet) validateUserID(userID string) error {
 	return nil
 }
 
-func (i indexSet) RefreshIndexListCache(ctx context.Context) {
-	i.client.RefreshIndexListCache(ctx)
+func (i indexSet) RefreshIndexTableCache(ctx context.Context, tableName string) {
+	i.client.RefreshIndexTableCache(ctx, tableName)
 }
 
 func (i indexSet) ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) {
diff --git a/pkg/storage/stores/indexshipper/table_client.go b/pkg/storage/stores/indexshipper/table_client.go
index 26bd79db9e..068cdd1e36 100644
--- a/pkg/storage/stores/indexshipper/table_client.go
+++ b/pkg/storage/stores/indexshipper/table_client.go
@@ -20,7 +20,7 @@ func NewTableClient(objectClient client.ObjectClient, storageKeyPrefix string) i
 }
 
 func (b *tableClient) ListTables(ctx context.Context) ([]string, error) {
-	b.indexStorageClient.RefreshIndexListCache(ctx)
+	b.indexStorageClient.RefreshIndexTableNamesCache(ctx)
 	return b.indexStorageClient.ListTables(ctx)
 }
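**Reviewer note** (not part of the patch): the generalized `buildCacheOnce` is a small single-flight primitive built from a buffered channel of capacity 1 plus a `sync.WaitGroup`. A minimal standalone sketch of the same pattern, runnable in isolation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Same shape as the helper in cached_client.go: the buffered channel (capacity 1)
// lets exactly one caller run build(); everyone else takes the default branch
// and later waits on the WaitGroup for the in-flight build to finish.
func buildCacheOnce(wg *sync.WaitGroup, ch chan struct{}, build func()) {
	wg.Add(1)
	defer wg.Done()

	select {
	case ch <- struct{}{}:
		build()
		<-ch
	default:
	}
}

func main() {
	buildChan := make(chan struct{}, 1)
	var buildWg sync.WaitGroup
	builds := 0

	var callers sync.WaitGroup
	for i := 0; i < 5; i++ {
		callers.Add(1)
		go func() {
			defer callers.Done()
			buildCacheOnce(&buildWg, buildChan, func() {
				time.Sleep(10 * time.Millisecond) // simulate a slow List() call
				builds++
			})
			buildWg.Wait() // every caller sees the finished build
		}()
	}
	callers.Wait()

	// With overlapping callers this typically prints 1: they coalesce into a single build.
	fmt.Println("builds:", builds)
}
```

Overlapping callers coalesce into one build, so no duplicate list calls are issued; callers arriving after a build completes are expected to re-check freshness before triggering another one, which is what the `cacheTimeout` checks against `cacheBuiltAt`/`tableNamesCacheBuiltAt` in `cached_client.go` do.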