diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index d0cf66db93..14691ded4e 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -631,7 +631,7 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro var tables []string for _, sc := range c.storeContainers { // refresh index list cache since previous compaction would have changed the index files in the object store - sc.indexStorageClient.RefreshIndexListCache(ctx) + sc.indexStorageClient.RefreshIndexTableNamesCache(ctx) tbls, err := sc.indexStorageClient.ListTables(ctx) if err != nil { status = statusFailure diff --git a/pkg/storage/stores/indexshipper/compactor/table.go b/pkg/storage/stores/indexshipper/compactor/table.go index 3e4de7d840..95d6b3f31b 100644 --- a/pkg/storage/stores/indexshipper/compactor/table.go +++ b/pkg/storage/stores/indexshipper/compactor/table.go @@ -116,6 +116,7 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s } func (t *table) compact(applyRetention bool) error { + t.indexStorageClient.RefreshIndexTableCache(t.ctx, t.name) indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false) if err != nil { return err diff --git a/pkg/storage/stores/indexshipper/downloads/index_set.go b/pkg/storage/stores/indexshipper/downloads/index_set.go index 12b0287da1..9f1d6e9efd 100644 --- a/pkg/storage/stores/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/indexshipper/downloads/index_set.go @@ -279,7 +279,7 @@ func (t *indexSet) syncWithRetry(ctx context.Context, lock, bypassListCache bool if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries { level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it before retrying") - t.baseIndexSet.RefreshIndexListCache(ctx) + t.baseIndexSet.RefreshIndexTableCache(ctx, t.tableName) } level.Error(t.logger).Log("msg", "sync failed, retrying it", "err", err) diff --git a/pkg/storage/stores/indexshipper/downloads/index_set_test.go b/pkg/storage/stores/indexshipper/downloads/index_set_test.go index 722b9ab1cb..660d0170e8 100644 --- a/pkg/storage/stores/indexshipper/downloads/index_set_test.go +++ b/pkg/storage/stores/indexshipper/downloads/index_set_test.go @@ -113,7 +113,7 @@ func TestIndexSet_Sync(t *testing.T) { indexesSetup = buildListOfExpectedIndexes("", 0, 10) // sync and verify the indexSet - indexSet.baseIndexSet.RefreshIndexListCache(context.Background()) + indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName) require.NoError(t, indexSet.Sync(context.Background())) // check index set twice; first run to have new files to download, second run to test with no changes in storage. @@ -126,7 +126,7 @@ func TestIndexSet_Sync(t *testing.T) { indexesSetup = indexesSetup[1:] // sync and verify the indexSet - indexSet.baseIndexSet.RefreshIndexListCache(context.Background()) + indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName) require.NoError(t, indexSet.Sync(context.Background())) checkIndexSet() @@ -135,7 +135,7 @@ func TestIndexSet_Sync(t *testing.T) { // first, let us add a new file and refresh the index list cache oneMoreDB := "one-more-db" require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755)) - indexSet.baseIndexSet.RefreshIndexListCache(context.Background()) + indexSet.baseIndexSet.RefreshIndexTableCache(context.Background(), tableName) // now, without syncing the indexset, let us compact the index in storage compactedDBName := "compacted-db" diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager.go b/pkg/storage/stores/indexshipper/downloads/table_manager.go index 258a34399d..c2cb7bb6ef 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager.go @@ -293,6 +293,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error { return nil } + tm.indexStorageClient.RefreshIndexTableNamesCache(ctx) tables, err := tm.indexStorageClient.ListTables(ctx) if err != nil { return err diff --git a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go index f530a7c5cf..1395369de2 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_manager_test.go @@ -465,6 +465,8 @@ func (m *mockIndexStorageClient) ListFiles(_ context.Context, tableName string, return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil } +func (m *mockIndexStorageClient) RefreshIndexTableNamesCache(_ context.Context) {} + func buildTableNumber(idx int) int64 { return getActiveTableNumber() - int64(idx) } diff --git a/pkg/storage/stores/indexshipper/downloads/table_test.go b/pkg/storage/stores/indexshipper/downloads/table_test.go index 75993581d4..02ad12e310 100644 --- a/pkg/storage/stores/indexshipper/downloads/table_test.go +++ b/pkg/storage/stores/indexshipper/downloads/table_test.go @@ -329,7 +329,7 @@ func TestTable_Sync(t *testing.T) { require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, newDB), []byte(newDB), 0755)) // sync the table - table.storageClient.RefreshIndexListCache(context.Background()) + table.storageClient.RefreshIndexTableCache(context.Background(), table.name) require.NoError(t, table.Sync(context.Background())) // check that table got the new index and dropped the deleted index @@ -362,7 +362,7 @@ func TestTable_Sync(t *testing.T) { // first, let us add a new file and refresh the index list cache oneMoreDB := "one-more-db" require.NoError(t, os.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755)) - table.storageClient.RefreshIndexListCache(context.Background()) + table.storageClient.RefreshIndexTableCache(context.Background(), table.name) // now, without syncing the table, let us compact the index in storage compactedDBName := "compacted-db" diff --git a/pkg/storage/stores/indexshipper/storage/cached_client.go b/pkg/storage/stores/indexshipper/storage/cached_client.go index 2d3c839724..5aa559dad9 100644 --- a/pkg/storage/stores/indexshipper/storage/cached_client.go +++ b/pkg/storage/stores/indexshipper/storage/cached_client.go @@ -20,29 +20,46 @@ const ( ) type table struct { + name string + mtx sync.RWMutex commonObjects []client.StorageObject userIDs []client.StorageCommonPrefix userObjects map[string][]client.StorageObject + + cacheBuiltAt time.Time + buildCacheChan chan struct{} + buildCacheWg sync.WaitGroup + err error +} + +func newTable(tableName string) *table { + return &table{ + name: tableName, + buildCacheChan: make(chan struct{}, 1), + userIDs: []client.StorageCommonPrefix{}, + userObjects: map[string][]client.StorageObject{}, + commonObjects: []client.StorageObject{}, + } } type cachedObjectClient struct { client.ObjectClient - tables map[string]*table - tableNames []client.StorageCommonPrefix - tablesMtx sync.RWMutex - cacheBuiltAt time.Time + tables map[string]*table + tableNames []client.StorageCommonPrefix + tablesMtx sync.RWMutex + tableNamesCacheBuiltAt time.Time - buildCacheChan chan struct{} - buildCacheWg sync.WaitGroup - err error + buildTableNamesCacheChan chan struct{} + buildTableNamesCacheWg sync.WaitGroup + err error } func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectClient { return &cachedObjectClient{ - ObjectClient: downstreamClient, - tables: map[string]*table{}, - buildCacheChan: make(chan struct{}, 1), + ObjectClient: downstreamClient, + tables: map[string]*table{}, + buildTableNamesCacheChan: make(chan struct{}, 1), } } @@ -50,27 +67,47 @@ func newCachedObjectClient(downstreamClient client.ObjectClient) *cachedObjectCl // We have a buffered channel here with a capacity of 1 to make sure only one concurrent call makes it through. // We also have a sync.WaitGroup to make sure all the concurrent calls to buildCacheOnce wait until the cache gets rebuilt since // we are doing read-through cache, and we do not want to serve stale results. -func (c *cachedObjectClient) buildCacheOnce(ctx context.Context, forceRefresh bool) { - c.buildCacheWg.Add(1) - defer c.buildCacheWg.Done() +func buildCacheOnce(buildCacheWg *sync.WaitGroup, buildCacheChan chan struct{}, buildCacheFunc func()) { + buildCacheWg.Add(1) + defer buildCacheWg.Done() // when the cache is expired, only one concurrent call must be able to rebuild it // all other calls will wait until the cache is built successfully or failed with an error select { - case c.buildCacheChan <- struct{}{}: - c.err = nil - c.err = c.buildCache(ctx, forceRefresh) - <-c.buildCacheChan - if c.err != nil { - level.Error(util_log.Logger).Log("msg", "failed to build cache", "err", c.err) - } + case buildCacheChan <- struct{}{}: + buildCacheFunc() + <-buildCacheChan default: } } -func (c *cachedObjectClient) RefreshIndexListCache(ctx context.Context) { - c.buildCacheOnce(ctx, true) - c.buildCacheWg.Wait() +func (c *cachedObjectClient) RefreshIndexTableNamesCache(ctx context.Context) { + buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() { + c.err = nil + c.err = c.buildTableNamesCache(ctx, true) + }) + c.buildTableNamesCacheWg.Wait() +} + +func (c *cachedObjectClient) RefreshIndexTableCache(ctx context.Context, tableName string) { + tbl := c.getTable(tableName) + // if we did not find the table in the cache, let us force refresh the table names cache to see if we can find it. + // It would be rare that a non-existent table name is being referred. + // Should happen only when a table got deleted by compactor due to retention policy or user issued delete requests. + if tbl == nil { + c.RefreshIndexTableNamesCache(ctx) + tbl = c.getTable(tableName) + // still can't find the table, let us return + if tbl == nil { + return + } + } + + buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() { + tbl.err = nil + tbl.err = tbl.buildCache(ctx, c.ObjectClient, true) + }) + tbl.buildCacheWg.Wait() } func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter string, bypassCache bool) ([]client.StorageObject, []client.StorageCommonPrefix, error) { @@ -78,70 +115,121 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s return c.ObjectClient.List(ctx, prefix, objectDelimiter) } + // if we have never built table names cache, let us build it first. + if c.tableNamesCacheBuiltAt.IsZero() { + c.RefreshIndexTableNamesCache(ctx) + } + prefix = strings.TrimSuffix(prefix, delimiter) ss := strings.Split(prefix, delimiter) if len(ss) > 2 { return nil, nil, fmt.Errorf("invalid prefix %s", prefix) } - if time.Since(c.cacheBuiltAt) >= cacheTimeout { - c.buildCacheOnce(ctx, false) + // list of tables were requested + if prefix == "" { + tableNames, err := c.listTableNames(ctx) + return []client.StorageObject{}, tableNames, err + } + + // common objects and list of users having objects in a table were requested + if len(ss) == 1 { + tableName := ss[0] + return c.listTable(ctx, tableName) + } + + // user objects in a table were requested + tableName := ss[0] + userID := ss[1] + + userObjects, err := c.listUserIndexInTable(ctx, tableName, userID) + return userObjects, []client.StorageCommonPrefix{}, err +} + +func (c *cachedObjectClient) listTableNames(ctx context.Context) ([]client.StorageCommonPrefix, error) { + if time.Since(c.tableNamesCacheBuiltAt) >= cacheTimeout { + buildCacheOnce(&c.buildTableNamesCacheWg, c.buildTableNamesCacheChan, func() { + c.err = nil + c.err = c.buildTableNamesCache(ctx, false) + }) } // wait for cache build operation to finish, if running - c.buildCacheWg.Wait() + c.buildTableNamesCacheWg.Wait() if c.err != nil { - return nil, nil, c.err + return nil, c.err } c.tablesMtx.RLock() defer c.tablesMtx.RUnlock() - // list of tables were requested - if prefix == "" { - return []client.StorageObject{}, c.tableNames, nil + return c.tableNames, nil +} + +func (c *cachedObjectClient) listTable(ctx context.Context, tableName string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { + tbl := c.getTable(tableName) + if tbl == nil { + return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil } - // common objects and list of users having objects in a table were requested - if len(ss) == 1 { - tableName := ss[0] - if table, ok := c.tables[tableName]; ok { - return table.commonObjects, table.userIDs, nil - } + if time.Since(tbl.cacheBuiltAt) >= cacheTimeout { + buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() { + tbl.err = nil + tbl.err = tbl.buildCache(ctx, c.ObjectClient, false) + }) + } - return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil + // wait for cache build operation to finish, if running + tbl.buildCacheWg.Wait() + + if tbl.err != nil { + return nil, nil, tbl.err } - // user objects in a table were requested - tableName := ss[0] - table, ok := c.tables[tableName] - if !ok { - return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil + return tbl.commonObjects, tbl.userIDs, nil +} + +func (c *cachedObjectClient) listUserIndexInTable(ctx context.Context, tableName, userID string) ([]client.StorageObject, error) { + tbl := c.getTable(tableName) + if tbl == nil { + return []client.StorageObject{}, nil } - userID := ss[1] - if objects, ok := table.userObjects[userID]; ok { - return objects, []client.StorageCommonPrefix{}, nil + if time.Since(tbl.cacheBuiltAt) >= cacheTimeout { + buildCacheOnce(&tbl.buildCacheWg, tbl.buildCacheChan, func() { + tbl.err = nil + tbl.err = tbl.buildCache(ctx, c.ObjectClient, false) + }) + } + + // wait for cache build operation to finish, if running + tbl.buildCacheWg.Wait() + + if tbl.err != nil { + return nil, c.err + } + + if objects, ok := tbl.userObjects[userID]; ok { + return objects, nil } - return []client.StorageObject{}, []client.StorageCommonPrefix{}, nil + return []client.StorageObject{}, nil } -// buildCache builds the cache if expired -func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) error { - if !forceRefresh && time.Since(c.cacheBuiltAt) < cacheTimeout { +func (c *cachedObjectClient) buildTableNamesCache(ctx context.Context, forceRefresh bool) error { + if !forceRefresh && time.Since(c.tableNamesCacheBuiltAt) < cacheTimeout { return nil } logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) - level.Info(logger).Log("msg", "building index list cache") + level.Info(logger).Log("msg", "building table names cache") now := time.Now() defer func() { - level.Info(logger).Log("msg", "index list cache built", "duration", time.Since(now)) + level.Info(logger).Log("msg", "table names cache built", "duration", time.Since(now)) }() - objects, _, err := c.ObjectClient.List(ctx, "", "") + _, tableNames, err := c.ObjectClient.List(ctx, "", delimiter) if err != nil { return err } @@ -149,8 +237,62 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) c.tablesMtx.Lock() defer c.tablesMtx.Unlock() - c.tables = map[string]*table{} - c.tableNames = []client.StorageCommonPrefix{} + tableNamesMap := make(map[string]struct{}, len(tableNames)) + tableNamesNormalized := make([]client.StorageCommonPrefix, len(tableNames)) + for i := range tableNames { + tableName := strings.TrimSuffix(string(tableNames[i]), delimiter) + tableNamesMap[tableName] = struct{}{} + tableNamesNormalized[i] = client.StorageCommonPrefix(tableName) + if _, ok := c.tables[tableName]; ok { + continue + } + + c.tables[tableName] = newTable(tableName) + } + + for tableName := range c.tables { + if _, ok := tableNamesMap[tableName]; ok { + continue + } + + delete(c.tables, tableName) + } + + c.tableNames = tableNamesNormalized + c.tableNamesCacheBuiltAt = time.Now() + return nil +} + +func (c *cachedObjectClient) getTable(tableName string) *table { + c.tablesMtx.RLock() + defer c.tablesMtx.RUnlock() + + return c.tables[tableName] +} + +func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient, forceRefresh bool) error { + if !forceRefresh && time.Since(t.cacheBuiltAt) < cacheTimeout { + return nil + } + + logger := spanlogger.FromContextWithFallback(ctx, util_log.Logger) + level.Info(logger).Log("msg", "building table cache") + now := time.Now() + defer func() { + level.Info(logger).Log("msg", "table cache built", "duration", time.Since(now)) + }() + + objects, _, err := objectClient.List(ctx, t.name+delimiter, "") + if err != nil { + return err + } + + t.mtx.Lock() + defer t.mtx.Unlock() + + t.commonObjects = t.commonObjects[:0] + t.userObjects = map[string][]client.StorageObject{} + t.userIDs = t.userIDs[:0] for _, object := range objects { // The s3 client can also return the directory itself in the ListObjects. @@ -163,29 +305,17 @@ func (c *cachedObjectClient) buildCache(ctx context.Context, forceRefresh bool) return fmt.Errorf("invalid key: %s", object.Key) } - tableName := ss[0] - tbl, ok := c.tables[tableName] - if !ok { - tbl = &table{ - commonObjects: []client.StorageObject{}, - userObjects: map[string][]client.StorageObject{}, - userIDs: []client.StorageCommonPrefix{}, - } - c.tables[tableName] = tbl - c.tableNames = append(c.tableNames, client.StorageCommonPrefix(tableName)) - } - if len(ss) == 2 { - tbl.commonObjects = append(tbl.commonObjects, object) + t.commonObjects = append(t.commonObjects, object) } else { userID := ss[1] - if len(tbl.userObjects[userID]) == 0 { - tbl.userIDs = append(tbl.userIDs, client.StorageCommonPrefix(path.Join(tableName, userID))) + if len(t.userObjects[userID]) == 0 { + t.userIDs = append(t.userIDs, client.StorageCommonPrefix(path.Join(t.name, userID))) } - tbl.userObjects[userID] = append(tbl.userObjects[userID], object) + t.userObjects[userID] = append(t.userObjects[userID], object) } } - c.cacheBuiltAt = time.Now() + t.cacheBuiltAt = time.Now() return nil } diff --git a/pkg/storage/stores/indexshipper/storage/cached_client_test.go b/pkg/storage/stores/indexshipper/storage/cached_client_test.go index f7835dd0cd..309256aa25 100644 --- a/pkg/storage/stores/indexshipper/storage/cached_client_test.go +++ b/pkg/storage/stores/indexshipper/storage/cached_client_test.go @@ -3,6 +3,8 @@ package storage import ( "context" "errors" + "os" + "path/filepath" "sync" "testing" "time" @@ -10,8 +12,12 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/chunk/client/util" ) +var objectsMtime = time.Now().Local() + type mockObjectClient struct { client.ObjectClient storageObjects []client.StorageObject @@ -20,20 +26,24 @@ type mockObjectClient struct { listDelay time.Duration } -func newMockObjectClient(objects []string) *mockObjectClient { - storageObjects := make([]client.StorageObject, 0, len(objects)) +func newMockObjectClient(t *testing.T, objects []string) *mockObjectClient { + tempDir := t.TempDir() for _, objectName := range objects { - storageObjects = append(storageObjects, client.StorageObject{ - Key: objectName, - }) + objectFullPath := filepath.Join(tempDir, objectName) + parentDir := filepath.Dir(objectFullPath) + require.NoError(t, util.EnsureDirectory(parentDir)) + require.NoError(t, os.WriteFile(objectFullPath, []byte("foo"), 0644)) + require.NoError(t, os.Chtimes(objectFullPath, objectsMtime, objectsMtime)) } + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: tempDir}) + require.NoError(t, err) return &mockObjectClient{ - storageObjects: storageObjects, + ObjectClient: objectClient, } } -func (m *mockObjectClient) List(_ context.Context, _, _ string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { +func (m *mockObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { defer func() { time.Sleep(m.listDelay) m.listCallsCount++ @@ -43,7 +53,7 @@ func (m *mockObjectClient) List(_ context.Context, _, _ string) ([]client.Storag return nil, nil, m.errResp } - return m.storageObjects, []client.StorageCommonPrefix{}, nil + return m.ObjectClient.List(ctx, prefix, delimiter) } func TestCachedObjectClient(t *testing.T) { @@ -61,7 +71,7 @@ func TestCachedObjectClient(t *testing.T) { "table3/user1/db2.gz", } - objectClient := newMockObjectClient(objectsInStorage) + objectClient := newMockObjectClient(t, objectsInStorage) cachedObjectClient := newCachedObjectClient(objectClient) // list tables @@ -74,58 +84,59 @@ func TestCachedObjectClient(t *testing.T) { // list objects in all 3 tables objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table1/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 2, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ - {Key: "table1/db1.gz"}, - {Key: "table1/db2.gz"}, + {Key: "table1/db1.gz", ModifiedAt: objectsMtime}, + {Key: "table1/db2.gz", ModifiedAt: objectsMtime}, }, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 3, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ - {Key: "table2/db1.gz"}, + {Key: "table2/db1.gz", ModifiedAt: objectsMtime}, }, objects) require.Equal(t, []client.StorageCommonPrefix{"table2/user1"}, commonPrefixes) objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 4, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{"table3/user1"}, commonPrefixes) - // list user objects from table2 and table3 + // list user objects from table2 and table3, which should not make any new list calls objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table2/user1/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 4, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ { - Key: "table2/user1/db1.gz", + Key: "table2/user1/db1.gz", + ModifiedAt: objectsMtime, }, }, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user1/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 4, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{ - {Key: "table3/user1/db1.gz"}, - {Key: "table3/user1/db2.gz"}, + {Key: "table3/user1/db1.gz", ModifiedAt: objectsMtime}, + {Key: "table3/user1/db2.gz", ModifiedAt: objectsMtime}, }, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) // list non-existent table objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table4/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 4, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) // list non-existent user objects, commonPrefixes, err = cachedObjectClient.List(context.Background(), "table3/user2/", "", false) require.NoError(t, err) - require.Equal(t, 1, objectClient.listCallsCount) + require.Equal(t, 4, objectClient.listCallsCount) require.Equal(t, []client.StorageObject{}, objects) require.Equal(t, []client.StorageCommonPrefix{}, commonPrefixes) } @@ -137,7 +148,7 @@ func TestCachedObjectClient_errors(t *testing.T) { "table1/db2.gz", } - objectClient := newMockObjectClient(objectsInStorage) + objectClient := newMockObjectClient(t, objectsInStorage) cachedObjectClient := newCachedObjectClient(objectClient) // do the initial listing @@ -150,7 +161,7 @@ func TestCachedObjectClient_errors(t *testing.T) { // timeout the cache and call List concurrently with objectClient throwing an error // objectClient must receive just one request and all the cachedObjectClient.List calls should get an error wg := sync.WaitGroup{} - cachedObjectClient.cacheBuiltAt = time.Now().Add(-(cacheTimeout + time.Second)) + cachedObjectClient.tableNamesCacheBuiltAt = time.Now().Add(-(cacheTimeout + time.Second)) objectClient.listDelay = time.Millisecond * 100 objectClient.errResp = errors.New("fake error") for i := 0; i < 5; i++ { diff --git a/pkg/storage/stores/indexshipper/storage/client.go b/pkg/storage/stores/indexshipper/storage/client.go index 1663891da4..fc6feab38a 100644 --- a/pkg/storage/stores/indexshipper/storage/client.go +++ b/pkg/storage/stores/indexshipper/storage/client.go @@ -33,8 +33,9 @@ type Client interface { CommonIndexClient UserIndexClient - RefreshIndexListCache(ctx context.Context) + RefreshIndexTableNamesCache(ctx context.Context) ListTables(ctx context.Context) ([]string, error) + RefreshIndexTableCache(ctx context.Context, tableName string) IsFileNotFoundErr(err error) bool Stop() } @@ -53,8 +54,12 @@ func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix s return &indexStorageClient{objectClient: objectClient} } -func (s *indexStorageClient) RefreshIndexListCache(ctx context.Context) { - s.objectClient.RefreshIndexListCache(ctx) +func (s *indexStorageClient) RefreshIndexTableNamesCache(ctx context.Context) { + s.objectClient.RefreshIndexTableNamesCache(ctx) +} + +func (s *indexStorageClient) RefreshIndexTableCache(ctx context.Context, tableName string) { + s.objectClient.RefreshIndexTableCache(ctx, tableName) } func (s *indexStorageClient) ListTables(ctx context.Context) ([]string, error) { diff --git a/pkg/storage/stores/indexshipper/storage/client_test.go b/pkg/storage/stores/indexshipper/storage/client_test.go index dd080e8e55..c9b7da78b3 100644 --- a/pkg/storage/stores/indexshipper/storage/client_test.go +++ b/pkg/storage/stores/indexshipper/storage/client_test.go @@ -37,7 +37,6 @@ func TestIndexStorageClient(t *testing.T) { indexStorageClient := NewIndexStorageClient(objectClient, storageKeyPrefix) verifyFiles := func() { - indexStorageClient.RefreshIndexListCache(context.Background()) tables, err := indexStorageClient.ListTables(context.Background()) require.NoError(t, err) require.Len(t, tables, len(tablesToSetup)) @@ -45,6 +44,7 @@ func TestIndexStorageClient(t *testing.T) { expectedFiles, ok := tablesToSetup[table] require.True(t, ok) + indexStorageClient.RefreshIndexTableCache(context.Background(), table) filesInStorage, _, err := indexStorageClient.ListFiles(context.Background(), table, false) require.NoError(t, err) require.Len(t, filesInStorage, len(expectedFiles)) diff --git a/pkg/storage/stores/indexshipper/storage/index_set.go b/pkg/storage/stores/indexshipper/storage/index_set.go index 2f05f88c1d..74762a7e3e 100644 --- a/pkg/storage/stores/indexshipper/storage/index_set.go +++ b/pkg/storage/stores/indexshipper/storage/index_set.go @@ -13,7 +13,7 @@ var ( // IndexSet provides storage operations for user or common index tables. type IndexSet interface { - RefreshIndexListCache(ctx context.Context) + RefreshIndexTableCache(ctx context.Context, tableName string) ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) GetFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) PutFile(ctx context.Context, tableName, userID, fileName string, file io.ReadSeeker) error @@ -45,8 +45,8 @@ func (i indexSet) validateUserID(userID string) error { return nil } -func (i indexSet) RefreshIndexListCache(ctx context.Context) { - i.client.RefreshIndexListCache(ctx) +func (i indexSet) RefreshIndexTableCache(ctx context.Context, tableName string) { + i.client.RefreshIndexTableCache(ctx, tableName) } func (i indexSet) ListFiles(ctx context.Context, tableName, userID string, bypassCache bool) ([]IndexFile, error) { diff --git a/pkg/storage/stores/indexshipper/table_client.go b/pkg/storage/stores/indexshipper/table_client.go index 26bd79db9e..068cdd1e36 100644 --- a/pkg/storage/stores/indexshipper/table_client.go +++ b/pkg/storage/stores/indexshipper/table_client.go @@ -20,7 +20,7 @@ func NewTableClient(objectClient client.ObjectClient, storageKeyPrefix string) i } func (b *tableClient) ListTables(ctx context.Context) ([]string, error) { - b.indexStorageClient.RefreshIndexListCache(ctx) + b.indexStorageClient.RefreshIndexTableNamesCache(ctx) return b.indexStorageClient.ListTables(ctx) }