diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 56c04902a2..df12c4104c 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/storage/stores/shipper" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" ) @@ -66,9 +67,14 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe return nil, err } + objectClient = util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix) + if cfg.SharedStoreType != "filesystem" { + objectClient = shipper_util.NewCachedObjectClient(objectClient) + } + compactor := Compactor{ cfg: cfg, - objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix), + objectClient: objectClient, metrics: newMetrics(r), } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index a9118dfa26..4eeeb30a9e 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" ) @@ -101,7 +102,7 @@ func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometh return &shipper, nil } -func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error { +func (s *Shipper) init(objectClient chunk.ObjectClient, registerer prometheus.Registerer) error { // When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead. // Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path. boltdbIndexClientDir := s.cfg.ActiveIndexDirectory @@ -115,7 +116,11 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R return err } - prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix) + objectClient = util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix) + + if s.cfg.SharedStoreType != "filesystem" { + objectClient = shipper_util.NewCachedObjectClient(objectClient) + } if s.cfg.Mode != ModeReadOnly { uploader, err := s.getUploaderName() @@ -129,7 +134,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R UploadInterval: UploadInterval, DBRetainPeriod: s.cfg.ResyncInterval + 2*time.Minute, } - uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer) + uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, objectClient, registerer) if err != nil { return err } @@ -144,7 +149,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R CacheTTL: s.cfg.CacheTTL, QueryReadyNumDays: s.cfg.QueryReadyNumDays, } - downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer) + downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, objectClient, registerer) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/util/cached_client.go b/pkg/storage/stores/shipper/util/cached_client.go new file mode 100644 index 0000000000..96f3c186d7 --- /dev/null +++ b/pkg/storage/stores/shipper/util/cached_client.go @@ -0,0 +1,111 @@ +package util + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" +) + +const ( + delimiter = "/" + cacheTimeout = time.Minute +) + +// CachedObjectClient is meant for reducing number of LIST calls on hosted object stores(S3, GCS, Azure Blob Storage and Swift). +// We as of now do a LIST call per table when we need to find its objects. +// CachedObjectClient does flat listing of objects which is only supported by hosted object stores mentioned above. +// In case of boltdb files stored by shipper, the listed objects would have keys like /. +// For each List call without a prefix(which is actually done to get list of tables), +// CachedObjectClient would build a map of TableName -> chunk.StorageObject which would be used as a cache for subsequent List calls for getting list of objects for tables. +// Cache items are evicted after first read or a timeout. The cache is rebuilt during List call with empty prefix or we encounter a cache miss. +type CachedObjectClient struct { + chunk.ObjectClient + tables map[string][]chunk.StorageObject + tablesMtx sync.Mutex + cacheBuiltAt time.Time +} + +func NewCachedObjectClient(downstreamClient chunk.ObjectClient) *CachedObjectClient { + return &CachedObjectClient{ + ObjectClient: downstreamClient, + tables: map[string][]chunk.StorageObject{}, + } +} + +func (c *CachedObjectClient) List(ctx context.Context, prefix, _ string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { + c.tablesMtx.Lock() + defer c.tablesMtx.Unlock() + + if prefix == "" { + tables, err := c.listTables(ctx) + if err != nil { + return nil, nil, err + } + + return []chunk.StorageObject{}, tables, nil + } + + // While listing objects in a table, prefix is set to +delimiter so trim the delimiter first. + tableName := strings.TrimSuffix(prefix, delimiter) + if strings.Contains(tableName, delimiter) { + return nil, nil, fmt.Errorf("invalid prefix %s for listing table objects", prefix) + } + tableObjects, err := c.listTableObjects(ctx, tableName) + if err != nil { + return nil, nil, err + } + + return tableObjects, []chunk.StorageCommonPrefix{}, nil +} + +// listTables assumes that tablesMtx is already locked by the caller +func (c *CachedObjectClient) listTables(ctx context.Context) ([]chunk.StorageCommonPrefix, error) { + // do a flat listing by setting delimiter to empty string + objects, _, err := c.ObjectClient.List(ctx, "", "") + if err != nil { + return nil, err + } + + // build the cache and response containing just table names as chunk.StorageCommonPrefix + var tableNames []chunk.StorageCommonPrefix + for _, object := range objects { + ss := strings.Split(object.Key, delimiter) + if len(ss) != 2 { + return nil, fmt.Errorf("invalid object key found %s", object.Key) + } + + if _, ok := c.tables[ss[0]]; !ok { + tableNames = append(tableNames, chunk.StorageCommonPrefix(ss[0])) + } + c.tables[ss[0]] = append(c.tables[ss[0]], object) + } + + c.cacheBuiltAt = time.Now() + + return tableNames, nil +} + +// listTableObjects assumes that tablesMtx is already locked by the caller +func (c *CachedObjectClient) listTableObjects(ctx context.Context, tableName string) ([]chunk.StorageObject, error) { + objects, ok := c.tables[tableName] + if ok && c.cacheBuiltAt.Add(cacheTimeout).After(time.Now()) { + // evict the element read from cache + delete(c.tables, tableName) + return objects, nil + } + + // requested element not found in the cache, rebuild the cache. + _, err := c.listTables(ctx) + if err != nil { + return nil, err + } + + objects = c.tables[tableName] + // evict the element read from cache + delete(c.tables, tableName) + return objects, nil +} diff --git a/pkg/storage/stores/shipper/util/cached_client_test.go b/pkg/storage/stores/shipper/util/cached_client_test.go new file mode 100644 index 0000000000..3c68e65217 --- /dev/null +++ b/pkg/storage/stores/shipper/util/cached_client_test.go @@ -0,0 +1,80 @@ +package util + +import ( + "context" + "testing" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/stretchr/testify/require" +) + +type mockHostedObjectClient struct { + chunk.ObjectClient + objects []chunk.StorageObject +} + +func (m mockHostedObjectClient) List(_ context.Context, _, _ string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { + return m.objects, []chunk.StorageCommonPrefix{}, nil +} + +func TestCachedObjectClient_List(t *testing.T) { + objectClient := mockHostedObjectClient{ + objects: []chunk.StorageObject{ + { + Key: "table1/obj1", + }, + { + Key: "table1/obj2", + }, + { + Key: "table2/obj1", + }, + { + Key: "table2/obj2", + }, + }, + } + + cachedObjectClient := NewCachedObjectClient(objectClient) + + // list tables which should build the cache + _, tables, err := cachedObjectClient.List(context.Background(), "", "") + require.NoError(t, err) + require.Equal(t, []chunk.StorageCommonPrefix{"table1", "table2"}, tables) + + // verify whether cache has right items + require.Len(t, cachedObjectClient.tables, 2) + require.Equal(t, objectClient.objects[:2], cachedObjectClient.tables["table1"]) + require.Equal(t, objectClient.objects[2:], cachedObjectClient.tables["table2"]) + + // list table1 objects + objects, _, err := cachedObjectClient.List(context.Background(), "table1/", "") + require.NoError(t, err) + require.Equal(t, objectClient.objects[:2], objects) + + // verify whether table1 got evicted + require.Len(t, cachedObjectClient.tables, 1) + require.Contains(t, cachedObjectClient.tables, "table2") + + // list table2 objects + objects, _, err = cachedObjectClient.List(context.Background(), "table2/", "") + require.NoError(t, err) + require.Equal(t, objectClient.objects[2:], objects) + + // verify whether table2 got evicted as well + require.Len(t, cachedObjectClient.tables, 0) + + // list table1 again which should rebuild the cache + objects, _, err = cachedObjectClient.List(context.Background(), "table1/", "") + require.NoError(t, err) + require.Equal(t, objectClient.objects[:2], objects) + + // verify whether cache was rebuilt and table1 got evicted already + require.Len(t, cachedObjectClient.tables, 1) + require.Contains(t, cachedObjectClient.tables, "table2") + + // verify whether listing non-existing table should not error + objects, _, err = cachedObjectClient.List(context.Background(), "table3/", "") + require.NoError(t, err) + require.Len(t, objects, 0) +}