(chore) Bloomshipper: Separate store and client (#11865)

**What this PR does / why we need it**:

This PR removes the `StoreAndClient` interface that was accepted by the
`BloomShipper`. Since the `BloomStore` had to not only implement the
`Store` interface, but also the `Client` interface, it caused
re-implementation of the same methods in different ways.

Now the shipper solely relies on the `Store` interface.

See individual commit messages for more context.

Tests have been rewritten from scratch and placed in their own
respective test files for store and client.

---------

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/11756/head
Christian Haudum 2 years ago committed by GitHub
parent 2e3fa3b861
commit 73edf7a943
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      go.mod
  2. 4
      pkg/bloomcompactor/bloomcompactor.go
  3. 51
      pkg/storage/chunk/client/testutils/inmemory_storage_client.go
  4. 662
      pkg/storage/stores/shipper/bloomshipper/client_test.go
  5. 4
      pkg/storage/stores/shipper/bloomshipper/fetcher.go
  6. 8
      pkg/storage/stores/shipper/bloomshipper/shipper.go
  7. 192
      pkg/storage/stores/shipper/bloomshipper/store.go
  8. 268
      pkg/storage/stores/shipper/bloomshipper/store_test.go

@ -119,7 +119,6 @@ require (
github.com/DmitriyVTitov/size v1.5.0
github.com/IBM/go-sdk-core/v5 v5.13.1
github.com/IBM/ibm-cos-sdk-go v1.10.0
github.com/aws/smithy-go v1.11.1
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc
github.com/d4l3k/messagediff v1.2.1
github.com/efficientgo/core v1.0.0-rc.2
@ -183,6 +182,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect
github.com/aws/smithy-go v1.11.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect

@ -38,7 +38,7 @@ type Compactor struct {
limits Limits
// temporary workaround until store has implemented read/write shipper interface
store bloomshipper.StoreAndClient
store bloomshipper.Store
sharding ShardingStrategy
@ -48,7 +48,7 @@ type Compactor struct {
func New(
cfg Config,
store bloomshipper.StoreAndClient,
store bloomshipper.Store,
sharding ShardingStrategy,
limits Limits,
logger log.Logger,

@ -33,9 +33,10 @@ const (
// MockStorage is a fake in-memory StorageClient.
type MockStorage struct {
*InMemoryObjectClient
mtx sync.RWMutex
tables map[string]*mockTable
objects map[string][]byte
schemaCfg config.SchemaConfig
numIndexWrites int
@ -43,6 +44,25 @@ type MockStorage struct {
mode MockStorageMode
}
// compiler check
var _ client.ObjectClient = &InMemoryObjectClient{}

// InMemoryObjectClient is a test double implementing client.ObjectClient on
// top of a plain map of object key -> payload bytes, guarded by an RWMutex.
type InMemoryObjectClient struct {
	objects map[string][]byte
	mtx     sync.RWMutex
	mode    MockStorageMode
}

// NewInMemoryObjectClient creates a ready-to-use client with no stored objects.
func NewInMemoryObjectClient() *InMemoryObjectClient {
	c := &InMemoryObjectClient{}
	c.objects = make(map[string][]byte)
	return c
}

// Internals returns the raw object map so tests can inspect stored state
// directly. Callers share the live map; it is not a copy.
func (m *InMemoryObjectClient) Internals() map[string][]byte {
	return m.objects
}
type mockTable struct {
items map[string][]mockItem
write, read int64
@ -64,6 +84,7 @@ func ResetMockStorage() {
func NewMockStorage() *MockStorage {
if singleton == nil {
singleton = &MockStorage{
InMemoryObjectClient: NewInMemoryObjectClient(),
schemaCfg: config.SchemaConfig{
Configs: []config.PeriodConfig{
{
@ -73,8 +94,7 @@ func NewMockStorage() *MockStorage {
},
},
},
tables: map[string]*mockTable{},
objects: map[string][]byte{},
tables: map[string]*mockTable{},
}
}
return singleton
@ -109,6 +129,7 @@ func (*MockStorage) Stop() {
func (m *MockStorage) SetMode(mode MockStorageMode) {
m.mode = mode
m.InMemoryObjectClient.mode = mode
}
// ListTables implements StorageClient.
@ -370,7 +391,8 @@ func (m *MockStorage) query(ctx context.Context, query index.Query, callback fun
return nil
}
func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, error) {
// ObjectExists implements client.ObjectClient.
func (m *InMemoryObjectClient) ObjectExists(_ context.Context, objectKey string) (bool, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -386,7 +408,8 @@ func (m *MockStorage) ObjectExists(_ context.Context, objectKey string) (bool, e
return true, nil
}
func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
// GetObject implements client.ObjectClient.
func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, int64, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -402,7 +425,8 @@ func (m *MockStorage) GetObject(_ context.Context, objectKey string) (io.ReadClo
return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil
}
func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
// PutObject implements client.ObjectClient.
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.ReadSeeker) error {
buf, err := io.ReadAll(object)
if err != nil {
return err
@ -419,7 +443,8 @@ func (m *MockStorage) PutObject(_ context.Context, objectKey string, object io.R
return nil
}
func (m *MockStorage) IsObjectNotFoundErr(err error) bool {
// IsObjectNotFoundErr implements client.ObjectClient.
func (m *InMemoryObjectClient) IsObjectNotFoundErr(err error) bool {
return errors.Is(err, errStorageObjectNotFound)
}
@ -427,9 +452,11 @@ func (m *MockStorage) IsChunkNotFoundErr(err error) bool {
return m.IsObjectNotFoundErr(err)
}
func (m *MockStorage) IsRetryableErr(error) bool { return false }
// IsRetryableErr implements client.ObjectClient.
func (m *InMemoryObjectClient) IsRetryableErr(error) bool { return false }
func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error {
// DeleteObject implements client.ObjectClient.
func (m *InMemoryObjectClient) DeleteObject(_ context.Context, objectKey string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
@ -446,7 +473,7 @@ func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error {
}
// List implements chunk.ObjectClient.
func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
func (m *InMemoryObjectClient) List(_ context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()
@ -494,6 +521,10 @@ func (m *MockStorage) List(_ context.Context, prefix, delimiter string) ([]clien
return storageObjects, commonPrefixes, nil
}
// Stop implements client.ObjectClient. The in-memory client holds no external
// resources (no connections, files, or goroutines), so this is a no-op.
func (*InMemoryObjectClient) Stop() {
}
type mockWriteBatch struct {
inserts []struct {
tableName, hashValue string

@ -1,43 +1,29 @@
package bloomshipper
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"io"
"fmt"
"os"
"path/filepath"
"strings"
"testing"
"time"
awsio "github.com/aws/smithy-go/io"
"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client/testutils"
"github.com/grafana/loki/pkg/storage/config"
bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)
const (
day = 24 * time.Hour
)
var (
// table 19627
fixedDay = Date(2023, time.September, 27, 0, 0, 0)
)
func Date(year int, month time.Month, day, hour, min, sec int) model.Time {
date := time.Date(year, month, day, hour, min, sec, 0, time.UTC)
return model.TimeFromUnixNano(date.UnixNano())
// parseTime converts a "YYYY-MM-DD hh:mm" string (interpreted as UTC) into a
// model.Time with second precision. It panics on malformed input, so it is
// only suitable as a test fixture helper.
func parseTime(s string) model.Time {
	parsed, err := time.Parse("2006-01-02 15:04", s)
	if err != nil {
		panic(err)
	}
	return model.TimeFromUnix(parsed.Unix())
}
func parseDayTime(s string) config.DayTime {
@ -50,462 +36,310 @@ func parseDayTime(s string) config.DayTime {
}
}
func Test_BloomClient_FetchMetas(t *testing.T) {
store := createStore(t)
var expected []Meta
// metas that belong to 1st schema stored in folder-1
// must not be present in results because it is outside of time range
createMetaInStorage(t, store, "19621", "tenantA", 0, 100, fixedDay.Add(-7*day))
// must be present in the results
expected = append(expected, createMetaInStorage(t, store, "19621", "tenantA", 0, 100, fixedDay.Add(-6*day)))
// must not be present in results because it belongs to another tenant
createMetaInStorage(t, store, "19621", "tenantB", 0, 100, fixedDay.Add(-6*day))
// must be present in the results
expected = append(expected, createMetaInStorage(t, store, "19621", "tenantA", 101, 200, fixedDay.Add(-6*day)))
// metas that belong to 2nd schema stored in folder-2
// must not be present in results because it's out of the time range
createMetaInStorage(t, store, "19626", "tenantA", 0, 100, fixedDay.Add(-1*day))
// must be present in the results
expected = append(expected, createMetaInStorage(t, store, "19625", "tenantA", 0, 100, fixedDay.Add(-2*day)))
// must not be present in results because it belongs to another tenant
createMetaInStorage(t, store, "19624", "tenantB", 0, 100, fixedDay.Add(-3*day))
searchParams := MetaSearchParams{
TenantID: "tenantA",
Keyspace: v1.NewBounds(50, 150),
Interval: NewInterval(fixedDay.Add(-6*day), fixedDay.Add(-1*day-1*time.Hour)),
func newMockBloomClient(t *testing.T) (*BloomClient, string) {
oc := testutils.NewInMemoryObjectClient()
dir := t.TempDir()
logger := log.NewLogfmtLogger(os.Stderr)
cfg := bloomStoreConfig{
workingDir: dir,
numWorkers: 3,
}
fetched, err := store.FetchMetas(context.Background(), searchParams)
client, err := NewBloomClient(cfg, oc, logger)
require.NoError(t, err)
return client, dir
}
require.Equal(t, len(expected), len(fetched))
for i := range expected {
require.Equal(t, expected[i].String(), fetched[i].String())
require.ElementsMatch(t, expected[i].Blocks, fetched[i].Blocks)
require.ElementsMatch(t, expected[i].Tombstones, fetched[i].Tombstones)
// putMeta stores a Meta for the given tenant and fingerprint range directly
// through the client's underlying object client, bypassing BloomClient.PutMeta.
// The table name is derived from the number of whole days since the Unix epoch
// at `start`. It returns the Meta that was written so tests can compare it
// against what they later fetch.
func putMeta(c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) {
	step := int64((24 * time.Hour).Seconds())
	day := start.Unix() / step
	meta := Meta{
		MetaRef: MetaRef{
			Ref: Ref{
				TenantID:  tenant,
				Bounds:    v1.NewBounds(minFp, maxFp),
				TableName: fmt.Sprintf("table_%d", day),
				// Unused
				// StartTimestamp: start,
				// EndTimestamp: start.Add(12 * time.Hour),
			},
		},
		Blocks:     []BlockRef{},
		Tombstones: []BlockRef{},
	}
	// Propagate the marshal error instead of silently discarding it; a
	// corrupt fixture would otherwise surface as a confusing fetch failure.
	raw, err := json.Marshal(meta)
	if err != nil {
		return Meta{}, err
	}
	return meta, c.client.PutObject(context.Background(), c.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw))
}
resolved, _, err := store.ResolveMetas(context.Background(), searchParams)
func TestBloomClient_GetMeta(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
m, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
var resolvedRefs []MetaRef
for _, refs := range resolved {
resolvedRefs = append(resolvedRefs, refs...)
}
for i := range resolvedRefs {
require.Equal(t, fetched[i].MetaRef, resolvedRefs[i])
}
t.Run("exists", func(t *testing.T) {
meta, err := c.GetMeta(ctx, m.MetaRef)
require.NoError(t, err)
require.Equal(t, meta, m)
})
t.Run("does not exist", func(t *testing.T) {
meta, err := c.GetMeta(ctx, MetaRef{})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
require.Equal(t, meta, Meta{})
})
}
func Test_BloomClient_PutMeta(t *testing.T) {
tests := map[string]struct {
source Meta
expectedFilePath string
expectedStorage string
}{
"expected meta to be uploaded to the first folder": {
source: createMetaEntity("tenantA",
"table_19621",
0xff,
0xfff,
Date(2023, time.September, 21, 5, 0, 0),
Date(2023, time.September, 21, 6, 0, 0),
),
expectedStorage: "folder-1",
expectedFilePath: "bloom/table_19621/tenantA/metas/00000000000000ff-0000000000000fff-0",
},
"expected meta to be uploaded to the second folder": {
source: createMetaEntity("tenantA",
"table_19625",
200,
300,
Date(2023, time.September, 25, 0, 0, 0),
Date(2023, time.September, 25, 1, 0, 0),
),
expectedStorage: "folder-2",
expectedFilePath: "bloom/table_19625/tenantA/metas/00000000000000c8-000000000000012c-0",
},
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
bloomClient := createStore(t)
err := bloomClient.PutMeta(context.Background(), data.source)
require.NoError(t, err)
directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
filePath := filepath.Join(directory, data.expectedFilePath)
require.FileExistsf(t, filePath, data.source.String())
content, err := os.ReadFile(filePath)
require.NoError(t, err)
result := Meta{}
err = json.Unmarshal(content, &result)
require.NoError(t, err)
require.Equal(t, data.source.Blocks, result.Blocks)
require.Equal(t, data.source.Tombstones, result.Tombstones)
})
}
// TestBloomClient_GetMetas verifies batch meta retrieval: all requested refs
// are returned in request order, and a ref that was never written surfaces an
// object-not-found error from the underlying object client.
func TestBloomClient_GetMetas(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
// store two metas covering disjoint fingerprint ranges of the same day
m1, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff)
require.NoError(t, err)
m2, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff)
require.NoError(t, err)
t.Run("exists", func(t *testing.T) {
metas, err := c.GetMetas(ctx, []MetaRef{m1.MetaRef, m2.MetaRef})
require.NoError(t, err)
require.Equal(t, metas, []Meta{m1, m2})
})
t.Run("does not exist", func(t *testing.T) {
// a zero-value MetaRef resolves to an address that was never written
metas, err := c.GetMetas(ctx, []MetaRef{{}})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
require.Equal(t, metas, []Meta{{}})
})
}
func Test_BloomClient_DeleteMeta(t *testing.T) {
tests := map[string]struct {
source Meta
expectedFilePath string
expectedStorage string
}{
"expected meta to be deleted from the first folder": {
source: createMetaEntity("tenantA",
"table_19621",
0xff,
0xfff,
Date(2023, time.September, 21, 5, 0, 0),
Date(2023, time.September, 21, 6, 0, 0),
),
expectedStorage: "folder-1",
expectedFilePath: "bloom/table_19621/tenantA/metas/00000000000000ff-0000000000000fff-0",
},
"expected meta to be delete from the second folder": {
source: createMetaEntity("tenantA",
"table_19625",
200,
300,
Date(2023, time.September, 25, 0, 0, 0),
Date(2023, time.September, 25, 1, 0, 0),
),
expectedStorage: "folder-2",
expectedFilePath: "bloom/table_19625/tenantA/metas/00000000000000c8-000000000000012c-0",
func TestBloomClient_PutMeta(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
meta := Meta{
MetaRef: MetaRef{
Ref: Ref{
TenantID: "tenant",
Bounds: v1.NewBounds(0x0000, 0xffff),
TableName: "table_1234",
// Unused
// StartTimestamp: start,
// EndTimestamp: start.Add(12 * time.Hour),
},
},
Blocks: []BlockRef{},
Tombstones: []BlockRef{},
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
bloomClient := createStore(t)
directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory
file := filepath.Join(directory, data.expectedFilePath)
// requires that Test_BloomClient_PutMeta does not fail
err := bloomClient.PutMeta(context.Background(), data.source)
require.NoError(t, err)
require.FileExists(t, file, data.source.String())
err := c.PutMeta(ctx, meta)
require.NoError(t, err)
err = bloomClient.DeleteMetas(context.Background(), []MetaRef{data.source.MetaRef})
require.NoError(t, err)
oc := c.client.(*testutils.InMemoryObjectClient)
stored := oc.Internals()
_, found := stored[c.Meta(meta.MetaRef).Addr()]
require.True(t, found)
require.NoFileExists(t, file, data.source.String())
})
}
fromStorage, err := c.GetMeta(ctx, meta.MetaRef)
require.NoError(t, err)
require.Equal(t, meta, fromStorage)
}
func Test_BloomClient_GetBlocks(t *testing.T) {
firstBlockRef := BlockRef{
Ref: Ref{
TenantID: "tenantA",
TableName: "schema_a_table_19621",
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
},
}
secondBlockRef := BlockRef{
Ref: Ref{
TenantID: "tenantA",
TableName: "schema_b_table_19624",
Bounds: v1.NewBounds(0xaaaa, 0xbbbb),
StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 24, 6, 0, 0),
Checksum: 2,
},
}
func TestBloomClient_DeleteMetas(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
bloomClient := createStore(t)
m1, err := putMeta(c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
m2, err := putMeta(c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
m3, err := putMeta(c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
oc := c.client.(*testutils.InMemoryObjectClient)
stored := oc.Internals()
_, found := stored[c.Meta(m1.MetaRef).Addr()]
require.True(t, found)
_, found = stored[c.Meta(m2.MetaRef).Addr()]
require.True(t, found)
_, found = stored[c.Meta(m3.MetaRef).Addr()]
require.True(t, found)
t.Run("all deleted", func(t *testing.T) {
err = c.DeleteMetas(ctx, []MetaRef{m1.MetaRef, m2.MetaRef})
require.NoError(t, err)
_, found = stored[c.Meta(m1.MetaRef).Addr()]
require.False(t, found)
_, found = stored[c.Meta(m2.MetaRef).Addr()]
require.False(t, found)
})
t.Run("some not found", func(t *testing.T) {
err = c.DeleteMetas(ctx, []MetaRef{m3.MetaRef, m1.MetaRef})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
_, found = stored[c.Meta(m3.MetaRef).Addr()]
require.False(t, found)
})
}
firstBlockFullPath := NewPrefixedResolver(
fsNamedStores["folder-1"].Directory,
defaultKeyResolver{},
).Block(firstBlockRef).LocalPath()
_ = createBlockFile(t, firstBlockFullPath)
require.FileExists(t, firstBlockFullPath)
func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) {
step := int64((24 * time.Hour).Seconds())
day := start.Unix() / step
secondBlockFullPath := NewPrefixedResolver(
fsNamedStores["folder-2"].Directory,
defaultKeyResolver{},
).Block(secondBlockRef).LocalPath()
_ = createBlockFile(t, secondBlockFullPath)
require.FileExists(t, secondBlockFullPath)
tmpDir := t.TempDir()
fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
_, err := bloomClient.GetBlock(context.Background(), firstBlockRef)
blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)
// firstBlockActualData, err := io.ReadAll(downloadedFirstBlock.Data)
// require.NoError(t, err)
// require.Equal(t, firstBlockData, string(firstBlockActualData))
_, err = bloomClient.GetBlock(context.Background(), secondBlockRef)
err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)
// secondBlockActualData, err := io.ReadAll(downloadedSecondBlock.Data)
// require.NoError(t, err)
// require.Equal(t, secondBlockData, string(secondBlockActualData))
}
func Test_BloomClient_PutBlocks(t *testing.T) {
bloomClient := createStore(t)
_, _ = fp.Seek(0, 0)
block := Block{
BlockRef: BlockRef{
Ref: Ref{
TenantID: "tenantA",
TableName: "table_19621",
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
TenantID: tenant,
Bounds: v1.NewBounds(minFp, maxFp),
TableName: fmt.Sprintf("table_%d", day),
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
},
Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte("data"))},
Data: fp,
}
err := bloomClient.PutBlock(context.Background(), block)
require.NoError(t, err)
_ = bloomClient.storeDo(block.StartTimestamp, func(s *bloomStoreEntry) error {
c := s.bloomClient.(*BloomClient)
rc, _, err := c.client.GetObject(context.Background(), block.BlockRef.String())
require.NoError(t, err)
data, err := io.ReadAll(rc)
require.NoError(t, err)
require.Equal(t, "data", string(data))
return nil
})
return block, c.client.PutObject(context.Background(), c.Block(block.BlockRef).Addr(), block.Data)
}
func Test_BloomClient_DeleteBlocks(t *testing.T) {
block := BlockRef{
Ref: Ref{
TenantID: "tenantA",
TableName: "table_19621",
Bounds: v1.NewBounds(0xeeee, 0xffff),
StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
EndTimestamp: Date(2023, time.September, 21, 6, 0, 0),
Checksum: 1,
},
}
bloomClient := createStore(t)
fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
blockFullPath := NewPrefixedResolver(
fsNamedStores["folder-1"].Directory,
defaultKeyResolver{},
).Block(block).LocalPath()
_ = createBlockFile(t, blockFullPath)
require.FileExists(t, blockFullPath)
func TestBloomClient_GetBlock(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
err := bloomClient.DeleteBlocks(context.Background(), []BlockRef{block})
b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
require.NoFileExists(t, blockFullPath)
t.Run("exists", func(t *testing.T) {
blockDir, err := c.GetBlock(ctx, b.BlockRef)
require.NoError(t, err)
require.Equal(t, b.BlockRef, blockDir.BlockRef)
})
t.Run("does not exist", func(t *testing.T) {
blockDir, err := c.GetBlock(ctx, BlockRef{})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
require.Equal(t, blockDir, BlockDirectory{})
})
}
func createBlockFile(t *testing.T, dst string) string {
err := os.MkdirAll(dst[:strings.LastIndex(dst, "/")], 0755)
require.NoError(t, err)
fileContent := uuid.NewString()
func TestBloomClient_GetBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
src := filepath.Join(t.TempDir(), fileContent)
err = os.WriteFile(src, []byte(fileContent), 0700)
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff)
require.NoError(t, err)
fp, err := os.OpenFile(dst, os.O_CREATE|os.O_RDWR, 0700)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff)
require.NoError(t, err)
defer fp.Close()
TarGz(t, fp, src)
t.Run("exists", func(t *testing.T) {
blockDirs, err := c.GetBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef})
require.NoError(t, err)
require.Equal(t, []BlockRef{b1.BlockRef, b2.BlockRef}, []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef})
})
return fileContent
t.Run("does not exist", func(t *testing.T) {
_, err := c.GetBlocks(ctx, []BlockRef{{}})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
})
}
func TarGz(t *testing.T, dst io.Writer, file string) {
src, err := os.Open(file)
require.NoError(t, err)
defer src.Close()
gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst)
defer gzipper.Close()
func TestBloomClient_PutBlock(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
tarballer := tar.NewWriter(gzipper)
defer tarballer.Close()
start := parseTime("2024-02-05 12:00")
for _, f := range []*os.File{src} {
info, err := f.Stat()
require.NoError(t, err)
tmpDir := t.TempDir()
fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz")
header, err := tar.FileInfoHeader(info, f.Name())
require.NoError(t, err)
err = tarballer.WriteHeader(header)
require.NoError(t, err)
blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
err := blockWriter.Init()
require.NoError(t, err)
_, err = io.Copy(tarballer, f)
require.NoError(t, err)
}
}
err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
require.NoError(t, err)
func Test_ParseMetaKey(t *testing.T) {
tests := map[string]struct {
objectKey string
expectedRef MetaRef
expectedErr string
}{
"ValidObjectKey": {
objectKey: "bloom/table/tenant/metas/aaa-bbb-abcdef",
expectedRef: MetaRef{
Ref: Ref{
TenantID: "tenant",
TableName: "table",
Bounds: v1.NewBounds(0xaaa, 0xbbb),
StartTimestamp: 0, // ignored
EndTimestamp: 0, // ignored
Checksum: 0xabcdef,
},
block := Block{
BlockRef: BlockRef{
Ref: Ref{
TenantID: "tenant",
Bounds: v1.NewBounds(0x0000, 0xffff),
TableName: "table_1234",
StartTimestamp: start,
EndTimestamp: start.Add(12 * time.Hour),
},
},
"InvalidObjectKeyDelimiterCount": {
objectKey: "invalid/key/with/too/many/objectKeyWithoutDelimiters",
expectedRef: MetaRef{},
expectedErr: "failed to split filename parts",
},
"InvalidMinFingerprint": {
objectKey: "invalid/folder/key/metas/zzz-bbb-abcdef",
expectedErr: "failed to parse bounds",
},
"InvalidMaxFingerprint": {
objectKey: "invalid/folder/key/metas/123-zzz-abcdef",
expectedErr: "failed to parse bounds",
},
"InvalidChecksum": {
objectKey: "invalid/folder/key/metas/aaa-bbb-ghijklm",
expectedErr: "failed to parse checksum",
},
Data: fp,
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
actualRef, err := defaultKeyResolver{}.ParseMetaKey(key(data.objectKey))
if data.expectedErr != "" {
require.ErrorContains(t, err, data.expectedErr)
return
}
require.NoError(t, err)
require.Equal(t, data.expectedRef, actualRef)
})
}
}
func createStore(t *testing.T) *BloomStore {
periodicConfigs := createPeriodConfigs()
namedStores := storage.NamedStores{
Filesystem: map[string]storage.NamedFSConfig{
"folder-1": {Directory: t.TempDir()},
"folder-2": {Directory: t.TempDir()},
}}
//required to populate StoreType map in named config
require.NoError(t, namedStores.Validate())
storageConfig := storage.Config{
NamedStores: namedStores,
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: t.TempDir(),
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
},
}
err = c.PutBlock(ctx, block)
require.NoError(t, err)
metrics := storage.NewClientMetrics()
t.Cleanup(metrics.Unregister)
store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, cache.NewNoopCache(), nil, log.NewNopLogger())
oc := c.client.(*testutils.InMemoryObjectClient)
stored := oc.Internals()
_, found := stored[c.Block(block.BlockRef).Addr()]
require.True(t, found)
blockDir, err := c.GetBlock(ctx, block.BlockRef)
require.NoError(t, err)
return store
}
func createPeriodConfigs() []config.PeriodConfig {
periodicConfigs := []config.PeriodConfig{
{
ObjectType: "folder-1",
// from 2023-09-20: table range [19620:19623]
From: parseDayTime("2023-09-20"),
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Period: day,
// TODO(chaudum): Integrate {,Parse}MetaKey into schema config
// Prefix: "schema_a_table_",
}},
},
{
ObjectType: "folder-2",
// from 2023-09-24: table range [19624:19627]
From: parseDayTime("2023-09-24"),
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Period: day,
// TODO(chaudum): Integrate {,Parse}MetaKey into schema config
// Prefix: "schema_b_table_",
}},
},
}
return periodicConfigs
require.Equal(t, block.BlockRef, blockDir.BlockRef)
}
func createMetaInStorage(t *testing.T, s Client, tableName string, tenant string, minFingerprint uint64, maxFingerprint uint64, start model.Time) Meta {
end := start.Add(12 * time.Hour)
func TestBloomClient_DeleteBlocks(t *testing.T) {
c, _ := newMockBloomClient(t)
ctx := context.Background()
meta := createMetaEntity(tenant, tableName, minFingerprint, maxFingerprint, start, end)
err := s.PutMeta(context.Background(), meta)
b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff)
require.NoError(t, err)
t.Log("create meta in store", meta.String())
return meta
}
func createMetaEntity(
tenant string,
tableName string,
minFingerprint uint64,
maxFingerprint uint64,
startTimestamp model.Time,
endTimestamp model.Time,
) Meta {
return Meta{
MetaRef: MetaRef{
Ref: Ref{
TenantID: tenant,
TableName: tableName,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
},
},
Tombstones: []BlockRef{
{
Ref: Ref{
TenantID: tenant,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
},
},
},
Blocks: []BlockRef{
{
Ref: Ref{
TenantID: tenant,
Bounds: v1.NewBounds(model.Fingerprint(minFingerprint), model.Fingerprint(maxFingerprint)),
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
},
},
},
}
oc := c.client.(*testutils.InMemoryObjectClient)
stored := oc.Internals()
_, found := stored[c.Block(b1.BlockRef).Addr()]
require.True(t, found)
_, found = stored[c.Block(b2.BlockRef).Addr()]
require.True(t, found)
_, found = stored[c.Block(b3.BlockRef).Addr()]
require.True(t, found)
t.Run("all deleted", func(t *testing.T) {
err = c.DeleteBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef})
require.NoError(t, err)
_, found = stored[c.Block(b1.BlockRef).Addr()]
require.False(t, found)
_, found = stored[c.Block(b2.BlockRef).Addr()]
require.False(t, found)
})
t.Run("some not found", func(t *testing.T) {
err = c.DeleteBlocks(ctx, []BlockRef{b3.BlockRef, b1.BlockRef})
require.Error(t, err)
require.True(t, c.client.IsObjectNotFoundErr(err))
_, found = stored[c.Block(b3.BlockRef).Addr()]
require.False(t, found)
})
}

@ -343,7 +343,9 @@ func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadTask[T, R]) {
q.mu.LockKey(task.key)
defer func() {
err := q.mu.UnlockKey(task.key)
level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "err", err)
if err != nil {
level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "key", task.key, "err", err)
}
}()
q.process(ctx, task)

@ -37,13 +37,7 @@ type Limits interface {
BloomGatewayBlocksDownloadingParallelism(tenantID string) int
}
// TODO(chaudum): resolve and rip out
type StoreAndClient interface {
Store
Client
}
func NewShipper(client StoreAndClient, config config.Config, _ Limits, logger log.Logger, _ prometheus.Registerer) (*Shipper, error) {
func NewShipper(client Store, config config.Config, _ Limits, logger log.Logger, _ prometheus.Registerer) (*Shipper, error) {
logger = log.With(logger, "component", "bloom-shipper")
return &Shipper{
store: client,

@ -30,9 +30,6 @@ type bloomStoreConfig struct {
numWorkers int
}
// Compiler check to ensure bloomStoreEntry implements the Client interface
var _ Client = &bloomStoreEntry{}
// Compiler check to ensure bloomStoreEntry implements the Store interface
var _ Store = &bloomStoreEntry{}
@ -78,6 +75,12 @@ func (b *bloomStoreEntry) ResolveMetas(ctx context.Context, params MetaSearchPar
refs = append(refs, metaRef)
}
}
// return empty metaRefs/fetchers if there are no refs
if len(refs) == 0 {
return [][]MetaRef{}, []*Fetcher{}, nil
}
return [][]MetaRef{refs}, []*Fetcher{b.fetcher}, nil
}
@ -112,55 +115,12 @@ func (b *bloomStoreEntry) Fetcher(_ model.Time) *Fetcher {
return b.fetcher
}
// DeleteBlocks implements Client.
func (b *bloomStoreEntry) DeleteBlocks(ctx context.Context, refs []BlockRef) error {
return b.bloomClient.DeleteBlocks(ctx, refs)
}
// DeleteMeta implements Client.
func (b *bloomStoreEntry) DeleteMetas(ctx context.Context, refs []MetaRef) error {
return b.bloomClient.DeleteMetas(ctx, refs)
}
// GetBlock implements Client.
func (b *bloomStoreEntry) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) {
return b.bloomClient.GetBlock(ctx, ref)
}
// GetBlocks implements Client.
func (b *bloomStoreEntry) GetBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
return b.fetcher.FetchBlocks(ctx, refs)
}
// GetMeta implements Client.
func (b *bloomStoreEntry) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) {
return b.bloomClient.GetMeta(ctx, ref)
}
// GetMetas implements Client.
func (b *bloomStoreEntry) GetMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) {
return b.fetcher.FetchMetas(ctx, refs)
}
// PutBlocks implements Client.
func (b *bloomStoreEntry) PutBlock(ctx context.Context, block Block) error {
return b.bloomClient.PutBlock(ctx, block)
}
// PutMeta implements Client.
func (b *bloomStoreEntry) PutMeta(ctx context.Context, meta Meta) error {
return b.bloomClient.PutMeta(ctx, meta)
}
// Stop implements Store.
// It stops the underlying bloom client and releases the fetcher's resources.
func (b bloomStoreEntry) Stop() {
	b.bloomClient.Stop()
	b.fetcher.Close()
}
// Compiler check to ensure BloomStore implements the Client interface
var _ Client = &BloomStore{}
// Compiler check to ensure BloomStore implements the Store interface
var _ Store = &BloomStore{}
@ -267,8 +227,9 @@ func (b *BloomStore) Fetcher(ts model.Time) *Fetcher {
// ResolveMetas implements Store.
func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error) {
var refs [][]MetaRef
var fetchers []*Fetcher
refs := make([][]MetaRef, 0, len(b.stores))
fetchers := make([]*Fetcher, 0, len(b.stores))
err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store Store) error {
newParams := params
newParams.Interval = interval
@ -276,10 +237,14 @@ func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams)
if err != nil {
return err
}
refs = append(refs, metas...)
fetchers = append(fetchers, fetcher...)
if len(metas) > 0 {
// only append if there are any results
refs = append(refs, metas...)
fetchers = append(fetchers, fetcher...)
}
return nil
})
return refs, fetchers, err
}
@ -293,70 +258,22 @@ func (b *BloomStore) FetchMetas(ctx context.Context, params MetaSearchParams) ([
return nil, errors.New("metaRefs and fetchers have unequal length")
}
var metas []Meta
metas := []Meta{}
for i := range fetchers {
res, err := fetchers[i].FetchMetas(ctx, metaRefs[i])
if err != nil {
return nil, err
}
metas = append(metas, res...)
if len(res) > 0 {
metas = append(metas, res...)
}
}
return metas, nil
}
// FetchBlocks implements Store.
func (b *BloomStore) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
return b.GetBlocks(ctx, refs)
}
// DeleteBlocks implements Client.
// Each ref is routed to the store entry covering its start timestamp, since
// the refs may span multiple schema periods. Deletion stops at the first error.
func (b *BloomStore) DeleteBlocks(ctx context.Context, refs []BlockRef) error {
	for _, ref := range refs {
		err := b.storeDo(
			ref.StartTimestamp,
			func(s *bloomStoreEntry) error {
				return s.DeleteBlocks(ctx, []BlockRef{ref})
			},
		)
		if err != nil {
			return err
		}
	}
	return nil
}
// DeleteMetas implements Client.
// Each ref is routed to the store entry covering its start timestamp, since
// the refs may span multiple schema periods. Deletion stops at the first error.
func (b *BloomStore) DeleteMetas(ctx context.Context, refs []MetaRef) error {
	for _, ref := range refs {
		err := b.storeDo(
			ref.StartTimestamp,
			func(s *bloomStoreEntry) error {
				return s.DeleteMetas(ctx, []MetaRef{ref})
			},
		)
		if err != nil {
			return err
		}
	}
	return nil
}
// GetBlock implements Client.
// It resolves the store entry for the ref's start timestamp and fetches the
// block from it.
func (b *BloomStore) GetBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) {
	// Single-element slice lets the closure write its result back to the
	// enclosing scope. On error, res[0] is the zero value.
	res := make([]BlockDirectory, 1)
	err := b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error {
		block, err := s.GetBlock(ctx, ref)
		if err != nil {
			return err
		}
		res[0] = block
		return nil
	})
	return res[0], err
}
func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]BlockDirectory, error) {
// GetBlocks implements Client.
func (b *BloomStore) GetBlocks(ctx context.Context, blocks []BlockRef) ([]BlockDirectory, error) {
var refs [][]BlockRef
var fetchers []*Fetcher
@ -392,72 +309,7 @@ func (b *BloomStore) GetBlocks(ctx context.Context, blocks []BlockRef) ([]BlockD
return results, nil
}
// GetMeta implements Client.
// It resolves the store entry for the ref's start timestamp and fetches the
// meta from it.
func (b *BloomStore) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) {
	// Single-element slice lets the closure write its result back to the
	// enclosing scope. On error, res[0] is the zero value.
	res := make([]Meta, 1)
	err := b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error {
		meta, err := s.GetMeta(ctx, ref)
		if err != nil {
			return err
		}
		res[0] = meta
		return nil
	})
	return res[0], err
}
// GetMetas implements Client.
// It partitions the requested refs by the schema period (store entry) that
// covers each ref's start timestamp, then fetches each partition through the
// corresponding fetcher.
func (b *BloomStore) GetMetas(ctx context.Context, metas []MetaRef) ([]Meta, error) {
	var refs [][]MetaRef
	var fetchers []*Fetcher

	// Walk the stores newest-first; each store covers the half-open range
	// [store.start, next store's start), with the newest extending to Latest.
	for i := len(b.stores) - 1; i >= 0; i-- {
		s := b.stores[i]
		from, through := s.start, model.Latest
		if i < len(b.stores)-1 {
			through = b.stores[i+1].start
		}

		var res []MetaRef
		for _, meta := range metas {
			if meta.StartTimestamp >= from && meta.StartTimestamp < through {
				res = append(res, meta)
			}
		}
		if len(res) > 0 {
			refs = append(refs, res)
			fetchers = append(fetchers, s.Fetcher(s.start))
		}
	}

	results := make([]Meta, 0, len(metas))
	for i := range fetchers {
		res, err := fetchers[i].FetchMetas(ctx, refs[i])
		// NOTE(review): partial results are appended before the error check,
		// so callers may receive both data and a non-nil error — confirm
		// callers tolerate that.
		results = append(results, res...)
		if err != nil {
			return results, err
		}
	}

	return results, nil
}
// PutBlock implements Client.
// It routes the upload to the store entry covering the block's start timestamp.
func (b *BloomStore) PutBlock(ctx context.Context, block Block) error {
	return b.storeDo(block.StartTimestamp, func(s *bloomStoreEntry) error {
		return s.PutBlock(ctx, block)
	})
}
// PutMeta implements Client.
// It routes the upload to the store entry covering the meta's start timestamp.
func (b *BloomStore) PutMeta(ctx context.Context, meta Meta) error {
	return b.storeDo(meta.StartTimestamp, func(s *bloomStoreEntry) error {
		return s.PutMeta(ctx, meta)
	})
}
// Stop implements Client.
// Stop implements Store.
func (b *BloomStore) Stop() {
for _, s := range b.stores {
s.Stop()

@ -0,0 +1,268 @@
package bloomshipper
import (
"bytes"
"context"
"encoding/json"
"os"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
storageconfig "github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)
// newMockBloomStore builds a BloomStore backed by in-memory object storage
// with two schema periods (starting 2024-01-01 and 2024-02-01), so tests can
// exercise requests that span a period boundary. It returns the store and the
// working directory used for block downloads. Cleanup (metrics unregister and
// store shutdown) is registered on t.
func newMockBloomStore(t *testing.T) (*BloomStore, string) {
	workDir := t.TempDir()

	periodicConfigs := []storageconfig.PeriodConfig{
		{
			ObjectType: storageconfig.StorageTypeInMemory,
			From:       parseDayTime("2024-01-01"),
			IndexTables: storageconfig.IndexPeriodicTableConfig{
				PeriodicTableConfig: storageconfig.PeriodicTableConfig{
					Period: 24 * time.Hour,
					// TODO(chaudum): Integrate {,Parse}MetaKey into schema config
					// Prefix: "schema_a_table_",
				}},
		},
		{
			ObjectType: storageconfig.StorageTypeInMemory,
			From:       parseDayTime("2024-02-01"),
			IndexTables: storageconfig.IndexPeriodicTableConfig{
				PeriodicTableConfig: storageconfig.PeriodicTableConfig{
					Period: 24 * time.Hour,
					// TODO(chaudum): Integrate {,Parse}MetaKey into schema config
					// Prefix: "schema_b_table_",
				}},
		},
	}

	storageConfig := storage.Config{
		BloomShipperConfig: config.Config{
			WorkingDirectory: workDir,
			BlocksDownloadingQueue: config.DownloadingQueueConfig{
				WorkersCount: 1,
			},
			BlocksCache: config.BlocksCacheConfig{
				EmbeddedCacheConfig: cache.EmbeddedCacheConfig{
					MaxSizeItems: 1000,
					TTL:          1 * time.Hour,
				},
			},
		},
	}

	metrics := storage.NewClientMetrics()
	t.Cleanup(metrics.Unregister)
	logger := log.NewLogfmtLogger(os.Stderr)

	metasCache := cache.NewMockCache()
	blocksCache := NewBlocksCache(storageConfig.BloomShipperConfig, prometheus.NewPedanticRegistry(), logger)

	store, err := NewBloomStore(periodicConfigs, storageConfig, metrics, metasCache, blocksCache, logger)
	require.NoError(t, err)
	t.Cleanup(store.Stop)

	return store, workDir
}
// createMetaInStorage writes a minimal Meta for the given tenant, start time,
// and fingerprint bounds directly into the object storage of the store entry
// covering start. It returns the Meta (with its TableName resolved) so tests
// can compare it against fetched results.
func createMetaInStorage(store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Meta, error) {
	meta := Meta{
		MetaRef: MetaRef{
			Ref: Ref{
				TenantID: tenant,
				Bounds:   v1.NewBounds(minFp, maxFp),
				// Unused
				// StartTimestamp: start,
				// EndTimestamp:   start.Add(12 * time.Hour),
			},
		},
		Blocks:     []BlockRef{},
		Tombstones: []BlockRef{},
	}
	err := store.storeDo(start, func(s *bloomStoreEntry) error {
		// Marshal before resolving the TableName: the ref in the stored
		// payload is reconstructed from the object key on read, so the
		// serialized TableName is irrelevant.
		raw, err := json.Marshal(meta)
		if err != nil {
			return err
		}
		meta.MetaRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0]
		return s.objectClient.PutObject(context.Background(), s.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw))
	})
	return meta, err
}
// createBlockInStorage builds an empty bloom block archive (tar.gz) on disk
// and uploads it into the object storage of the store entry covering start.
// It returns the Block (with its TableName resolved) so tests can compare it
// against fetched results.
func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) {
	tmpDir := t.TempDir()

	// Fail fast on setup errors instead of silently discarding them.
	fp, err := os.CreateTemp(t.TempDir(), "*.tar.gz")
	require.NoError(t, err)

	blockWriter := v1.NewDirectoryBlockWriter(tmpDir)
	err = blockWriter.Init()
	require.NoError(t, err)

	err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir))
	require.NoError(t, err)

	// Rewind so the upload reads the archive from the beginning.
	_, err = fp.Seek(0, 0)
	require.NoError(t, err)

	block := Block{
		BlockRef: BlockRef{
			Ref: Ref{
				TenantID:       tenant,
				Bounds:         v1.NewBounds(minFp, maxFp),
				StartTimestamp: start,
				EndTimestamp:   start.Add(12 * time.Hour),
			},
		},
		Data: fp,
	}
	err = store.storeDo(start, func(s *bloomStoreEntry) error {
		block.BlockRef.Ref.TableName = tablesForRange(s.cfg, NewInterval(start, start.Add(12*time.Hour)))[0]
		return s.objectClient.PutObject(context.Background(), s.Block(block.BlockRef).Addr(), block.Data)
	})
	return block, err
}
// TestBloomStore_ResolveMetas verifies that ResolveMetas returns one group of
// meta refs (and one fetcher) per schema period, filtered by tenant, interval,
// and fingerprint bounds.
func TestBloomStore_ResolveMetas(t *testing.T) {
	store, _ := newMockBloomStore(t)

	// mustMeta fails the test immediately if fixture creation errors,
	// instead of silently discarding the error.
	mustMeta := func(ts string, minFp, maxFp model.Fingerprint) Meta {
		m, err := createMetaInStorage(store, "tenant", parseTime(ts), minFp, maxFp)
		require.NoError(t, err)
		return m
	}

	// schema 1
	// outside of interval, outside of bounds
	_ = mustMeta("2024-01-19 00:00", 0x00010000, 0x0001ffff)
	// outside of interval, inside of bounds
	_ = mustMeta("2024-01-19 00:00", 0x00000000, 0x0000ffff)
	// inside of interval, outside of bounds
	_ = mustMeta("2024-01-20 00:00", 0x00010000, 0x0001ffff)
	// inside of interval, inside of bounds
	m1 := mustMeta("2024-01-20 00:00", 0x00000000, 0x0000ffff)

	// schema 2
	// inside of interval, inside of bounds
	m2 := mustMeta("2024-02-05 00:00", 0x00000000, 0x0000ffff)
	// inside of interval, outside of bounds
	_ = mustMeta("2024-02-05 00:00", 0x00010000, 0x0001ffff)
	// outside of interval, inside of bounds
	_ = mustMeta("2024-02-11 00:00", 0x00000000, 0x0000ffff)
	// outside of interval, outside of bounds
	_ = mustMeta("2024-02-11 00:00", 0x00010000, 0x0001ffff)

	t.Run("tenant matches", func(t *testing.T) {
		ctx := context.Background()
		params := MetaSearchParams{
			"tenant",
			NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")),
			v1.NewBounds(0x00000000, 0x0000ffff),
		}

		refs, fetchers, err := store.ResolveMetas(ctx, params)
		require.NoError(t, err)
		require.Len(t, refs, 2)
		require.Len(t, fetchers, 2)

		// One group per schema period, each containing the single matching ref.
		require.Equal(t, [][]MetaRef{{m1.MetaRef}, {m2.MetaRef}}, refs)
	})

	t.Run("tenant does not match", func(t *testing.T) {
		ctx := context.Background()
		params := MetaSearchParams{
			"other",
			NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")),
			v1.NewBounds(0x00000000, 0x0000ffff),
		}

		refs, fetchers, err := store.ResolveMetas(ctx, params)
		require.NoError(t, err)
		require.Len(t, refs, 0)
		require.Len(t, fetchers, 0)
		require.Equal(t, [][]MetaRef{}, refs)
	})
}
// TestBloomStore_FetchMetas verifies that FetchMetas returns the flattened
// list of metas matching tenant, interval, and fingerprint bounds across
// both schema periods.
func TestBloomStore_FetchMetas(t *testing.T) {
	store, _ := newMockBloomStore(t)

	// mustMeta fails the test immediately if fixture creation errors,
	// instead of silently discarding the error.
	mustMeta := func(ts string, minFp, maxFp model.Fingerprint) Meta {
		m, err := createMetaInStorage(store, "tenant", parseTime(ts), minFp, maxFp)
		require.NoError(t, err)
		return m
	}

	// schema 1
	// outside of interval, outside of bounds
	_ = mustMeta("2024-01-19 00:00", 0x00010000, 0x0001ffff)
	// outside of interval, inside of bounds
	_ = mustMeta("2024-01-19 00:00", 0x00000000, 0x0000ffff)
	// inside of interval, outside of bounds
	_ = mustMeta("2024-01-20 00:00", 0x00010000, 0x0001ffff)
	// inside of interval, inside of bounds
	m1 := mustMeta("2024-01-20 00:00", 0x00000000, 0x0000ffff)

	// schema 2
	// inside of interval, inside of bounds
	m2 := mustMeta("2024-02-05 00:00", 0x00000000, 0x0000ffff)
	// inside of interval, outside of bounds
	_ = mustMeta("2024-02-05 00:00", 0x00010000, 0x0001ffff)
	// outside of interval, inside of bounds
	_ = mustMeta("2024-02-11 00:00", 0x00000000, 0x0000ffff)
	// outside of interval, outside of bounds
	_ = mustMeta("2024-02-11 00:00", 0x00010000, 0x0001ffff)

	t.Run("tenant matches", func(t *testing.T) {
		ctx := context.Background()
		params := MetaSearchParams{
			"tenant",
			NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")),
			v1.NewBounds(0x00000000, 0x0000ffff),
		}

		metas, err := store.FetchMetas(ctx, params)
		require.NoError(t, err)
		require.Len(t, metas, 2)
		require.Equal(t, []Meta{m1, m2}, metas)
	})

	t.Run("tenant does not match", func(t *testing.T) {
		ctx := context.Background()
		params := MetaSearchParams{
			"other",
			NewInterval(parseTime("2024-01-20 00:00"), parseTime("2024-02-10 00:00")),
			v1.NewBounds(0x00000000, 0x0000ffff),
		}

		metas, err := store.FetchMetas(ctx, params)
		require.NoError(t, err)
		require.Len(t, metas, 0)
		require.Equal(t, []Meta{}, metas)
	})
}
// TestBloomStore_FetchBlocks verifies that FetchBlocks resolves blocks across
// both schema periods and that cached blocks are returned alongside blocks
// fetched from storage.
func TestBloomStore_FetchBlocks(t *testing.T) {
	store, _ := newMockBloomStore(t)

	// Fail fast on fixture-creation errors instead of silently discarding them.
	// schema 1
	b1, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00000000, 0x0000ffff)
	require.NoError(t, err)
	b2, err := createBlockInStorage(t, store, "tenant", parseTime("2024-01-20 00:00"), 0x00010000, 0x0001ffff)
	require.NoError(t, err)
	// schema 2
	b3, err := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0000ffff)
	require.NoError(t, err)
	b4, err := createBlockInStorage(t, store, "tenant", parseTime("2024-02-05 00:00"), 0x00000000, 0x0001ffff)
	require.NoError(t, err)

	ctx := context.Background()

	// first call fetches two blocks from storage and populates the cache
	blockDirs, err := store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b3.BlockRef})
	require.NoError(t, err)
	require.Len(t, blockDirs, 2)

	require.ElementsMatch(t, []BlockRef{b1.BlockRef, b3.BlockRef}, []BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef})

	// second call fetches two blocks from cache and two from storage
	blockDirs, err = store.FetchBlocks(ctx, []BlockRef{b1.BlockRef, b2.BlockRef, b3.BlockRef, b4.BlockRef})
	require.NoError(t, err)
	require.Len(t, blockDirs, 4)

	// Note the order: b1 and b3 come from cache, so they are in the beginning of the response
	// Do we need to sort the response based on the request order of block refs?
	require.ElementsMatch(t,
		[]BlockRef{b1.BlockRef, b3.BlockRef, b2.BlockRef, b4.BlockRef},
		[]BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef, blockDirs[2].BlockRef, blockDirs[3].BlockRef},
	)
}
Loading…
Cancel
Save