From 73edf7a9430aa3a8dc91370fc34de9be92bbe88a Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Mon, 5 Feb 2024 22:31:51 +0100 Subject: [PATCH] (chore) Bloomshipper: Separate store and client (#11865) **What this PR does / why we need it**: This PR removes the `StoreAndClient` interface that was accepted by the `BloomShipper`. Since the `BloomStore` had to not only implement the `Store` interface, but also the `Client` interface, it caused re-implementation of the same methods in different ways. Now the shipper solely relies on the `Store` interface. See individual commit messages for more context. Tests have been rewritten from scratch and placed in their own respective test files for store and client. --------- Signed-off-by: Christian Haudum --- go.mod | 2 +- pkg/bloomcompactor/bloomcompactor.go | 4 +- .../testutils/inmemory_storage_client.go | 51 +- .../shipper/bloomshipper/client_test.go | 662 +++++++----------- .../stores/shipper/bloomshipper/fetcher.go | 4 +- .../stores/shipper/bloomshipper/shipper.go | 8 +- .../stores/shipper/bloomshipper/store.go | 192 +---- .../stores/shipper/bloomshipper/store_test.go | 268 +++++++ 8 files changed, 586 insertions(+), 605 deletions(-) create mode 100644 pkg/storage/stores/shipper/bloomshipper/store_test.go diff --git a/go.mod b/go.mod index f8db7e46af..87ea0fd075 100644 --- a/go.mod +++ b/go.mod @@ -119,7 +119,6 @@ require ( github.com/DmitriyVTitov/size v1.5.0 github.com/IBM/go-sdk-core/v5 v5.13.1 github.com/IBM/ibm-cos-sdk-go v1.10.0 - github.com/aws/smithy-go v1.11.1 github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc github.com/d4l3k/messagediff v1.2.1 github.com/efficientgo/core v1.0.0-rc.2 @@ -183,6 +182,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect + github.com/aws/smithy-go v1.11.1 // indirect github.com/beorn7/perks v1.0.1 // 
indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index a386240859..cf3b3fafcb 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -38,7 +38,7 @@ type Compactor struct { limits Limits // temporary workaround until store has implemented read/write shipper interface - store bloomshipper.StoreAndClient + store bloomshipper.Store sharding ShardingStrategy @@ -48,7 +48,7 @@ type Compactor struct { func New( cfg Config, - store bloomshipper.StoreAndClient, + store bloomshipper.Store, sharding ShardingStrategy, limits Limits, logger log.Logger, diff --git a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go index 48a7a6c650..15e2ddb256 100644 --- a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go +++ b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go @@ -33,9 +33,10 @@ const ( // MockStorage is a fake in-memory StorageClient. 
type MockStorage struct { + *InMemoryObjectClient + mtx sync.RWMutex tables map[string]*mockTable - objects map[string][]byte schemaCfg config.SchemaConfig numIndexWrites int @@ -43,6 +44,25 @@ type MockStorage struct { mode MockStorageMode } +// compiler check +var _ client.ObjectClient = &InMemoryObjectClient{} + +type InMemoryObjectClient struct { + objects map[string][]byte + mtx sync.RWMutex + mode MockStorageMode +} + +func NewInMemoryObjectClient() *InMemoryObjectClient { + return &InMemoryObjectClient{ + objects: make(map[string][]byte), + } +} + +func (m *InMemoryObjectClient) Internals() map[string][]byte { + return m.objects +} + type mockTable struct { items map[string][]mockItem write, read int64 @@ -64,6 +84,7 @@ func ResetMockStorage() { func NewMockStorage() *MockStorage { if singleton == nil { singleton = &MockStorage{ + InMemoryObjectClient: NewInMemoryObjectClient(), schemaCfg: config.SchemaConfig{ Configs: []config.PeriodConfig{ { @@ -73,8 +94,7 @@ func NewMockStorage() *MockStorage { }, }, }, - tables: map[string]*mockTable{}, - objects: map[string][]byte{}, + tables: map[string]*mockTable{}, } } return singleton @@ -109,6 +129,7 @@ func (*MockStorage) Stop() { func (m *MockStorage) SetMode(mode MockStorageMode) { m.mode = mode + m.InMemoryObjectClient.mode = mode } // ListTables implements StorageClient. @@ -370,7 +391,8 @@ func (m *MockStorage) query(ctx context.Context, query index.Query, callback fun return nil } -func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, error) { +// ObjectExists implements client.ObjectClient +func (m *InMemoryObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -386,7 +408,8 @@ func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, e return true, nil } -func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) { +// GetObject implements client.ObjectClient. 
+func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -402,7 +425,8 @@ func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadClo return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil } -func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error { +// PutObject implements client.ObjectClient. +func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error { buf, err := io.ReadAll(object) if err != nil { return err @@ -419,7 +443,8 @@ func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.R return nil } -func (m *MockStorage) IsObjectNotFoundErr(err error) bool { +// IsObjectNotFoundErr implements client.ObjectClient. +func (m *InMemoryObjectClient) IsObjectNotFoundErr(err error) bool { return errors.Is(err, errStorageObjectNotFound) } @@ -427,9 +452,11 @@ func (m *MockStorage) IsChunkNotFoundErr(err error) bool { return m.IsObjectNotFoundErr(err) } -func (m *MockStorage) IsRetryableErr(error) bool { return false } +// IsRetryableErr implements client.ObjectClient. +func (m *InMemoryObjectClient) IsRetryableErr(error) bool { return false } -func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error { +// DeleteObject implements client.ObjectClient. +func (m *InMemoryObjectClient) DeleteObject(_ context.Context, objectKey string) error { m.mtx.Lock() defer m.mtx.Unlock() @@ -446,7 +473,7 @@ func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error { } // List implements chunk.ObjectClient. 
-func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { +func (m *InMemoryObjectClient) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -494,6 +521,10 @@ func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]clien return storageObjects, commonPrefixes, nil } +// Stop implements client.ObjectClient +func (*InMemoryObjectClient) Stop() { +} + type mockWriteBatch struct { inserts []struct { tableName, hashValue string diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 03a15f11bd..02d80429d7 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -1,43 +1,29 @@ package bloomshipper import ( - "archive/tar" "bytes" "context" "encoding/json" - "io" + "fmt" "os" - "path/filepath" - "strings" "testing" "time" - awsio "github.com/aws/smithy-go/io" "github.com/go-kit/log" - "github.com/google/uuid" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/chunk/cache" + "github.com/grafana/loki/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/pkg/storage/config" - bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) -const ( - day = 24 * time.Hour -) - -var ( - // table 19627 - fixedDay = Date(2023, time.September, 27, 0, 0, 0) -) - -func Date(year int, month time.Month, day, hour, min, sec int) model.Time { - date := time.Date(year, month, day, hour, min, sec, 0, time.UTC) - return model.TimeFromUnixNano(date.UnixNano()) +func parseTime(s string) model.Time { + t, err := 
time.Parse("2006-01-02 15:04", s) + if err != nil { + panic(err) + } + return model.TimeFromUnix(t.Unix()) } func parseDayTime(s string) config.DayTime { @@ -50,462 +36,310 @@ func parseDayTime(s string) config.DayTime { } } -func Test_BloomClient_FetchMetas(t *testing.T) { - store := createStore(t) - - var expected []Meta - // metas that belong to 1st schema stored in folder-1 - // must not be present in results because it is outside of time range - createMetaInStorage(t, store, "19621", "tenantA", 0, 100, fixedDay.Add(-7*day)) - // must be present in the results - expected = append(expected, createMetaInStorage(t, store, "19621", "tenantA", 0, 100, fixedDay.Add(-6*day))) - // must not be present in results because it belongs to another tenant - createMetaInStorage(t, store, "19621", "tenantB", 0, 100, fixedDay.Add(-6*day)) - // must be present in the results - expected = append(expected, createMetaInStorage(t, store, "19621", "tenantA", 101, 200, fixedDay.Add(-6*day))) - - // metas that belong to 2nd schema stored in folder-2 - // must not be present in results because it's out of the time range - createMetaInStorage(t, store, "19626", "tenantA", 0, 100, fixedDay.Add(-1*day)) - // must be present in the results - expected = append(expected, createMetaInStorage(t, store, "19625", "tenantA", 0, 100, fixedDay.Add(-2*day))) - // must not be present in results because it belongs to another tenant - createMetaInStorage(t, store, "19624", "tenantB", 0, 100, fixedDay.Add(-3*day)) - - searchParams := MetaSearchParams{ - TenantID: "tenantA", - Keyspace: v1.NewBounds(50, 150), - Interval: NewInterval(fixedDay.Add(-6*day), fixedDay.Add(-1*day-1*time.Hour)), +func newMockBloomClient(t *testing.T) (*BloomClient, string) { + oc := testutils.NewInMemoryObjectClient() + dir := t.TempDir() + logger := log.NewLogfmtLogger(os.Stderr) + cfg := bloomStoreConfig{ + workingDir: dir, + numWorkers: 3, } - - fetched, err := store.FetchMetas(context.Background(), searchParams) + client, err 
:= NewBloomClient(cfg, oc, logger) require.NoError(t, err) + return client, dir +} - require.Equal(t, len(expected), len(fetched)) - for i := range expected { - require.Equal(t, expected[i].String(), fetched[i].String()) - require.ElementsMatch(t, expected[i].Blocks, fetched[i].Blocks) - require.ElementsMatch(t, expected[i].Tombstones, fetched[i].Tombstones) +func putMeta(c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { + step := int64((24 * time.Hour).Seconds()) + day := start.Unix() / step + meta := Meta{ + MetaRef: MetaRef{ + Ref: Ref{ + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + TableName: fmt.Sprintf("table_%d", day), + // Unused + // StartTimestamp: start, + // EndTimestamp: start.Add(12 * time.Hour), + }, + }, + Blocks: []BlockRef{}, + Tombstones: []BlockRef{}, } + raw, _ := json.Marshal(meta) + return meta, c.client.PutObject(context.Background(), c.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw)) +} - resolved, _, err := store.ResolveMetas(context.Background(), searchParams) +func TestBloomClient_GetMeta(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() + + m, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) require.NoError(t, err) - var resolvedRefs []MetaRef - for _, refs := range resolved { - resolvedRefs = append(resolvedRefs, refs...) 
- } - for i := range resolvedRefs { - require.Equal(t, fetched[i].MetaRef, resolvedRefs[i]) - } + t.Run("exists", func(t *testing.T) { + meta, err := c.GetMeta(ctx, m.MetaRef) + require.NoError(t, err) + require.Equal(t, meta, m) + }) + + t.Run("does not exist", func(t *testing.T) { + meta, err := c.GetMeta(ctx, MetaRef{}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + require.Equal(t, meta, Meta{}) + }) } -func Test_BloomClient_PutMeta(t *testing.T) { - tests := map[string]struct { - source Meta - expectedFilePath string - expectedStorage string - }{ - "expected meta to be uploaded to the first folder": { - source: createMetaEntity("tenantA", - "table_19621", - 0xff, - 0xfff, - Date(2023, time.September, 21, 5, 0, 0), - Date(2023, time.September, 21, 6, 0, 0), - ), - expectedStorage: "folder-1", - expectedFilePath: "bloom/table_19621/tenantA/metas/00000000000000ff-0000000000000fff-0", - }, - "expected meta to be uploaded to the second folder": { - source: createMetaEntity("tenantA", - "table_19625", - 200, - 300, - Date(2023, time.September, 25, 0, 0, 0), - Date(2023, time.September, 25, 1, 0, 0), - ), - expectedStorage: "folder-2", - expectedFilePath: "bloom/table_19625/tenantA/metas/00000000000000c8-000000000000012c-0", - }, - } - for name, data := range tests { - t.Run(name, func(t *testing.T) { - bloomClient := createStore(t) - - err := bloomClient.PutMeta(context.Background(), data.source) - require.NoError(t, err) - - directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory - filePath := filepath.Join(directory, data.expectedFilePath) - require.FileExistsf(t, filePath, data.source.String()) - content, err := os.ReadFile(filePath) - require.NoError(t, err) - result := Meta{} - err = json.Unmarshal(content, &result) - require.NoError(t, err) - - require.Equal(t, data.source.Blocks, result.Blocks) - require.Equal(t, data.source.Tombstones, result.Tombstones) - }) - } +func 
TestBloomClient_GetMetas(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() + + m1, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff) + require.NoError(t, err) + m2, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff) + require.NoError(t, err) + + t.Run("exists", func(t *testing.T) { + metas, err := c.GetMetas(ctx, []MetaRef{m1.MetaRef, m2.MetaRef}) + require.NoError(t, err) + require.Equal(t, metas, []Meta{m1, m2}) + }) + t.Run("does not exist", func(t *testing.T) { + metas, err := c.GetMetas(ctx, []MetaRef{{}}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + require.Equal(t, metas, []Meta{{}}) + }) } -func Test_BloomClient_DeleteMeta(t *testing.T) { - tests := map[string]struct { - source Meta - expectedFilePath string - expectedStorage string - }{ - "expected meta to be deleted from the first folder": { - source: createMetaEntity("tenantA", - "table_19621", - 0xff, - 0xfff, - Date(2023, time.September, 21, 5, 0, 0), - Date(2023, time.September, 21, 6, 0, 0), - ), - expectedStorage: "folder-1", - expectedFilePath: "bloom/table_19621/tenantA/metas/00000000000000ff-0000000000000fff-0", - }, - "expected meta to be delete from the second folder": { - source: createMetaEntity("tenantA", - "table_19625", - 200, - 300, - Date(2023, time.September, 25, 0, 0, 0), - Date(2023, time.September, 25, 1, 0, 0), - ), - expectedStorage: "folder-2", - expectedFilePath: "bloom/table_19625/tenantA/metas/00000000000000c8-000000000000012c-0", +func TestBloomClient_PutMeta(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() + + meta := Meta{ + MetaRef: MetaRef{ + Ref: Ref{ + TenantID: "tenant", + Bounds: v1.NewBounds(0x0000, 0xffff), + TableName: "table_1234", + // Unused + // StartTimestamp: start, + // EndTimestamp: start.Add(12 * time.Hour), + }, }, + Blocks: []BlockRef{}, + Tombstones: []BlockRef{}, } - for name, data := range tests { - t.Run(name, 
func(t *testing.T) { - bloomClient := createStore(t) - directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory - file := filepath.Join(directory, data.expectedFilePath) - - // requires that Test_BloomClient_PutMeta does not fail - err := bloomClient.PutMeta(context.Background(), data.source) - require.NoError(t, err) - require.FileExists(t, file, data.source.String()) + err := c.PutMeta(ctx, meta) + require.NoError(t, err) - err = bloomClient.DeleteMetas(context.Background(), []MetaRef{data.source.MetaRef}) - require.NoError(t, err) + oc := c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Meta(meta.MetaRef).Addr()] + require.True(t, found) - require.NoFileExists(t, file, data.source.String()) - }) - } + fromStorage, err := c.GetMeta(ctx, meta.MetaRef) + require.NoError(t, err) + require.Equal(t, meta, fromStorage) } -func Test_BloomClient_GetBlocks(t *testing.T) { - firstBlockRef := BlockRef{ - Ref: Ref{ - TenantID: "tenantA", - TableName: "schema_a_table_19621", - Bounds: v1.NewBounds(0xeeee, 0xffff), - StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), - EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), - Checksum: 1, - }, - } - secondBlockRef := BlockRef{ - Ref: Ref{ - TenantID: "tenantA", - TableName: "schema_b_table_19624", - Bounds: v1.NewBounds(0xaaaa, 0xbbbb), - StartTimestamp: Date(2023, time.September, 24, 5, 0, 0), - EndTimestamp: Date(2023, time.September, 24, 6, 0, 0), - Checksum: 2, - }, - } +func TestBloomClient_DeleteMetas(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() - bloomClient := createStore(t) + m1, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) + require.NoError(t, err) + m2, err := putMeta(c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff) + require.NoError(t, err) + m3, err := putMeta(c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff) + require.NoError(t, err) + + oc := 
c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Meta(m1.MetaRef).Addr()] + require.True(t, found) + _, found = stored[c.Meta(m2.MetaRef).Addr()] + require.True(t, found) + _, found = stored[c.Meta(m3.MetaRef).Addr()] + require.True(t, found) + + t.Run("all deleted", func(t *testing.T) { + err = c.DeleteMetas(ctx, []MetaRef{m1.MetaRef, m2.MetaRef}) + require.NoError(t, err) + + _, found = stored[c.Meta(m1.MetaRef).Addr()] + require.False(t, found) + _, found = stored[c.Meta(m2.MetaRef).Addr()] + require.False(t, found) + }) + + t.Run("some not found", func(t *testing.T) { + err = c.DeleteMetas(ctx, []MetaRef{m3.MetaRef, m1.MetaRef}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) - fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem + _, found = stored[c.Meta(m3.MetaRef).Addr()] + require.False(t, found) + }) +} - firstBlockFullPath := NewPrefixedResolver( - fsNamedStores["folder-1"].Directory, - defaultKeyResolver{}, - ).Block(firstBlockRef).LocalPath() - _ = createBlockFile(t, firstBlockFullPath) - require.FileExists(t, firstBlockFullPath) +func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { + step := int64((24 * time.Hour).Seconds()) + day := start.Unix() / step - secondBlockFullPath := NewPrefixedResolver( - fsNamedStores["folder-2"].Directory, - defaultKeyResolver{}, - ).Block(secondBlockRef).LocalPath() - _ = createBlockFile(t, secondBlockFullPath) - require.FileExists(t, secondBlockFullPath) + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") - _, err := bloomClient.GetBlock(context.Background(), firstBlockRef) + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() require.NoError(t, err) - // firstBlockActualData, err := io.ReadAll(downloadedFirstBlock.Data) - // require.NoError(t, err) - // require.Equal(t, firstBlockData, string(firstBlockActualData)) 
- _, err = bloomClient.GetBlock(context.Background(), secondBlockRef) + err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) - // secondBlockActualData, err := io.ReadAll(downloadedSecondBlock.Data) - // require.NoError(t, err) - // require.Equal(t, secondBlockData, string(secondBlockActualData)) -} -func Test_BloomClient_PutBlocks(t *testing.T) { - bloomClient := createStore(t) + _, _ = fp.Seek(0, 0) + block := Block{ BlockRef: BlockRef{ Ref: Ref{ - TenantID: "tenantA", - TableName: "table_19621", - Bounds: v1.NewBounds(0xeeee, 0xffff), - StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), - EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), - Checksum: 1, + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + TableName: fmt.Sprintf("table_%d", day), + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), }, }, - Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte("data"))}, + Data: fp, } - err := bloomClient.PutBlock(context.Background(), block) - require.NoError(t, err) - - _ = bloomClient.storeDo(block.StartTimestamp, func(s *bloomStoreEntry) error { - c := s.bloomClient.(*BloomClient) - rc, _, err := c.client.GetObject(context.Background(), block.BlockRef.String()) - require.NoError(t, err) - data, err := io.ReadAll(rc) - require.NoError(t, err) - require.Equal(t, "data", string(data)) - return nil - }) + return block, c.client.PutObject(context.Background(), c.Block(block.BlockRef).Addr(), block.Data) } -func Test_BloomClient_DeleteBlocks(t *testing.T) { - block := BlockRef{ - Ref: Ref{ - TenantID: "tenantA", - TableName: "table_19621", - Bounds: v1.NewBounds(0xeeee, 0xffff), - StartTimestamp: Date(2023, time.September, 21, 5, 0, 0), - EndTimestamp: Date(2023, time.September, 21, 6, 0, 0), - Checksum: 1, - }, - } - - bloomClient := createStore(t) - fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem - blockFullPath := NewPrefixedResolver( - fsNamedStores["folder-1"].Directory, - 
defaultKeyResolver{}, - ).Block(block).LocalPath() - _ = createBlockFile(t, blockFullPath) - require.FileExists(t, blockFullPath) +func TestBloomClient_GetBlock(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() - err := bloomClient.DeleteBlocks(context.Background(), []BlockRef{block}) + b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) require.NoError(t, err) - require.NoFileExists(t, blockFullPath) + t.Run("exists", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, b.BlockRef) + require.NoError(t, err) + require.Equal(t, b.BlockRef, blockDir.BlockRef) + }) + + t.Run("does not exist", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, BlockRef{}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + require.Equal(t, blockDir, BlockDirectory{}) + }) } -func createBlockFile(t *testing.T, dst string) string { - err := os.MkdirAll(dst[:strings.LastIndex(dst, "/")], 0755) - require.NoError(t, err) - fileContent := uuid.NewString() +func TestBloomClient_GetBlocks(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() - src := filepath.Join(t.TempDir(), fileContent) - err = os.WriteFile(src, []byte(fileContent), 0700) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff) require.NoError(t, err) - - fp, err := os.OpenFile(dst, os.O_CREATE|os.O_RDWR, 0700) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff) require.NoError(t, err) - defer fp.Close() - TarGz(t, fp, src) + t.Run("exists", func(t *testing.T) { + blockDirs, err := c.GetBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef}) + require.NoError(t, err) + require.Equal(t, []BlockRef{b1.BlockRef, b2.BlockRef}, []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef}) + }) - return fileContent + t.Run("does not exist", func(t *testing.T) { + _, err := c.GetBlocks(ctx, []BlockRef{{}}) + require.Error(t, err) + require.True(t, 
c.client.IsObjectNotFoundErr(err)) + }) } -func TarGz(t *testing.T, dst io.Writer, file string) { - src, err := os.Open(file) - require.NoError(t, err) - defer src.Close() - - gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst) - defer gzipper.Close() +func TestBloomClient_PutBlock(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() - tarballer := tar.NewWriter(gzipper) - defer tarballer.Close() + start := parseTime("2024-02-05 12:00") - for _, f := range []*os.File{src} { - info, err := f.Stat() - require.NoError(t, err) + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") - header, err := tar.FileInfoHeader(info, f.Name()) - require.NoError(t, err) - - err = tarballer.WriteHeader(header) - require.NoError(t, err) + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() + require.NoError(t, err) - _, err = io.Copy(tarballer, f) - require.NoError(t, err) - } -} + err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + require.NoError(t, err) -func Test_ParseMetaKey(t *testing.T) { - tests := map[string]struct { - objectKey string - expectedRef MetaRef - expectedErr string - }{ - "ValidObjectKey": { - objectKey: "bloom/table/tenant/metas/aaa-bbb-abcdef", - expectedRef: MetaRef{ - Ref: Ref{ - TenantID: "tenant", - TableName: "table", - Bounds: v1.NewBounds(0xaaa, 0xbbb), - StartTimestamp: 0, // ignored - EndTimestamp: 0, // ignored - Checksum: 0xabcdef, - }, + block := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: "tenant", + Bounds: v1.NewBounds(0x0000, 0xffff), + TableName: "table_1234", + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), }, }, - "InvalidObjectKeyDelimiterCount": { - objectKey: "invalid/key/with/too/many/objectKeyWithoutDelimiters", - expectedRef: MetaRef{}, - expectedErr: "failed to split filename parts", - }, - "InvalidMinFingerprint": { - objectKey: "invalid/folder/key/metas/zzz-bbb-abcdef", - expectedErr: "failed to parse bounds", - }, - 
"InvalidMaxFingerprint": { - objectKey: "invalid/folder/key/metas/123-zzz-abcdef", - expectedErr: "failed to parse bounds", - }, - "InvalidChecksum": { - objectKey: "invalid/folder/key/metas/aaa-bbb-ghijklm", - expectedErr: "failed to parse checksum", - }, + Data: fp, } - for name, data := range tests { - t.Run(name, func(t *testing.T) { - actualRef, err := defaultKeyResolver{}.ParseMetaKey(key(data.objectKey)) - if data.expectedErr != "" { - require.ErrorContains(t, err, data.expectedErr) - return - } - require.NoError(t, err) - require.Equal(t, data.expectedRef, actualRef) - }) - } -} -func createStore(t *testing.T) *BloomStore { - periodicConfigs := createPeriodConfigs() - namedStores := storage.NamedStores{ - Filesystem: map[string]storage.NamedFSConfig{ - "folder-1": {Directory: t.TempDir()}, - "folder-2": {Directory: t.TempDir()}, - }} - //required to populate StoreType map in named config - require.NoError(t, namedStores.Validate()) - storageConfig := storage.Config{ - NamedStores: namedStores, - BloomShipperConfig: bloomshipperconfig.Config{ - WorkingDirectory: t.TempDir(), - BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{ - WorkersCount: 1, - }, - }, - } + err = c.PutBlock(ctx, block) + require.NoError(t, err) - metrics := storage.NewClientMetrics() - t.Cleanup(metrics.Unregister) - store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, cache.NewNoopCache(), nil, log.NewNopLogger()) + oc := c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Block(block.BlockRef).Addr()] + require.True(t, found) + + blockDir, err := c.GetBlock(ctx, block.BlockRef) require.NoError(t, err) - return store -} -func createPeriodConfigs() []config.PeriodConfig { - periodicConfigs := []config.PeriodConfig{ - { - ObjectType: "folder-1", - // from 2023-09-20: table range [19620:19623] - From: parseDayTime("2023-09-20"), - IndexTables: config.IndexPeriodicTableConfig{ - PeriodicTableConfig: 
config.PeriodicTableConfig{ - Period: day, - // TODO(chaudum): Integrate {,Parse}MetaKey into schema config - // Prefix: "schema_a_table_", - }}, - }, - { - ObjectType: "folder-2", - // from 2023-09-24: table range [19624:19627] - From: parseDayTime("2023-09-24"), - IndexTables: config.IndexPeriodicTableConfig{ - PeriodicTableConfig: config.PeriodicTableConfig{ - Period: day, - // TODO(chaudum): Integrate {,Parse}MetaKey into schema config - // Prefix: "schema_b_table_", - }}, - }, - } - return periodicConfigs + require.Equal(t, block.BlockRef, blockDir.BlockRef) } -func createMetaInStorage(t *testing.T, s Client, tableName string, tenant string, minFingerprint uint64, maxFingerprint uint64, start model.Time) Meta { - end := start.Add(12 * time.Hour) +func TestBloomClient_DeleteBlocks(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() - meta := createMetaEntity(tenant, tableName, minFingerprint, maxFingerprint, start, end) - err := s.PutMeta(context.Background(), meta) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) + require.NoError(t, err) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff) + require.NoError(t, err) + b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff) require.NoError(t, err) - t.Log("create meta in store", meta.String()) - return meta -} -func createMetaEntity( - tenant string, - tableName string, - minFingerprint uint64, - maxFingerprint uint64, - startTimestamp model.Time, - endTimestamp model.Time, -) Meta { - return Meta{ - MetaRef: MetaRef{ - Ref: Ref{ - TenantID: tenant, - TableName: tableName, - Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), - StartTimestamp: startTimestamp, - EndTimestamp: endTimestamp, - }, - }, - Tombstones: []BlockRef{ - { - Ref: Ref{ - TenantID: tenant, - Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), - 
StartTimestamp: startTimestamp, - EndTimestamp: endTimestamp, - }, - }, - }, - Blocks: []BlockRef{ - { - Ref: Ref{ - TenantID: tenant, - Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)), - StartTimestamp: startTimestamp, - EndTimestamp: endTimestamp, - }, - }, - }, - } + oc := c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Block(b1.BlockRef).Addr()] + require.True(t, found) + _, found = stored[c.Block(b2.BlockRef).Addr()] + require.True(t, found) + _, found = stored[c.Block(b3.BlockRef).Addr()] + require.True(t, found) + + t.Run("all deleted", func(t *testing.T) { + err = c.DeleteBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef}) + require.NoError(t, err) + + _, found = stored[c.Block(b1.BlockRef).Addr()] + require.False(t, found) + _, found = stored[c.Block(b2.BlockRef).Addr()] + require.False(t, found) + }) + + t.Run("some not found", func(t *testing.T) { + err = c.DeleteBlocks(ctx, []BlockRef{b3.BlockRef, b1.BlockRef}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + + _, found = stored[c.Block(b3.BlockRef).Addr()] + require.False(t, found) + }) } diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index cfb0f392d6..6efb654809 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -343,7 +343,9 @@ func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadTask[T, R]) { q.mu.LockKey(task.key) defer func() { err := q.mu.UnlockKey(task.key) - level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "err", err) + if err != nil { + level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "key", task.key, "err", err) + } }() q.process(ctx, task) diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index 
0e0604962e..614f9c6898 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -37,13 +37,7 @@ type Limits interface { BloomGatewayBlocksDownloadingParallelism(tenantID string) int } -// TODO(chaudum): resolve and rip out -type StoreAndClient interface { - Store - Client -} - -func NewShipper(client StoreAndClient, config config.Config, _ Limits, logger log.Logger, _ prometheus.Registerer) (*Shipper, error) { +func NewShipper(client Store, config config.Config, _ Limits, logger log.Logger, _ prometheus.Registerer) (*Shipper, error) { logger = log.With(logger, "component", "bloom-shipper") return &Shipper{ store: client, diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index 4de9b453ec..4e3dada7b4 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -30,9 +30,6 @@ type bloomStoreConfig struct { numWorkers int } -// Compiler check to ensure bloomStoreEntry implements the Client interface -var _ Client = &bloomStoreEntry{} - // Compiler check to ensure bloomStoreEntry implements the Store interface var _ Store = &bloomStoreEntry{} @@ -78,6 +75,12 @@ func (b *bloomStoreEntry) ResolveMetas(ctx context.Context, params MetaSearchPar refs = append(refs, metaRef) } } + + // return empty metaRefs/fetchers if there are no refs + if len(refs) == 0 { + return [][]MetaRef{}, []*Fetcher{}, nil + } + return [][]MetaRef{refs}, []*Fetcher{b.fetcher}, nil } @@ -112,55 +115,12 @@ func (b *bloomStoreEntry) Fetcher(_ model.Time) *Fetcher { return b.fetcher } -// DeleteBlocks implements Client. -func (b *bloomStoreEntry) DeleteBlocks(ctx context.Context, refs []BlockRef) error { - return b.bloomClient.DeleteBlocks(ctx, refs) -} - -// DeleteMeta implements Client. 
-func (b *bloomStoreEntry) DeleteMetas(ctx context.Context, refs []MetaRef) error { - return b.bloomClient.DeleteMetas(ctx, refs) -} - -// GetBlock implements Client. -func (b *bloomStoreEntry) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) { - return b.bloomClient.GetBlock(ctx, ref) -} - -// GetBlocks implements Client. -func (b *bloomStoreEntry) GetBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) { - return b.fetcher.FetchBlocks(ctx, refs) -} - -// GetMeta implements Client. -func (b *bloomStoreEntry) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) { - return b.bloomClient.GetMeta(ctx, ref) -} - -// GetMetas implements Client. -func (b *bloomStoreEntry) GetMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) { - return b.fetcher.FetchMetas(ctx, refs) -} - -// PutBlocks implements Client. -func (b *bloomStoreEntry) PutBlock(ctx context.Context, block Block) error { - return b.bloomClient.PutBlock(ctx, block) -} - -// PutMeta implements Client. -func (b *bloomStoreEntry) PutMeta(ctx context.Context, meta Meta) error { - return b.bloomClient.PutMeta(ctx, meta) -} - -// Stop implements Client. +// Stop implements Store. func (b bloomStoreEntry) Stop() { b.bloomClient.Stop() b.fetcher.Close() } -// Compiler check to ensure BloomStore implements the Client interface -var _ Client = &BloomStore{} - // Compiler check to ensure BloomStore implements the Store interface var _ Store = &BloomStore{} @@ -267,8 +227,9 @@ func (b *BloomStore) Fetcher(ts model.Time) *Fetcher { // ResolveMetas implements Store. 
func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error) { - var refs [][]MetaRef - var fetchers []*Fetcher + refs := make([][]MetaRef, 0, len(b.stores)) + fetchers := make([]*Fetcher, 0, len(b.stores)) + err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store Store) error { newParams := params newParams.Interval = interval @@ -276,10 +237,14 @@ func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams) if err != nil { return err } - refs = append(refs, metas...) - fetchers = append(fetchers, fetcher...) + if len(metas) > 0 { + // only append if there are any results + refs = append(refs, metas...) + fetchers = append(fetchers, fetcher...) + } return nil }) + return refs, fetchers, err } @@ -293,70 +258,22 @@ func (b *BloomStore) FetchMetas(ctx context.Context, params MetaSearchParams) ([ return nil, errors.New("metaRefs and fetchers have unequal length") } - var metas []Meta + metas := []Meta{} for i := range fetchers { res, err := fetchers[i].FetchMetas(ctx, metaRefs[i]) if err != nil { return nil, err } - metas = append(metas, res...) + if len(res) > 0 { + metas = append(metas, res...) + } } return metas, nil } // FetchBlocks implements Store. -func (b *BloomStore) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) { - return b.GetBlocks(ctx, refs) -} - -// DeleteBlocks implements Client. -func (b *BloomStore) DeleteBlocks(ctx context.Context, refs []BlockRef) error { - for _, ref := range refs { - err := b.storeDo( - ref.StartTimestamp, - func(s *bloomStoreEntry) error { - return s.DeleteBlocks(ctx, []BlockRef{ref}) - }, - ) - if err != nil { - return err - } - } - return nil -} - -// DeleteMetas implements Client. 
-func (b *BloomStore) DeleteMetas(ctx context.Context, refs []MetaRef) error { - for _, ref := range refs { - err := b.storeDo( - ref.StartTimestamp, - func(s *bloomStoreEntry) error { - return s.DeleteMetas(ctx, []MetaRef{ref}) - }, - ) - if err != nil { - return err - } - } - return nil -} - -// GetBlock implements Client. -func (b *BloomStore) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) { - res := make([]BlockDirectory, 1) - err := b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error { - block, err := s.GetBlock(ctx, ref) - if err != nil { - return err - } - res[0] = block - return nil - }) - return res[0], err -} +func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]BlockDirectory, error) { -// GetBlocks implements Client. -func (b *BloomStore) GetBlocks(ctx context.Context, blocks []BlockRef) ([]BlockDirectory, error) { var refs [][]BlockRef var fetchers []*Fetcher @@ -392,72 +309,7 @@ func (b *BloomStore) GetBlocks(ctx context.Context, blocks []BlockRef) ([]BlockD return results, nil } -// GetMeta implements Client. -func (b *BloomStore) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) { - res := make([]Meta, 1) - err := b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error { - meta, err := s.GetMeta(ctx, ref) - if err != nil { - return err - } - res[0] = meta - return nil - }) - return res[0], err -} - -// GetMetas implements Client. 
-func (b *BloomStore) GetMetas(ctx context.Context, metas []MetaRef) ([]Meta, error) { - var refs [][]MetaRef - var fetchers []*Fetcher - - for i := len(b.stores) - 1; i >= 0; i-- { - s := b.stores[i] - from, through := s.start, model.Latest - if i < len(b.stores)-1 { - through = b.stores[i+1].start - } - - var res []MetaRef - for _, meta := range metas { - if meta.StartTimestamp >= from && meta.StartTimestamp < through { - res = append(res, meta) - } - } - - if len(res) > 0 { - refs = append(refs, res) - fetchers = append(fetchers, s.Fetcher(s.start)) - } - } - - results := make([]Meta, 0, len(metas)) - for i := range fetchers { - res, err := fetchers[i].FetchMetas(ctx, refs[i]) - results = append(results, res...) - if err != nil { - return results, err - } - } - - return results, nil -} - -// PutBlock implements Client. -func (b *BloomStore) PutBlock(ctx context.Context, block Block) error { - return b.storeDo(block.StartTimestamp, func(s *bloomStoreEntry) error { - return s.PutBlock(ctx, block) - }) -} - -// PutMeta implements Client. -func (b *BloomStore) PutMeta(ctx context.Context, meta Meta) error { - return b.storeDo(meta.StartTimestamp, func(s *bloomStoreEntry) error { - return s.PutMeta(ctx, meta) - }) -} - -// Stop implements Client. +// Stop implements Store. 
func (b *BloomStore) Stop() { for _, s := range b.stores { s.Stop() diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go new file mode 100644 index 0000000000..87c2ed067c --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -0,0 +1,268 @@ +package bloomshipper + +import ( + "bytes" + "context" + "encoding/json" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk/cache" + storageconfig "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" +) + +func newMockBloomStore(t *testing.T) (*BloomStore, string) { + workDir := t.TempDir() + + periodicConfigs := []storageconfig.PeriodConfig{ + { + ObjectType: storageconfig.StorageTypeInMemory, + From: parseDayTime("2024-01-01"), + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + // TODO(chaudum): Integrate {,Parse}MetaKey into schema config + // Prefix: "schema_a_table_", + }}, + }, + { + ObjectType: storageconfig.StorageTypeInMemory, + From: parseDayTime("2024-02-01"), + IndexTables: storageconfig.IndexPeriodicTableConfig{ + PeriodicTableConfig: storageconfig.PeriodicTableConfig{ + Period: 24 * time.Hour, + // TODO(chaudum): Integrate {,Parse}MetaKey into schema config + // Prefix: "schema_b_table_", + }}, + }, + } + + storageConfig := storage.Config{ + BloomShipperConfig: config.Config{ + WorkingDirectory: workDir, + BlocksDownloadingQueue: config.DownloadingQueueConfig{ + WorkersCount: 1, + }, + BlocksCache: config.BlocksCacheConfig{ + EmbeddedCacheConfig: cache.EmbeddedCacheConfig{ + MaxSizeItems: 1000, + TTL: 
1 * time.Hour, + }, + }, + }, + } + + metrics := storage.NewClientMetrics() + t.Cleanup(metrics.Unregister) + logger := log.NewLogfmtLogger(os.Stderr) + + metasCache := cache.NewMockCache() + blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig, prometheus.NewPedanticRegistry(), logger) + store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, logger) + require.NoError(t, err) + t.Cleanup(store.Stop) + + return store, workDir +} + +func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) { + meta := Meta{ + MetaRef: MetaRef{ + Ref: Ref{ + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + // Unused + // StartTimestamp: start, + // EndTimestamp: start.Add(12 * time.Hour), + }, + }, + Blocks: []BlockRef{}, + Tombstones: []BlockRef{}, + } + err := store.storeDo(start, func(s *bloomStoreEntry) error { + raw, _ := json.Marshal(meta) + meta.MetaRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0] + return s.objectClient.PutObject(context.Background(), s.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw)) + }) + return meta, err +} + +func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() + require.NoError(t, err) + + err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + require.NoError(t, err) + + _, _ = fp.Seek(0, 0) + + block := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: tenant, + Bounds: v1.NewBounds(minFp, maxFp), + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), + }, + }, + Data: fp, + } + err = store.storeDo(start, func(s *bloomStoreEntry) error { + block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, 
start.Add(12*time.Hour)))[0] + return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data) + }) + return block, err +} + +func TestBloomStore_ResolveMetas(t *testing.T) { + store, _ := newMockBloomStore(t) + + // schema 1 + // outside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) + // outside of interval, inside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) + // inside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + // inside of interval, inside of bounds + m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + + // schema 2 + // inside of interval, inside of bounds + m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + // inside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) + // outside of interval, inside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) + // outside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) + + t.Run("tenant matches", func(t *testing.T) { + ctx := context.Background() + params := MetaSearchParams{ + "tenant", + NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")), + v1.NewBounds(0x00000000, 0x0000ffff), + } + + refs, fetchers, err := store.ResolveMetas(ctx, params) + require.NoError(t, err) + require.Len(t, refs, 2) + require.Len(t, fetchers, 2) + + require.Equal(t, [][]MetaRef{{m1.MetaRef}, {m2.MetaRef}}, refs) + }) + + t.Run("tenant does not match", func(t *testing.T) { + ctx := context.Background() + params := 
MetaSearchParams{ + "other", + NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")), + v1.NewBounds(0x00000000, 0x0000ffff), + } + + refs, fetchers, err := store.ResolveMetas(ctx, params) + require.NoError(t, err) + require.Len(t, refs, 0) + require.Len(t, fetchers, 0) + require.Equal(t, [][]MetaRef{}, refs) + }) +} + +func TestBloomStore_FetchMetas(t *testing.T) { + store, _ := newMockBloomStore(t) + + // schema 1 + // outside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00010000, 0x0001ffff) + // outside of interval, inside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-19 00:00"), 0x00000000, 0x0000ffff) + // inside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + // inside of interval, inside of bounds + m1, _ := createMetaInStorage(store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + + // schema 2 + // inside of interval, inside of bounds + m2, _ := createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + // inside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-05 00:00"), 0x00010000, 0x0001ffff) + // outside of interval, inside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00000000, 0x0000ffff) + // outside of interval, outside of bounds + _, _ = createMetaInStorage(store, "tenant", parseTime("2024-02-11 00:00"), 0x00010000, 0x0001ffff) + + t.Run("tenant matches", func(t *testing.T) { + ctx := context.Background() + params := MetaSearchParams{ + "tenant", + NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")), + v1.NewBounds(0x00000000, 0x0000ffff), + } + + metas, err := store.FetchMetas(ctx, params) + require.NoError(t, err) + require.Len(t, metas, 2) + + require.Equal(t, []Meta{m1, m2}, metas) 
+ }) + + t.Run("tenant does not match", func(t *testing.T) { + ctx := context.Background() + params := MetaSearchParams{ + "other", + NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")), + v1.NewBounds(0x00000000, 0x0000ffff), + } + + metas, err := store.FetchMetas(ctx, params) + require.NoError(t, err) + require.Len(t, metas, 0) + require.Equal(t, []Meta{}, metas) + }) +} + +func TestBloomStore_FetchBlocks(t *testing.T) { + store, _ := newMockBloomStore(t) + + // schema 1 + b1, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff) + b2, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff) + // schema 2 + b3, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff) + b4, _ := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff) + + ctx := context.Background() + + // first call fetches two blocks from cache + blockDirs, err := store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b3.BlockRef}) + require.NoError(t, err) + require.Len(t, blockDirs, 2) + + require.ElementsMatch(t, []BlockRef{b1.BlockRef, b3.BlockRef}, []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef}) + + // second call fetches two blocks from cache and two from storage + blockDirs, err = store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef, b3.BlockRef, b4.BlockRef}) + require.NoError(t, err) + require.Len(t, blockDirs, 4) + + // Note the order: b1 and b2 come from cache, so they are in the beginning of the response + // Do we need to sort the response based on the request order of block refs? + require.ElementsMatch(t, + []BlockRef{b1.BlockRef, b3.BlockRef, b2.BlockRef, b4.BlockRef}, + []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef, blockDirs[2].BlockRef, blockDirs[3].BlockRef}, + ) +}