loki/pkg/bloomgateway/processor_test.go


package bloomgateway

import (
    "context"
    "math/rand"
    "sync"
    "testing"
    "time"

    "github.com/prometheus/common/model"
    "github.com/stretchr/testify/require"
    "go.uber.org/atomic"

    "github.com/grafana/loki/pkg/logql/syntax"
    v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
    "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
// Compile-time check that dummyStore satisfies the store interface.
var _ store = &dummyStore{}

// dummyStore is an in-memory test double, pre-populated with the metas,
// block refs, and block queriers the processor should operate on.
type dummyStore struct {
    metas     []bloomshipper.Meta
    blocks    []bloomshipper.BlockRef
    querieres []bloomshipper.BlockQuerierWithFingerprintRange
}
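// ResolveMetas returns the refs of all metas in the store. Filtering by
// search params is left as a TODO.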
func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
    //TODO(chaudum) Filter metas based on search params
    refs := make([]bloomshipper.MetaRef, 0, len(s.metas))
    for _, meta := range s.metas {
        refs = append(refs, meta.MetaRef)
    }
    return [][]bloomshipper.MetaRef{refs}, []*bloomshipper.Fetcher{nil}, nil
}
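// FetchMetas returns all metas in the store, again without filtering by
// search params.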
func (s *dummyStore) FetchMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([]bloomshipper.Meta, error) {
    //TODO(chaudum) Filter metas based on search params
    return s.metas, nil
}
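// FetchBlocks panics to surface any accidental use: these tests expect the
// processor to load blocks via LoadBlocks instead.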
func (s *dummyStore) FetchBlocks(_ context.Context, _ []bloomshipper.BlockRef) ([]bloomshipper.BlockDirectory, error) {
    panic("don't call me")
}

func (s *dummyStore) Fetcher(_ model.Time) (*bloomshipper.Fetcher, error) {
    return nil, nil
}

func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
    return nil, nil
}

func (s *dummyStore) Stop() {
}
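// LoadBlocks returns the queriers whose fingerprint bounds match the given
// block refs, shuffled so that callers cannot rely on block order.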
func (s *dummyStore) LoadBlocks(_ context.Context, refs []bloomshipper.BlockRef) (v1.Iterator[bloomshipper.BlockQuerierWithFingerprintRange], error) {
    result := make([]bloomshipper.BlockQuerierWithFingerprintRange, 0, len(s.querieres))
    for _, ref := range refs {
        for _, bq := range s.querieres {
            if ref.Bounds.Equal(bq.FingerprintBounds) {
                result = append(result, bq)
            }
        }
    }
    // Shuffle the result so the processor cannot depend on the order in
    // which blocks are returned.
    rand.Shuffle(len(result), func(i, j int) {
        result[i], result[j] = result[j], result[i]
    })
    return v1.NewSliceIter(result), nil
}
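// TestProcessor runs the processor against the dummy store and verifies
// that exactly one result is received per queried series.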
func TestProcessor(t *testing.T) {
    ctx := context.Background()
    tenant := "fake"
    now := mktime("2024-01-27 12:00")

    t.Run("dummy", func(t *testing.T) {
        blocks, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x1000)
        p := &processor{
            store: &dummyStore{
                querieres: queriers,
                metas:     metas,
                blocks:    blocks,
            },
        }

        chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
        swb := seriesWithBounds{
            series: groupRefs(t, chunkRefs),
            bounds: model.Interval{
                Start: now.Add(-1 * time.Hour),
                End:   now,
            },
            day: truncateDay(now),
        }
        filters := []syntax.LineFilter{
            {Ty: 0, Match: "no match"},
        }

        t.Log("series", len(swb.series))
        task, _ := NewTask(ctx, "fake", swb, filters)
        tasks := []Task{task}

        // Drain each task's result channel concurrently, counting results
        // until the processor closes the channel.
        results := atomic.NewInt64(0)
        var wg sync.WaitGroup
        for i := range tasks {
            wg.Add(1)
            go func(ta Task) {
                defer wg.Done()
                for range ta.resCh {
                    results.Inc()
                }
                t.Log("done", results.Load())
            }(tasks[i])
        }

        err := p.run(ctx, tasks)
        wg.Wait()
        require.NoError(t, err)
        require.Equal(t, int64(len(swb.series)), results.Load())
    })
}