chore(bloom): remove unused code from blooms (#14539)

Robert Fratto committed 2 years ago via GitHub
parent dbb3b6edc9
commit bf54cf1496
51 changed files:

  1. pkg/bloombuild/builder/batch.go (6 changes)
  2. pkg/bloombuild/builder/batch_test.go (4 changes)
  3. pkg/bloombuild/builder/builder.go (15 changes)
  4. pkg/bloombuild/builder/spec.go (28 changes)
  5. pkg/bloombuild/builder/spec_test.go (6 changes)
  6. pkg/bloombuild/common/tsdb.go (2 changes)
  7. pkg/bloombuild/common/tsdb_test.go (2 changes)
  8. pkg/bloombuild/planner/metrics.go (6 changes)
  9. pkg/bloombuild/planner/planner.go (8 changes)
  10. pkg/bloombuild/planner/strategies/splitkeyspace.go (8 changes)
  11. pkg/bloombuild/planner/util.go (125 changes)
  12. pkg/bloombuild/planner/util_test.go (172 changes)
  13. pkg/bloombuild/protos/compat.go (2 changes)
  14. pkg/bloomgateway/bloomgateway.go (2 changes)
  15. pkg/bloomgateway/client.go (4 changes)
  16. pkg/bloomgateway/client_pool.go (8 changes)
  17. pkg/bloomgateway/metrics.go (7 changes)
  18. pkg/bloomgateway/multiplexing.go (13 changes)
  19. pkg/bloomgateway/multiplexing_test.go (2 changes)
  20. pkg/bloomgateway/processor.go (2 changes)
  21. pkg/bloomgateway/querier.go (2 changes)
  22. pkg/bloomgateway/stats.go (12 changes)
  23. pkg/bloomgateway/util.go (18 changes)
  24. pkg/bloomgateway/util_test.go (17 changes)
  25. pkg/storage/bloom/v1/archive_test.go (14 changes)
  26. pkg/storage/bloom/v1/block.go (2 changes)
  27. pkg/storage/bloom/v1/block_writer.go (1 change)
  28. pkg/storage/bloom/v1/bloom.go (12 changes)
  29. pkg/storage/bloom/v1/bloom_builder.go (28 changes)
  30. pkg/storage/bloom/v1/bloom_tokenizer.go (8 changes)
  31. pkg/storage/bloom/v1/bloom_tokenizer_test.go (2 changes)
  32. pkg/storage/bloom/v1/bounds.go (45 changes)
  33. pkg/storage/bloom/v1/bounds_test.go (140 changes)
  34. pkg/storage/bloom/v1/builder.go (12 changes)
  35. pkg/storage/bloom/v1/builder_test.go (68 changes)
  36. pkg/storage/bloom/v1/dedupe_test.go (6 changes)
  37. pkg/storage/bloom/v1/filter/buckets.go (4 changes)
  38. pkg/storage/bloom/v1/filter/scalable.go (17 changes)
  39. pkg/storage/bloom/v1/filter/scalable_test.go (46 changes)
  40. pkg/storage/bloom/v1/fuse.go (14 changes)
  41. pkg/storage/bloom/v1/fuse_test.go (39 changes)
  42. pkg/storage/bloom/v1/index_querier.go (7 changes)
  43. pkg/storage/bloom/v1/merge.go (5 changes)
  44. pkg/storage/bloom/v1/merge_test.go (10 changes)
  45. pkg/storage/bloom/v1/reader.go (4 changes)
  46. pkg/storage/bloom/v1/schema.go (8 changes)
  47. pkg/storage/bloom/v1/test_util.go (7 changes)
  48. pkg/storage/bloom/v1/util.go (4 changes)
  49. pkg/storage/bloom/v1/versioned_builder.go (2 changes)
  50. pkg/storage/bloom/v1/versioned_builder_test.go (11 changes)
  51. pkg/storage/chunk/cache/resultscache/cache.go (8 changes)

@@ -248,7 +248,7 @@ func (i *blockLoadingIter) loadNext() bool {
 blockRefs := i.overlapping.At()
 loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
-filtered := iter.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
+filtered := iter.NewFilterIter(loader, i.filter)
 iters := make([]iter.PeekIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
 for filtered.Next() {
@@ -279,7 +279,7 @@ func (i *blockLoadingIter) loadNext() bool {
 // two overlapping blocks can conceivably have the same series, so we need to dedupe,
 // preferring the one with the most chunks already indexed since we'll have
 // to add fewer chunks to the bloom
-i.iter = iter.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
+i.iter = iter.NewDedupingIter(
 func(a, b *v1.SeriesWithBlooms) bool {
 return a.Series.Fingerprint == b.Series.Fingerprint
 },
@@ -346,7 +346,7 @@ func overlappingBlocksIter(inputs []bloomshipper.BlockRef) iter.Iterator[[]bloom
 // can we assume sorted blocks?
 peekIter := iter.NewPeekIter(iter.NewSliceIter(inputs))
-return iter.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
+return iter.NewDedupingIter(
 func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
 minFp := b[0].Bounds.Min
 maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
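
Most of the churn in this commit is dropping explicit type parameters that the Go compiler can infer from the arguments. A minimal, self-contained sketch of the pattern (the iterator type here is illustrative, not Loki's actual `iter` package):

```go
package main

import "fmt"

// sliceIter is a tiny generic iterator over a slice.
type sliceIter[T any] struct {
	xs []T
	i  int
}

// newSliceIter mirrors the shape of constructors like iter.NewSliceIter:
// T is fully determined by the argument, so callers never need to spell it out.
func newSliceIter[T any](xs []T) *sliceIter[T] { return &sliceIter[T]{xs: xs} }

func (s *sliceIter[T]) Next() bool { s.i++; return s.i <= len(s.xs) }
func (s *sliceIter[T]) At() T      { return s.xs[s.i-1] }

func main() {
	// Before: newSliceIter[int]([]int{1, 2, 3}) -- the explicit [int] is redundant.
	it := newSliceIter([]int{1, 2, 3}) // T inferred as int from the argument
	for it.Next() {
		fmt.Println(it.At())
	}
}
```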

@@ -120,7 +120,7 @@ func TestBatchedLoader(t *testing.T) {
 )
 }
-loader := newBatchedLoader[int, int, int](
+loader := newBatchedLoader(
 tc.ctx,
 fetchers,
 tc.inputs,
@@ -128,7 +128,7 @@ func TestBatchedLoader(t *testing.T) {
 tc.batchSize,
 )
-got, err := v2.Collect[int](loader)
+got, err := v2.Collect(loader)
 if tc.err {
 require.Error(t, err)
 return

@@ -46,7 +46,6 @@ type Builder struct {
 metrics *Metrics
 logger log.Logger
-tsdbStore common.TSDBStore
 bloomStore bloomshipper.Store
 chunkLoader ChunkLoader
@@ -74,18 +73,12 @@ func New(
 builderID := uuid.NewString()
 logger = log.With(logger, "builder_id", builderID)
-tsdbStore, err := common.NewTSDBStores("bloom-builder", schemaCfg, storeCfg, storageMetrics, logger)
-if err != nil {
-return nil, fmt.Errorf("error creating TSDB store: %w", err)
-}
 metrics := NewMetrics(r)
 b := &Builder{
 ID: builderID,
 cfg: cfg,
 limits: limits,
 metrics: metrics,
-tsdbStore: tsdbStore,
 bloomStore: bloomStore,
 chunkLoader: NewStoreChunkLoader(fetcherProvider, metrics),
 logger: logger,
@@ -386,7 +379,7 @@ func (b *Builder) processTask(
 // Blocks are built consuming the series iterator. For observability, we wrap the series iterator
 // with a counter iterator to count the number of times Next() is called on it.
 // This is used to observe the number of series that are being processed.
-seriesItrWithCounter := iter.NewCounterIter[*v1.Series](seriesItr)
+seriesItrWithCounter := iter.NewCounterIter(seriesItr)
 gen := NewSimpleBloomGenerator(
 tenant,
@@ -416,7 +409,7 @@ func (b *Builder) processTask(
 return nil, fmt.Errorf("failed to build block: %w", err)
 }
-logger := log.With(logger, "block", built.BlockRef.String())
+logger := log.With(logger, "block", built.String())
 if err := client.PutBlock(
 ctx,
@@ -461,7 +454,7 @@ func (b *Builder) processTask(
 }
 meta.MetaRef = ref
-logger = log.With(logger, "meta", meta.MetaRef.String())
+logger = log.With(logger, "meta", meta.String())
 if err := client.PutMeta(ctx, meta); err != nil {
 level.Error(logger).Log("msg", "failed to write meta", "err", err)
@@ -490,7 +483,7 @@ func (b *Builder) loadWorkForGap(
 table config.DayTable,
 gap protos.Gap,
 ) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
-seriesItr := iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](gap.Series))
+seriesItr := iter.NewCancelableIter(ctx, iter.NewSliceIter(gap.Series))
 // load a blocks iterator for the gap
 fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
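
The `built.String()` and `meta.String()` call sites work because Go promotes methods of embedded fields; the explicit `.BlockRef`/`.MetaRef` hops were redundant. A minimal sketch of the mechanism (the types are illustrative stand-ins, not the actual `bloomshipper` definitions):

```go
package main

import "fmt"

// BlockRef is an illustrative stand-in for a reference type with a Stringer.
type BlockRef struct{ Min, Max uint64 }

func (r BlockRef) String() string { return fmt.Sprintf("block-%d-%d", r.Min, r.Max) }

// Block embeds BlockRef, so BlockRef's methods are promoted onto Block.
type Block struct {
	BlockRef
	Data []byte
}

func main() {
	b := Block{BlockRef: BlockRef{Min: 1, Max: 9}}
	// Equivalent calls: the promoted method makes the explicit field access redundant.
	fmt.Println(b.BlockRef.String())
	fmt.Println(b.String())
}
```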

@@ -3,7 +3,6 @@ package builder
 import (
 "context"
 "fmt"
-"io"
 "github.com/go-kit/log"
 "github.com/go-kit/log/level"
@@ -17,29 +16,8 @@ import (
 "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
 "github.com/grafana/loki/v3/pkg/storage/stores"
 "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
-"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 )
-// inclusive range
-type Keyspace struct {
-min, max model.Fingerprint
-}
-func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
-if other.max < k.min {
-return v1.Before
-} else if other.min > k.max {
-return v1.After
-}
-return v1.Overlap
-}
-// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
-// to only request the shard-range needed from the existing store.
-type BloomGenerator interface {
-Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results iter.Iterator[*v1.Block], err error)
-}
 // Simple implementation of a BloomGenerator.
 type SimpleBloomGenerator struct {
 userID string
@@ -247,12 +225,6 @@ func (b *LazyBlockBuilderIterator) Err() error {
 return b.err
 }
-// IndexLoader loads an index. This helps us do things like
-// load TSDBs for a specific period excluding multitenant (pre-compacted) indices
-type indexLoader interface {
-Index() (tsdb.Index, error)
-}
 // ChunkItersByFingerprint models the chunks belonging to a fingerprint
 type ChunkItersByFingerprint struct {
 fp model.Fingerprint

@@ -48,7 +48,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
 minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock
-itr := v2.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
+itr := v2.NewSliceIter(data[minIdx:maxIdx])
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
@@ -134,8 +134,8 @@ func TestSimpleBloomGenerator(t *testing.T) {
 } {
 t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
 sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
-storeItr := v2.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
-v2.NewSliceIter[v1.SeriesWithBlooms](data),
+storeItr := v2.NewMapIter(
+v2.NewSliceIter(data),
 func(swb v1.SeriesWithBlooms) *v1.Series {
 return &swb.Series.Series
 },

@@ -163,7 +163,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b
 case <-ctx.Done():
 return iter.NewEmptyIter[*v1.Series](), ctx.Err()
 default:
-return iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](series)), nil
+return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil
 }
 }

@@ -66,7 +66,7 @@ func TestTSDBSeriesIter(t *testing.T) {
 itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
 require.NoError(t, err)
-v1.EqualIterators[*v1.Series](
+v1.EqualIterators(
 t,
 func(a, b *v1.Series) {
 require.Equal(t, a, b)

@@ -5,8 +5,6 @@ import (
 "github.com/prometheus/client_golang/prometheus"
 "github.com/prometheus/client_golang/prometheus/promauto"
-"github.com/grafana/loki/v3/pkg/queue"
 )
 const (
@@ -211,7 +209,3 @@ func NewMetrics(
 }),
 }
 }
-func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics {
-return queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
-}

@@ -82,7 +82,7 @@ func New(
 }
 // Queue to manage tasks
-queueMetrics := NewQueueMetrics(r)
+queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
 tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)
 // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
@@ -591,14 +591,14 @@ func (p *Planner) deleteOutdatedMetasAndBlocks(
 err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
 if err != nil {
 if client.IsObjectNotFoundErr(err) {
-level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
+level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.String())
 } else {
-level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
+level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.String())
 return nil, errors.Wrap(err, "failed to delete meta")
 }
 }
 deletedMetas++
-level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
+level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.String())
 }
 level.Debug(logger).Log(

@@ -207,13 +207,13 @@ func blockPlansForGaps(
 return planGap.Blocks[i].Bounds.Less(planGap.Blocks[j].Bounds)
 })
-peekingBlocks := iter.NewPeekIter[bloomshipper.BlockRef](
-iter.NewSliceIter[bloomshipper.BlockRef](
+peekingBlocks := iter.NewPeekIter(
+iter.NewSliceIter(
 planGap.Blocks,
 ),
 )
 // dedupe blocks which could be in multiple metas
-itr := iter.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
+itr := iter.NewDedupingIter(
 func(a, b bloomshipper.BlockRef) bool {
 return a == b
 },
@@ -224,7 +224,7 @@ func blockPlansForGaps(
 peekingBlocks,
 )
-deduped, err := iter.Collect[bloomshipper.BlockRef](itr)
+deduped, err := iter.Collect(itr)
 if err != nil {
 return nil, fmt.Errorf("failed to dedupe blocks: %w", err)
 }

@@ -1,125 +0,0 @@
-package planner
-import (
-"fmt"
-"math"
-"github.com/prometheus/common/model"
-v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
-)
-// SplitFingerprintKeyspaceByFactor splits the keyspace covered by model.Fingerprint into contiguous non-overlapping ranges.
-func SplitFingerprintKeyspaceByFactor(factor int) []v1.FingerprintBounds {
-if factor <= 0 {
-return nil
-}
-bounds := make([]v1.FingerprintBounds, 0, factor)
-// The keyspace of a Fingerprint is from 0 to max uint64.
-keyspaceSize := uint64(math.MaxUint64)
-// Calculate the size of each range.
-rangeSize := keyspaceSize / uint64(factor)
-for i := 0; i < factor; i++ {
-// Calculate the start and end of the range.
-start := uint64(i) * rangeSize
-end := start + rangeSize - 1
-// For the last range, make sure it ends at the end of the keyspace.
-if i == factor-1 {
-end = keyspaceSize
-}
-// Create a FingerprintBounds for the range and add it to the slice.
-bounds = append(bounds, v1.FingerprintBounds{
-Min: model.Fingerprint(start),
-Max: model.Fingerprint(end),
-})
-}
-return bounds
-}
-func FindGapsInFingerprintBounds(ownershipRange v1.FingerprintBounds, metas []v1.FingerprintBounds) (gaps []v1.FingerprintBounds, err error) {
-if len(metas) == 0 {
-return []v1.FingerprintBounds{ownershipRange}, nil
-}
-// turn the available metas into a list of non-overlapping metas
-// for easier processing
-var nonOverlapping []v1.FingerprintBounds
-// First, we reduce the metas into a smaller set by combining overlaps. They must be sorted.
-var cur *v1.FingerprintBounds
-for i := 0; i < len(metas); i++ {
-j := i + 1
-// first iteration (i == 0), set the current meta
-if cur == nil {
-cur = &metas[i]
-}
-if j >= len(metas) {
-// We've reached the end of the list. Add the last meta to the non-overlapping set.
-nonOverlapping = append(nonOverlapping, *cur)
-break
-}
-combined := cur.Union(metas[j])
-if len(combined) == 1 {
-// There was an overlap between the two tested ranges. Combine them and keep going.
-cur = &combined[0]
-continue
-}
-// There was no overlap between the two tested ranges. Add the first to the non-overlapping set.
-// and keep the second for the next iteration.
-nonOverlapping = append(nonOverlapping, combined[0])
-cur = &combined[1]
-}
-// Now, detect gaps between the non-overlapping metas and the ownership range.
-// The left bound of the ownership range will be adjusted as we go.
-leftBound := ownershipRange.Min
-for _, meta := range nonOverlapping {
-clippedMeta := meta.Intersection(ownershipRange)
-// should never happen as long as we are only combining metas
-// that intersect with the ownership range
-if clippedMeta == nil {
-return nil, fmt.Errorf("meta is not within ownership range: %v", meta)
-}
-searchRange := ownershipRange.Slice(leftBound, clippedMeta.Max)
-// update the left bound for the next iteration
-// We do the max to prevent the max bound to overflow from MaxUInt64 to 0
-leftBound = min(
-max(clippedMeta.Max+1, clippedMeta.Max),
-max(ownershipRange.Max+1, ownershipRange.Max),
-)
-// since we've already ensured that the meta is within the ownership range,
-// we know the xor will be of length zero (when the meta is equal to the ownership range)
-// or 1 (when the meta is a subset of the ownership range)
-xors := searchRange.Unless(*clippedMeta)
-if len(xors) == 0 {
-// meta is equal to the ownership range. This means the meta
-// covers this entire section of the ownership range.
-continue
-}
-gaps = append(gaps, xors[0])
-}
-// If the leftBound is less than the ownership range max, and it's smaller than MaxUInt64,
-// There is a gap between the last meta and the end of the ownership range.
-// Note: we check `leftBound < math.MaxUint64` since in the loop above we clamp the
-// leftBound to MaxUint64 to prevent an overflow to 0: `max(clippedMeta.Max+1, clippedMeta.Max)`
-if leftBound < math.MaxUint64 && leftBound <= ownershipRange.Max {
-gaps = append(gaps, v1.NewBounds(leftBound, ownershipRange.Max))
-}
-return gaps, nil
-}
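
For context on the deleted splitter: it divides the uint64 fingerprint keyspace into `factor` contiguous inclusive ranges, clamping the last range so integer division doesn't drop the tail of the keyspace. A runnable miniature of the same logic over an assumed 0..15 keyspace, so the output is readable:

```go
package main

import "fmt"

// bounds is an inclusive range, mirroring v1.FingerprintBounds in miniature.
type bounds struct{ min, max uint64 }

// splitByFactor mirrors the removed SplitFingerprintKeyspaceByFactor over a
// small keyspace (0..keyspaceMax instead of 0..MaxUint64).
func splitByFactor(keyspaceMax, factor uint64) []bounds {
	out := make([]bounds, 0, factor)
	rangeSize := keyspaceMax / factor
	for i := uint64(0); i < factor; i++ {
		b := bounds{min: i * rangeSize, max: i*rangeSize + rangeSize - 1}
		if i == factor-1 {
			b.max = keyspaceMax // clamp the last range to the end of the keyspace
		}
		out = append(out, b)
	}
	return out
}

func main() {
	// [0,2] [3,5] [6,8] [9,15]: integer division leaves the remainder
	// to the last, clamped range.
	fmt.Println(splitByFactor(15, 4))
}
```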

@@ -1,172 +0,0 @@
-package planner
-import (
-"math"
-"testing"
-"github.com/prometheus/common/model"
-"github.com/stretchr/testify/require"
-v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
-)
-func TestSplitFingerprintKeyspaceByFactor(t *testing.T) {
-for _, tt := range []struct {
-name string
-factor int
-}{
-{
-name: "Factor is 0",
-factor: 0,
-},
-{
-name: "Factor is 1",
-factor: 1,
-},
-{
-name: "Factor is 256",
-factor: 256,
-},
-} {
-t.Run(tt.name, func(t *testing.T) {
-got := SplitFingerprintKeyspaceByFactor(tt.factor)
-if tt.factor == 0 {
-require.Empty(t, got)
-return
-}
-// Check overall min and max values of the ranges.
-require.Equal(t, model.Fingerprint(math.MaxUint64), got[len(got)-1].Max)
-require.Equal(t, model.Fingerprint(0), got[0].Min)
-// For each range, check that the max value of the previous range is one less than the min value of the current range.
-for i := 1; i < len(got); i++ {
-require.Equal(t, got[i-1].Max+1, got[i].Min)
-}
-})
-}
-}
-func Test_FindGapsInFingerprintBounds(t *testing.T) {
-for _, tc := range []struct {
-desc string
-err bool
-exp []v1.FingerprintBounds
-ownershipRange v1.FingerprintBounds
-metas []v1.FingerprintBounds
-}{
-{
-desc: "error nonoverlapping metas",
-err: true,
-exp: nil,
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{v1.NewBounds(11, 20)},
-},
-{
-desc: "one meta with entire ownership range",
-err: false,
-exp: nil,
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{v1.NewBounds(0, 10)},
-},
-{
-desc: "two non-overlapping metas with entire ownership range",
-err: false,
-exp: nil,
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, 5),
-v1.NewBounds(6, 10),
-},
-},
-{
-desc: "two overlapping metas with entire ownership range",
-err: false,
-exp: nil,
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, 6),
-v1.NewBounds(4, 10),
-},
-},
-{
-desc: "one meta with partial ownership range",
-err: false,
-exp: []v1.FingerprintBounds{
-v1.NewBounds(6, 10),
-},
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, 5),
-},
-},
-{
-desc: "smaller subsequent meta with partial ownership range",
-err: false,
-exp: []v1.FingerprintBounds{
-v1.NewBounds(8, 10),
-},
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, 7),
-v1.NewBounds(3, 4),
-},
-},
-{
-desc: "hole in the middle",
-err: false,
-exp: []v1.FingerprintBounds{
-v1.NewBounds(4, 5),
-},
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, 3),
-v1.NewBounds(6, 10),
-},
-},
-{
-desc: "holes on either end",
-err: false,
-exp: []v1.FingerprintBounds{
-v1.NewBounds(0, 2),
-v1.NewBounds(8, 10),
-},
-ownershipRange: v1.NewBounds(0, 10),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(3, 5),
-v1.NewBounds(6, 7),
-},
-},
-{
-desc: "full ownership range with single meta",
-err: false,
-exp: nil,
-ownershipRange: v1.NewBounds(0, math.MaxUint64),
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, math.MaxUint64),
-},
-},
-{
-desc: "full ownership range with multiple metas",
-err: false,
-exp: nil,
-ownershipRange: v1.NewBounds(0, math.MaxUint64),
-// Three metas covering the whole 0 - MaxUint64
-metas: []v1.FingerprintBounds{
-v1.NewBounds(0, math.MaxUint64/3),
-v1.NewBounds(math.MaxUint64/3+1, math.MaxUint64/2),
-v1.NewBounds(math.MaxUint64/2+1, math.MaxUint64),
-},
-},
-} {
-t.Run(tc.desc, func(t *testing.T) {
-gaps, err := FindGapsInFingerprintBounds(tc.ownershipRange, tc.metas)
-if tc.err {
-require.Error(t, err)
-return
-}
-require.Equal(t, tc.exp, gaps)
-})
-}
-}

@@ -249,7 +249,7 @@ func (r *TaskResult) ToProtoTaskResult() *ProtoTaskResult {
 }
 protoMetas = append(protoMetas, &ProtoMeta{
-MetaRef: meta.MetaRef.String(),
+MetaRef: meta.String(),
 SourcesTSDBs: metaRefs,
 BlockRefs: blockRefs,
 })

@@ -352,7 +352,7 @@ func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses []v1.Output)
 // dedupe outputs, merging the same series.
 // This returns an Iterator[v1.Output]
-dedupedResps := iter.NewDedupingIter[v1.Output, v1.Output](
+dedupedResps := iter.NewDedupingIter(
 // eq
 func(o1, o2 v1.Output) bool {
 return o1.Fp == o2.Fp

@@ -301,12 +301,12 @@ func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedCh
 iters = append(iters, iter.NewPeekIter(iter.NewSliceIter(inp)))
 }
-heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs](
+heapIter := v1.NewHeapIterator(
 func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint < b.Fingerprint },
 iters...,
 )
-dedupeIter := iter.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs](
+dedupeIter := iter.NewDedupingIter(
 // eq
 func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint },
 // from

@@ -76,14 +76,6 @@ func (p *JumpHashClientPool) Stop() {
 _ = services.StopAndAwaitTerminated(context.Background(), p.Service)
 }
-func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) {
-addr, err := p.FromUInt64(fp)
-if err != nil {
-return "", err
-}
-return addr.String(), nil
-}
 func (p *JumpHashClientPool) Addr(key string) (string, error) {
 addr, err := p.FromString(key)
 if err != nil {

@@ -23,7 +23,6 @@ const (
 type clientMetrics struct {
 clientRequests *prometheus.CounterVec
 requestLatency *prometheus.HistogramVec
-clients prometheus.Gauge
 }
 func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
@@ -41,12 +40,6 @@ func newClientMetrics(registerer prometheus.Registerer) *clientMetrics {
 Help: "Time (in seconds) spent serving requests when using the bloom gateway",
 Buckets: instrument.DefBuckets,
 }, []string{"operation", "status_code"}),
-clients: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
-Namespace: constants.Loki,
-Subsystem: "bloom_gateway",
-Name: "clients",
-Help: "The current number of bloom gateway clients.",
-}),
 }
 }

@@ -18,17 +18,20 @@ const (
 Day = 24 * time.Hour
 )
-type tokenSettings struct {
-nGramLen int
-}
 type wrappedError struct {
 mu sync.Mutex
 err error
 }
 func (e *wrappedError) Error() string {
-return e.err.Error()
+e.mu.Lock()
+err := e.err
+e.mu.Unlock()
+if err == nil {
+return ""
+}
+return err.Error()
 }
 func (e *wrappedError) Set(err error) {
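
The rewritten `Error()` copies the error under the mutex and formats it outside the critical section, so it no longer races with a concurrent `Set()` and no longer panics when the error is still nil. A standalone sketch of the pattern, assuming `Set` takes the same lock:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// wrappedError mirrors the pattern above: one goroutine may Set the error
// while others call Error(), so both paths must hold the mutex.
type wrappedError struct {
	mu  sync.Mutex
	err error
}

func (e *wrappedError) Error() string {
	e.mu.Lock()
	err := e.err // copy under the lock, format outside it
	e.mu.Unlock()
	if err == nil {
		return ""
	}
	return err.Error()
}

func (e *wrappedError) Set(err error) {
	e.mu.Lock()
	e.err = err
	e.mu.Unlock()
}

func main() {
	var we wrappedError
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); we.Set(errors.New("boom")) }()
	wg.Wait()
	fmt.Println(we.Error())
}
```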

@@ -108,7 +108,7 @@ func TestTask_RequestIterator(t *testing.T) {
 }
 // merge the request iterators using the heap sort iterator
-it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)
+it := v1.NewHeapIterator(func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)
 // first item
 require.True(t, it.Next())

@@ -168,7 +168,7 @@ func (p *processor) processBlock(_ context.Context, bq *bloomshipper.CloseableBl
 iters = append(iters, it)
 }
-logger := log.With(p.logger, "block", bq.BlockRef.String())
+logger := log.With(p.logger, "block", bq.String())
 fq := blockQuerier.Fuse(iters, logger)
 start := time.Now()

@@ -149,7 +149,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
 // We can perform requests sequentially, because most of the time the request
 // only covers a single day, and if not, it's at most two days.
 for _, s := range partitionSeriesByDay(from, through, grouped) {
-day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day))
+day := bloomshipper.NewInterval(s.day.Time, s.day.Add(Day))
 blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series)
 if err != nil {
 return nil, err

@@ -13,7 +13,7 @@ type Stats struct {
 ChunksRequested, ChunksFiltered int
 SeriesRequested, SeriesFiltered int
 QueueTime *atomic.Duration
-MetasFetchTime, BlocksFetchTime *atomic.Duration
+BlocksFetchTime *atomic.Duration
 ProcessingTime, TotalProcessingTime *atomic.Duration
 PostProcessingTime *atomic.Duration
 ProcessedBlocks *atomic.Int32 // blocks processed for this specific request
@@ -31,7 +31,6 @@ func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
 ProcessedBlocks: atomic.NewInt32(0),
 ProcessedBlocksTotal: atomic.NewInt32(0),
 QueueTime: atomic.NewDuration(0),
-MetasFetchTime: atomic.NewDuration(0),
 BlocksFetchTime: atomic.NewDuration(0),
 ProcessingTime: atomic.NewDuration(0),
 TotalProcessingTime: atomic.NewDuration(0),
@@ -54,7 +53,6 @@ func FromContext(ctx context.Context) *Stats {
 // aggregates the total duration
 func (s *Stats) Duration() (dur time.Duration) {
 dur += s.QueueTime.Load()
-dur += s.MetasFetchTime.Load()
 dur += s.BlocksFetchTime.Load()
 dur += s.ProcessingTime.Load()
 dur += s.PostProcessingTime.Load()
@@ -82,7 +80,6 @@ func (s *Stats) KVArgs() []any {
 "chunks_remaining", chunksRemaining,
 "filter_ratio", filterRatio,
 "queue_time", s.QueueTime.Load(),
-"metas_fetch_time", s.MetasFetchTime.Load(),
 "blocks_fetch_time", s.BlocksFetchTime.Load(),
 "processing_time", s.ProcessingTime.Load(),
 "post_processing_time", s.PostProcessingTime.Load(),
@@ -97,13 +94,6 @@ func (s *Stats) AddQueueTime(t time.Duration) {
 s.QueueTime.Add(t)
 }
-func (s *Stats) AddMetasFetchTime(t time.Duration) {
-if s == nil {
-return
-}
-s.MetasFetchTime.Add(t)
-}
 func (s *Stats) AddBlocksFetchTime(t time.Duration) {
 if s == nil {
 return

@@ -4,7 +4,6 @@ import (
 "sort"
 "github.com/prometheus/common/model"
-"golang.org/x/exp/slices"
 "github.com/grafana/loki/v3/pkg/logproto"
 v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
@@ -34,23 +33,6 @@ func daysForRange(from, through model.Time) []model.Time {
 return days
 }
-// getFromThrough assumes a list of ShortRefs sorted by From time
-func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) {
-if len(refs) == 0 {
-return model.Earliest, model.Latest
-}
-if len(refs) == 1 {
-return refs[0].From, refs[0].Through
-}
-maxItem := slices.MaxFunc(refs, func(a, b *logproto.ShortRef) int {
-return int(a.Through) - int(b.Through)
-})
-return refs[0].From, maxItem.Through
-}
 // convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs
 func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
 result := make(v1.ChunkRefs, 0, len(refs))

@@ -31,23 +31,6 @@ func mktime(s string) model.Time {
 return model.TimeFromUnix(ts.Unix())
 }
-func TestGetFromThrough(t *testing.T) {
-chunks := []*logproto.ShortRef{
-{From: 0, Through: 6},
-{From: 1, Through: 5},
-{From: 2, Through: 9},
-{From: 3, Through: 8},
-{From: 4, Through: 7},
-}
-from, through := getFromThrough(chunks)
-require.Equal(t, model.Time(0), from)
-require.Equal(t, model.Time(9), through)
-// assert that slice order did not change
-require.Equal(t, model.Time(0), chunks[0].From)
-require.Equal(t, model.Time(4), chunks[len(chunks)-1].From)
-}
 func TestTruncateDay(t *testing.T) {
 expected := mktime("2024-01-24 00:00")

@@ -22,10 +22,7 @@ func TestArchive(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.None,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.None),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 },
@@ -33,7 +30,7 @@ func TestArchive(t *testing.T) {
 )
 require.Nil(t, err)
-itr := v2.NewSliceIter[SeriesWithBlooms](data)
+itr := v2.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
@@ -104,10 +101,7 @@ func TestArchiveCompression(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.None,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.None),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 },
@@ -115,7 +109,7 @@ func TestArchiveCompression(t *testing.T) {
 )
 require.Nil(t, err)
-itr := v2.NewSliceIter[SeriesWithBlooms](data)
+itr := v2.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)

@@ -137,7 +137,7 @@ func (bq *BlockQuerier) Schema() (Schema, error) {
 func (bq *BlockQuerier) Reset() error {
 bq.blooms.Reset()
-return bq.LazySeriesIter.Seek(0)
+return bq.Seek(0)
 }
 func (bq *BlockQuerier) Err() error {

@@ -14,7 +14,6 @@ import (
 )
 const (
 FileMode = 0644
-BloomFileName = "bloom"
 SeriesFileName = "series"
 )

@@ -47,18 +47,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
 return nil
 }
-func (b *Bloom) DecodeCopy(dec *encoding.Decbuf) error {
-ln := dec.Uvarint()
-data := dec.Bytes(ln)
-_, err := b.ReadFrom(bytes.NewReader(data))
-if err != nil {
-return errors.Wrap(err, "decoding copy of bloom filter")
-}
-return nil
-}
 func (b *Bloom) Decode(dec *encoding.Decbuf) error {
 ln := dec.Uvarint()
 data := dec.Bytes(ln)

@@ -28,20 +28,9 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
 }
 }
-func (b *BloomBlockBuilder) WriteSchema() error {
-b.scratch.Reset()
-b.opts.Schema.Encode(b.scratch)
-if _, err := b.writer.Write(b.scratch.Get()); err != nil {
-return errors.Wrap(err, "writing schema")
-}
-b.writtenSchema = true
-b.offset += b.scratch.Len()
-return nil
-}
 func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
 if !b.writtenSchema {
-if err := b.WriteSchema(); err != nil {
+if err := b.writeSchema(); err != nil {
 return BloomOffset{}, errors.Wrap(err, "writing schema")
 }
 }
@@ -63,6 +52,21 @@ func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
 }, nil
 }
+func (b *BloomBlockBuilder) writeSchema() error {
+if b.writtenSchema {
+return nil
+}
+b.scratch.Reset()
+b.opts.Schema.Encode(b.scratch)
+if _, err := b.writer.Write(b.scratch.Get()); err != nil {
+return errors.Wrap(err, "writing schema")
+}
+b.writtenSchema = true
+b.offset += b.scratch.Len()
+return nil
+}
 func (b *BloomBlockBuilder) Close() (uint32, error) {
 if b.page.Count() > 0 {
 if err := b.flushPage(); err != nil {
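
The exported `WriteSchema` becomes an unexported, idempotent `writeSchema`: the early return on `writtenSchema` makes it safe for any append path to call before writing its own bytes. A generic sketch of this lazy write-once header pattern (illustrative, not Loki's builder):

```go
package main

import (
	"bytes"
	"fmt"
)

// headerWriter lazily writes a fixed header exactly once, before the first
// record, no matter which append path triggers it (single-goroutine use).
type headerWriter struct {
	w           *bytes.Buffer
	header      []byte
	wroteHeader bool
}

func (h *headerWriter) writeHeader() error {
	if h.wroteHeader {
		return nil // idempotent: subsequent calls are no-ops
	}
	if _, err := h.w.Write(h.header); err != nil {
		return err
	}
	h.wroteHeader = true
	return nil
}

func (h *headerWriter) Append(record []byte) error {
	if err := h.writeHeader(); err != nil {
		return err
	}
	_, err := h.w.Write(record)
	return err
}

func main() {
	h := &headerWriter{w: &bytes.Buffer{}, header: []byte("HDR|")}
	_ = h.Append([]byte("a"))
	_ = h.Append([]byte("b")) // header is not repeated
	fmt.Println(h.w.String()) // HDR|ab
}
```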

@@ -126,12 +126,12 @@ func (bt *BloomTokenizer) Populate(blooms v2iter.SizedIterator[*Bloom], chks v2i
 }
 func (bt *BloomTokenizer) sendBloom(ch chan<- *BloomCreation, bloom *Bloom, info indexingInfo) {
-fillRatio := bloom.ScalableBloomFilter.FillRatio()
+fillRatio := bloom.FillRatio()
 bt.metrics.hammingWeightRatio.Observe(fillRatio)
 bt.metrics.estimatedCount.Observe(
-float64(estimatedCount(bloom.ScalableBloomFilter.Capacity(), fillRatio)),
+float64(estimatedCount(bloom.Capacity(), fillRatio)),
 )
-bt.metrics.bloomSize.Observe(float64(bloom.ScalableBloomFilter.Capacity() / eightBits))
+bt.metrics.bloomSize.Observe(float64(bloom.Capacity() / eightBits))
 bt.metrics.bloomsTotal.Inc()
 ch <- &BloomCreation{
 Bloom: bloom,
@@ -184,7 +184,7 @@ func (bt *BloomTokenizer) addChunkToBloom(bloom *Bloom, ref ChunkRef, entryIter
 }
 // maxBloomSize is in bytes, but blooms operate at the bit level; adjust
-collision, full = bloom.ScalableBloomFilter.TestAndAddWithMaxSize([]byte(tok), bt.maxBloomSize*eightBits)
+collision, full = bloom.TestAndAddWithMaxSize([]byte(tok), bt.maxBloomSize*eightBits)
 if collision {
 collisionInserts++

@@ -173,7 +173,7 @@ func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) {
 var ct int
 for created := range ch {
 ct++
-capacity := created.Bloom.ScalableBloomFilter.Capacity() / 8
+capacity := created.Bloom.Capacity() / 8
 t.Log(ct, int(capacity), maxSize)
 require.Less(t, int(capacity), maxSize)
 }

@@ -5,11 +5,9 @@ import (
 "hash"
 "math"
 "strings"
-"unsafe"
 "github.com/pkg/errors"
 "github.com/prometheus/common/model"
-"golang.org/x/exp/slices"
 iter "github.com/grafana/loki/v3/pkg/iter/v2"
 "github.com/grafana/loki/v3/pkg/logproto"
@@ -41,13 +39,6 @@ func BoundsFromProto(pb logproto.FPBounds) FingerprintBounds {
 return FingerprintBounds(pb)
 }
-// Unsafe cast to avoid allocation. This _requires_ that the underlying types are the same
-// which is checked by the compiler above
-func MultiBoundsFromProto(pb []logproto.FPBounds) MultiFingerprintBounds {
-//nolint:unconvert
-return MultiFingerprintBounds(*(*MultiFingerprintBounds)(unsafe.Pointer(&pb)))
-}
 // ParseBoundsFromAddr parses a fingerprint bounds from a string
 func ParseBoundsFromAddr(s string) (FingerprintBounds, error) {
 parts := strings.Split(s, "-")
@@ -207,40 +198,6 @@ func (b FingerprintBounds) Range() uint64 {
 return uint64(b.Max - b.Min)
 }
-type MultiFingerprintBounds []FingerprintBounds
-func (mb MultiFingerprintBounds) Union(target FingerprintBounds) MultiFingerprintBounds {
-if len(mb) == 0 {
-return MultiFingerprintBounds{target}
-}
-if len(mb) == 1 {
-return mb[0].Union(target)
-}
-mb = append(mb, target)
-slices.SortFunc(mb, func(a, b FingerprintBounds) int {
-if a.Less(b) {
-return -1
-} else if a.Equal(b) {
-return 0
-}
-return 1
-})
-var union MultiFingerprintBounds
-for i := 0; i < len(mb); i++ {
-j := len(union) - 1 // index of last item of union
-if j >= 0 && union[j].Max >= mb[i].Min-1 {
-union[j] = NewBounds(union[j].Min, max(mb[i].Max, union[j].Max))
-} else {
-union = append(union, mb[i])
-}
-}
-mb = union
-return mb
-}
 // unused, but illustrative
 type BoundedIter[V any] struct {
 iter.Iterator[V]
@@ -249,7 +206,7 @@ type BoundedIter[V any] struct {
 func (bi *BoundedIter[V]) Next() bool {
 for bi.Iterator.Next() {
-switch bi.cmp(bi.Iterator.At()) {
+switch bi.cmp(bi.At()) {
 case Before:
 continue
 case After:
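
The removed `Union` is the classic sort-and-merge interval union; its `union[j].Max >= mb[i].Min-1` test also coalesces ranges that are merely adjacent, since bounds are inclusive. A self-contained sketch of the same algorithm:

```go
package main

import (
	"fmt"
	"sort"
)

type bounds struct{ min, max uint64 }

// union merges overlapping or directly adjacent inclusive ranges, mirroring
// the removed MultiFingerprintBounds.Union in miniature.
func union(ranges []bounds) []bounds {
	sort.Slice(ranges, func(i, j int) bool { return ranges[i].min < ranges[j].min })
	var out []bounds
	for _, r := range ranges {
		j := len(out) - 1 // index of the last merged range
		// max+1 >= min merges overlapping and adjacent ranges; note the +1
		// would overflow at MaxUint64, which the removed code avoided by
		// testing the subtraction side instead.
		if j >= 0 && out[j].max+1 >= r.min {
			if r.max > out[j].max {
				out[j].max = r.max
			}
		} else {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	// [{0 2} {5 15} {25 29}]: {5,9} and {9,15} overlap and are merged.
	fmt.Println(union([]bounds{{0, 2}, {9, 15}, {5, 9}, {25, 29}}))
}
```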

@@ -17,17 +17,6 @@ func TestBoundsFromProto(t *testing.T) {
 assert.Equal(t, NewBounds(10, 2000), bounds)
 }
-func TestMultiBoundsFromProto(t *testing.T) {
-bounds := MultiBoundsFromProto([]logproto.FPBounds{
-{Min: 10, Max: 2000},
-{Min: 2001, Max: 4000},
-})
-assert.Equal(t, MultiFingerprintBounds{
-NewBounds(10, 2000),
-NewBounds(2001, 4000),
-}, bounds)
-}
 func Test_ParseFingerprint(t *testing.T) {
 t.Parallel()
 fp, err := model.ParseFingerprint("7d0")
@@ -150,132 +139,3 @@ func Test_FingerprintBounds_Unless(t *testing.T) {
 }, NewBounds(5, 25).Unless(target))
 assert.Nil(t, NewBounds(14, 15).Unless(target))
 }
-func Test_MultiFingerprintBounds(t *testing.T) {
-for _, tc := range []struct {
-desc string
-mb MultiFingerprintBounds
-target FingerprintBounds
-exp MultiFingerprintBounds
-}{
-{
-desc: "no elements",
-mb: MultiFingerprintBounds{},
-target: NewBounds(0, 9),
-exp: MultiFingerprintBounds{
-NewBounds(0, 9),
-},
-},
-{
-desc: "single element before",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-},
-target: NewBounds(15, 19),
-exp: MultiFingerprintBounds{
-NewBounds(5, 9),
-NewBounds(15, 19),
-},
-},
-{
-desc: "single element after",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-},
-target: NewBounds(0, 3),
-exp: MultiFingerprintBounds{
-NewBounds(0, 3),
-NewBounds(5, 9),
-},
-},
-{
-desc: "single element overlapping",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-},
-target: NewBounds(0, 14),
-exp: MultiFingerprintBounds{
-NewBounds(0, 14),
-},
-},
-{
-desc: "multiple elements single overlapping",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-NewBounds(15, 19),
-},
-target: NewBounds(0, 6),
-exp: MultiFingerprintBounds{
-NewBounds(0, 9),
-NewBounds(15, 19),
-},
-},
-{
-desc: "multiple elements single overlapping",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-NewBounds(15, 19),
-},
-target: NewBounds(11, 25),
-exp: MultiFingerprintBounds{
-NewBounds(5, 9),
-NewBounds(11, 25),
-},
-},
-{
-desc: "multiple elements combining overlapping",
-mb: MultiFingerprintBounds{
-NewBounds(5, 9),
-NewBounds(15, 19),
-},
-target: NewBounds(9, 15),
-exp: MultiFingerprintBounds{
-NewBounds(5, 19),
-},
-},
-{
-desc: "combination",
-mb: MultiFingerprintBounds{
-NewBounds(0, 2),
-NewBounds(5, 9),
-NewBounds(15, 19),
-NewBounds(25, 29),
-},
-target: NewBounds(9, 15),
-exp: MultiFingerprintBounds{
-NewBounds(0, 2),
-NewBounds(5, 19),
-NewBounds(25, 29),
-},
-},
-{
-desc: "overlapping ranges",
-mb: MultiFingerprintBounds{
-NewBounds(0, 6),
-NewBounds(5, 15),
-},
-target: NewBounds(8, 10),
-exp: MultiFingerprintBounds{
-NewBounds(0, 15),
-},
-},
-{
-desc: "disjoint ranges and target is between",
-mb: MultiFingerprintBounds{
-NewBounds(0, 9),
-NewBounds(30, 39),
-},
-target: NewBounds(15, 19),
-exp: MultiFingerprintBounds{
-NewBounds(0, 9),
-NewBounds(15, 19),
-NewBounds(30, 39),
-},
-},
-} {
-t.Run(tc.desc, func(t *testing.T) {
-res := tc.mb.Union(tc.target)
-assert.Equal(t, tc.exp, res)
-})
-}
-}

@@ -67,10 +67,8 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) {
 }
 func NewBlockOptions(enc compression.Codec, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions {
-opts := NewBlockOptionsFromSchema(Schema{
-version: CurrentSchemaVersion,
-encoding: enc,
-}, maxBloomSizeBytes)
+schema := NewSchema(CurrentSchemaVersion, enc)
+opts := NewBlockOptionsFromSchema(schema, maxBloomSizeBytes)
 opts.BlockSize = maxBlockSizeBytes
 opts.UnencodedBlockOptions.MaxBloomSizeBytes = maxBloomSizeBytes
 return opts
@@ -209,7 +207,7 @@ func NewMergeBuilder(
 // because blooms dont contain the label-set (only the fingerprint),
 // in the case of a fingerprint collision we simply treat it as one
 // series with multiple chunks.
-combinedSeriesIter := iter.NewDedupingIter[*Series, *Series](
+combinedSeriesIter := iter.NewDedupingIter(
 // eq
 func(s1, s2 *Series) bool {
 return s1.Fingerprint == s2.Fingerprint
@@ -223,7 +221,7 @@ func NewMergeBuilder(
 Chunks: s1.Chunks.Union(s2.Chunks),
 }
 },
-iter.NewPeekIter[*Series](store),
+iter.NewPeekIter(store),
 )
 return &MergeBuilder{
@@ -295,7 +293,7 @@ func (mb *MergeBuilder) processNextSeries(
 chunksCopied += len(nextInStore.Chunks) - len(chunksToAdd)
 preExistingBlooms = nextInBlocks.Blooms
 // we also need to carry over existing indexed fields from the series metadata
-info.indexedFields.Union(nextInBlocks.Series.Meta.Fields)
+info.indexedFields.Union(nextInBlocks.Series.Fields)
 }
 chunksIndexed += len(chunksToAdd)

@@ -42,10 +42,7 @@ func TestBlockOptions_BloomPageSize(t *testing.T) {
 func TestBlockOptions_RoundTrip(t *testing.T) {
 t.Parallel()
 opts := BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 BlockSize: 10 << 20,
@@ -100,10 +97,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
 desc := fmt.Sprintf("%s/%s", tc.desc, enc)
 t.Run(desc, func(t *testing.T) {
 blockOpts := BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: enc,
-},
+Schema: NewSchema(CurrentSchemaVersion, enc),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 BlockSize: tc.maxBlockSize,
@@ -192,7 +186,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
 func dedupedBlocks(blocks []iter.PeekIterator[*SeriesWithBlooms]) iter.Iterator[*SeriesWithBlooms] {
 orderedBlocks := NewHeapIterForSeriesWithBloom(blocks...)
-return iter.NewDedupingIter[*SeriesWithBlooms](
+return iter.NewDedupingIter(
 func(a *SeriesWithBlooms, b *SeriesWithBlooms) bool {
 return a.Series.Fingerprint == b.Series.Fingerprint
 },
@@ -203,7 +197,7 @@ func dedupedBlocks(blocks []iter.PeekIterator[*SeriesWithBlooms]) iter.Iterator[
 }
 return b
 },
-iter.NewPeekIter[*SeriesWithBlooms](orderedBlocks),
+iter.NewPeekIter(orderedBlocks),
 )
 }
@@ -215,10 +209,7 @@ func TestMergeBuilder(t *testing.T) {
 blocks := make([]iter.PeekIterator[*SeriesWithBlooms], 0, nBlocks)
 data, _ := MkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
 blockOpts := BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 }
@@ -244,10 +235,10 @@ func TestMergeBuilder(t *testing.T) {
 )
 require.Nil(t, err)
-itr := iter.NewSliceIter[SeriesWithBlooms](data[min:max])
+itr := iter.NewSliceIter(data[min:max])
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
-blocks = append(blocks, iter.NewPeekIter[*SeriesWithBlooms](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
+blocks = append(blocks, iter.NewPeekIter(NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()))
 }
 // We're not testing the ability to extend a bloom in this test
@@ -264,8 +255,8 @@ func TestMergeBuilder(t *testing.T) {
 // storage should contain references to all the series we ingested,
 // regardless of block allocation/overlap.
-storeItr := iter.NewMapIter[SeriesWithBlooms, *Series](
-iter.NewSliceIter[SeriesWithBlooms](data),
+storeItr := iter.NewMapIter(
+iter.NewSliceIter(data),
 func(swb SeriesWithBlooms) *Series {
 return &swb.Series.Series
 },
@@ -287,16 +278,16 @@ func TestMergeBuilder(t *testing.T) {
 block := NewBlock(reader, NewMetrics(nil))
 querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)
-EqualIterators[*SeriesWithBlooms](
+EqualIterators(
 t,
 func(a, b *SeriesWithBlooms) {
 require.Equal(t, a.Series.Series, b.Series.Series, "expected series %+v, got %+v", a.Series.Series, b.Series.Series)
-require.Equal(t, a.Series.Meta.Fields, b.Series.Meta.Fields, "expected fields %+v, got %+v", a.Series.Meta.Fields, b.Series.Meta.Fields)
+require.Equal(t, a.Series.Fields, b.Series.Fields, "expected fields %+v, got %+v", a.Series.Fields, b.Series.Fields)
 // TODO(chaudum): Investigate why offsets not match
 // This has not been tested before, so I'm not too worried about something being broken.
 // require.Equal(t, a.Series.Meta.Offsets, b.Series.Meta.Offsets, "expected offsets %+v, got %+v", a.Series.Meta.Offsets, b.Series.Meta.Offsets)
 },
-iter.NewSliceIter[*SeriesWithBlooms](PointerSlice(data)),
+iter.NewSliceIter(PointerSlice(data)),
 querier.Iter(),
 )
 }
@@ -312,10 +303,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
 reader := NewByteReader(indexBuf, bloomsBuf)
 blockOpts := BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 }
@@ -409,10 +397,7 @@ func TestBlockReset(t *testing.T) {
 writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
 reader := NewByteReader(indexBuf, bloomsBuf)
-schema := Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-}
+schema := NewSchema(CurrentSchemaVersion, compression.Snappy)
 builder, err := NewBlockBuilder(
 BlockOptions{
@@ -424,7 +409,7 @@ func TestBlockReset(t *testing.T) {
 )
 require.Nil(t, err)
-itr := iter.NewSliceIter[SeriesWithBlooms](data)
+itr := iter.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
 block := NewBlock(reader, NewMetrics(nil))
@@ -434,7 +419,7 @@ func TestBlockReset(t *testing.T) {
 for i := 0; i < len(rounds); i++ {
 for querier.Next() {
-rounds[i] = append(rounds[i], querier.At().Series.Fingerprint)
+rounds[i] = append(rounds[i], querier.At().Fingerprint)
 }
 err = querier.Seek(0) // reset at end
@@ -465,10 +450,9 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 }
 blockOpts := BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy, // test with different encodings?
-},
+// test with different encodings?
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 }
@@ -487,7 +471,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 require.Nil(t, err)
 // each set of copies gets a different slice of the data
 minIdx, maxIdx := i*len(xs)/len(sets), (i+1)*len(xs)/len(sets)
-itr := iter.NewSliceIter[SeriesWithBlooms](xs[minIdx:maxIdx])
+itr := iter.NewSliceIter(xs[minIdx:maxIdx])
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
 block := NewBlock(reader, NewMetrics(nil))
@@ -509,12 +493,12 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 var store []iter.PeekIterator[*SeriesWithBlooms]
 for _, x := range data {
-blocks = append(blocks, iter.NewPeekIter[*SeriesWithBlooms](iter.NewSliceIter[*SeriesWithBlooms](x)))
-store = append(store, iter.NewPeekIter[*SeriesWithBlooms](iter.NewSliceIter[*SeriesWithBlooms](x)))
+blocks = append(blocks, iter.NewPeekIter(iter.NewSliceIter(x)))
+store = append(store, iter.NewPeekIter(iter.NewSliceIter(x)))
 }
 orderedStore := NewHeapIterForSeriesWithBloom(store...)
-dedupedStore := iter.NewDedupingIter[*SeriesWithBlooms, *Series](
+dedupedStore := iter.NewDedupingIter(
 func(a *SeriesWithBlooms, b *Series) bool {
 return a.Series.Fingerprint == b.Fingerprint
 },
@@ -527,7 +511,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 }
 return b
 },
-iter.NewPeekIter[*SeriesWithBlooms](orderedStore),
+iter.NewPeekIter(orderedStore),
 )
 // We're not testing the ability to extend a bloom in this test
@@ -568,9 +552,9 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 // ensure the new block contains one copy of all the data
 // by comparing it against an iterator over the source data
 mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize)
-sourceItr := iter.NewSliceIter[*SeriesWithBlooms](PointerSlice[SeriesWithBlooms](xs))
+sourceItr := iter.NewSliceIter(PointerSlice(xs))
-EqualIterators[*SeriesWithBlooms](
+EqualIterators(
 t,
 func(a, b *SeriesWithBlooms) {
 require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint)

@@ -18,7 +18,7 @@ func TestMergeDedupeIter(t *testing.T) {
 )
 for i := 0; i < len(queriers); i++ {
-queriers[i] = iter.NewPeekIter[*SeriesWithBlooms](iter.NewSliceIter[*SeriesWithBlooms](dataPtr))
+queriers[i] = iter.NewPeekIter(iter.NewSliceIter(dataPtr))
 }
 mbq := NewHeapIterForSeriesWithBloom(queriers...)
@@ -28,11 +28,11 @@ func TestMergeDedupeIter(t *testing.T) {
 merge := func(a, _ *SeriesWithBlooms) *SeriesWithBlooms {
 return a
 }
-deduper := iter.NewDedupingIter[*SeriesWithBlooms, *SeriesWithBlooms](
+deduper := iter.NewDedupingIter(
 eq,
 iter.Identity[*SeriesWithBlooms],
 merge,
-iter.NewPeekIter[*SeriesWithBlooms](mbq),
+iter.NewPeekIter(mbq),
 )
 for i := 0; i < len(data); i++ {

@@ -17,10 +17,6 @@ import (
 "math/bits"
 )
-type BucketGetter interface {
-Get(bucket uint) uint32
-}
 // Buckets is a fast, space-efficient array of buckets where each bucket can
 // store up to a configured maximum value.
 type Buckets struct {

@@ -71,9 +71,7 @@ type ScalableBloomFilter struct {
 const fillCheckFraction = 100
 // NewScalableBloomFilter creates a new Scalable Bloom Filter with the
-// specified target false-positive rate and tightening ratio. Use
-// NewDefaultScalableBloomFilter if you don't want to calculate these
-// parameters.
+// specified target false-positive rate and tightening ratio.
 func NewScalableBloomFilter(hint uint, fpRate, r float64) *ScalableBloomFilter {
 s := &ScalableBloomFilter{
 filters: make([]*PartitionedBloomFilter, 0, 1),
@@ -88,11 +86,6 @@ func NewScalableBloomFilter(hint uint, fpRate, r float64) *ScalableBloomFilter {
 return s
 }
-// NewDefaultScalableBloomFilter creates a new Scalable Bloom Filter.
-func NewDefaultScalableBloomFilter() *ScalableBloomFilter {
-return NewScalableBloomFilter(10e3, 0.1, 0.8)
-}
 // Capacity returns the current Scalable Bloom Filter capacity, which is the
 // sum of the capacities for the contained series of Bloom filters.
 func (s *ScalableBloomFilter) Capacity() uint {
@@ -210,14 +203,6 @@ func (s *ScalableBloomFilter) TestAndAddWithMaxSize(data []byte, maxSize int) (e
 return member, full
 }
-// Reset restores the Bloom filter to its original state. It returns the filter
-// to allow for chaining.
-func (s *ScalableBloomFilter) Reset() *ScalableBloomFilter {
-s.filters = make([]*PartitionedBloomFilter, 0, 1)
-s.addFilter()
-return s
-}
 func (s *ScalableBloomFilter) nextFilterCapacity() (m uint, fpRate float64) {
 fpRate = s.fp * math.Pow(s.r, float64(len(s.filters)))
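
With `NewDefaultScalableBloomFilter` gone, callers must pass the tuning knobs explicitly; the removed default was `NewScalableBloomFilter(10e3, 0.1, 0.8)` (hint of 10k items, target false-positive rate 0.1, tightening ratio 0.8). A small runnable sketch of what the tightening ratio does, mirroring the `nextFilterCapacity` arithmetic above:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Each successive sub-filter i of a scalable bloom filter gets a
	// tightened false-positive budget fpRate * r^i, so the compound error
	// of the whole filter stays bounded as it grows.
	fp, r := 0.1, 0.8 // the removed default's parameters
	for i := 0; i < 5; i++ {
		fmt.Printf("sub-filter %d: fpRate %.4f\n", i, fp*math.Pow(r, float64(i)))
	}
}
```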

@@ -17,24 +17,6 @@ import (
 "github.com/d4l3k/messagediff"
 )
-// Ensures that NewDefaultScalableBloomFilter creates a Scalable Bloom Filter
-// with hint = 10000 and r = 0.8.
-func TestNewDefaultScalableBloomFilter(t *testing.T) {
-f := NewDefaultScalableBloomFilter()
-if f.fp != 0.1 {
-t.Errorf("Expected 0.1, got %f", f.fp)
-}
-if f.hint != 10000 {
-t.Errorf("Expected 10000, got %d", f.hint)
-}
-if f.r != 0.8 {
-t.Errorf("Expected 0.8, got %f", f.r)
-}
-}
 // Ensures that K returns the number of hash functions used in each Bloom
 // filter.
 func TestScalableBloomK(t *testing.T) {
@@ -111,34 +93,6 @@ func TestScalableBloomTestAndAdd(t *testing.T) {
 }
 }
-// Ensures that Reset removes all Bloom filters and resets the initial one.
-func TestScalableBloomReset(t *testing.T) {
-f := NewScalableBloomFilter(10, 0.1, 0.8)
-for i := 0; i < 1000; i++ {
-f.Add([]byte(strconv.Itoa(i)))
-}
-if len(f.filters) < 2 {
-t.Errorf("Expected more than 1 filter, got %d", len(f.filters))
-}
-if f.Reset() != f {
-t.Error("Returned ScalableBloomFilter should be the same instance")
-}
-if len(f.filters) != 1 {
-t.Errorf("Expected 1 filter, got %d", len(f.filters))
-}
-for _, partition := range f.filters[0].partitions {
-for i := uint(0); i < partition.Count(); i++ {
-if partition.Get(i) != 0 {
-t.Error("Expected all bits to be unset")
-}
-}
-}
-}
 // Ensures that ScalableBloomFilter can be serialized and deserialized without errors.
 func TestScalableBloomGob(t *testing.T) {
 f := NewScalableBloomFilter(10, 0.1, 0.8)

@@ -137,14 +137,14 @@ type FusedQuerier struct {
 }
 func NewFusedQuerier(bq *BlockQuerier, inputs []iter.PeekIterator[Request], logger log.Logger) *FusedQuerier {
-heap := NewHeapIterator[Request](
+heap := NewHeapIterator(
 func(a, b Request) bool {
 return a.Fp < b.Fp
 },
 inputs...,
 )
-merging := iter.NewDedupingIter[Request, []Request](
+merging := iter.NewDedupingIter(
 func(a Request, b []Request) bool {
 return a.Fp == b[0].Fp
 },
@@ -152,7 +152,7 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []iter.PeekIterator[Request], logg
 func(a Request, b []Request) []Request {
 return append(b, a)
 },
-iter.NewPeekIter[Request](heap),
+iter.NewPeekIter(heap),
 )
 return &FusedQuerier{
 bq: bq,
@@ -263,9 +263,10 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
 Missing ChunkRefs // chunks that do not exist in the blooms and cannot be queried
 InBlooms ChunkRefs // chunks which do exist in the blooms and can be queried
-found map[int]bool // map of the index in `InBlooms` to whether the chunk
-// was found in _any_ of the blooms for the series. In order to
-// be eligible for removal, a chunk must be found in _no_ blooms.
+// Map of the index in `InBlooms` to whether the chunk was found in _any_
+// of the blooms for the series. In order to be eligible for removal, a
+// chunk must be found in _no_ blooms.
+found map[int]bool
 }
 inputs := make([]inputChunks, 0, len(reqs))
@@ -354,7 +355,6 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
 }
 for i, req := range reqs {
 removals := removalsFor(inputs[i].InBlooms, inputs[i].found)
-
 req.Recorder.record(
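
The relocated comment encodes the filtering rule: a chunk is only eligible for removal when no bloom for the series matched it. A miniature of that `found`-map bookkeeping (a hypothetical helper, not the actual `removalsFor`):

```go
package main

import "fmt"

// removals returns the chunk indexes never marked found in any bloom: a chunk
// found in _any_ bloom is kept, so only all-misses become removal candidates.
func removals(n int, found map[int]bool) []int {
	var out []int
	for i := 0; i < n; i++ {
		if !found[i] {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	// chunks 0..3; some bloom matched chunks 1 and 3
	found := map[int]bool{1: true, 3: true}
	fmt.Println(removals(4, found)) // [0 2]
}
```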

@@ -58,17 +58,14 @@ func TestFusedQuerier(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 },
 writer,
 )
 require.Nil(t, err)
-itr := v2.NewSliceIter[SeriesWithBlooms](data)
+itr := v2.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.NoError(t, err)
 require.False(t, itr.Next())
@@ -99,7 +96,7 @@ func TestFusedQuerier(t *testing.T) {
 var itrs []v2.PeekIterator[Request]
 for _, reqs := range inputs {
-itrs = append(itrs, v2.NewPeekIter[Request](v2.NewSliceIter[Request](reqs)))
+itrs = append(itrs, v2.NewPeekIter(v2.NewSliceIter(reqs)))
 }
 resps := make([][]Output, nReqs)
@@ -145,10 +142,7 @@ func TestFusedQuerier_MultiPage(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10, // So we force one bloom per page
 },
@@ -294,17 +288,14 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10, // So we force one series per page
 },
 writer,
 )
 require.Nil(t, err)
-itr := v2.NewSliceIter[SeriesWithBlooms](data)
+itr := v2.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.NoError(t, err)
 require.False(t, itr.Next())
@@ -352,10 +343,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.None,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 },
@@ -379,7 +367,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
 Blooms: v2.NewSliceIter([]*Bloom{NewBloom()}),
 }
-itr := v2.NewSliceIter[SeriesWithBlooms]([]SeriesWithBlooms{data})
+itr := v2.NewSliceIter([]SeriesWithBlooms{data})
 _, err = builder.BuildFrom(itr)
 require.NoError(t, err)
 require.False(t, itr.Next())
@@ -394,7 +382,7 @@ func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
 }
 err = NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize).Fuse(
 []v2.PeekIterator[Request]{
-v2.NewPeekIter[Request](v2.NewSliceIter[Request]([]Request{req})),
+v2.NewPeekIter(v2.NewSliceIter([]Request{req})),
 },
 log.NewNopLogger(),
 ).Run()
@@ -413,17 +401,14 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 256 << 10, // 256k
 BloomPageSize: 1 << 20, // 1MB
 },
 writer,
 )
 require.Nil(b, err)
-itr := v2.NewSliceIter[SeriesWithBlooms](data)
+itr := v2.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.Nil(b, err)
 block := NewBlock(reader, NewMetrics(nil))
@@ -485,7 +470,7 @@ func BenchmarkBlockQuerying(b *testing.B) {
 for i := 0; i < b.N; i++ {
 itrs = itrs[:0]
 for _, reqs := range requestChains {
-itrs = append(itrs, v2.NewPeekIter[Request](v2.NewSliceIter[Request](reqs)))
+itrs = append(itrs, v2.NewPeekIter(v2.NewSliceIter(reqs)))
 }
 fused := querier.Fuse(itrs, log.NewNopLogger())
 _ = fused.Run()

@@ -5,15 +5,8 @@ import (
 "github.com/efficientgo/core/errors"
 "github.com/prometheus/common/model"
-iter "github.com/grafana/loki/v3/pkg/iter/v2"
 )
-type SeriesIterator interface {
-iter.Iterator[*SeriesWithMeta]
-Reset()
-}
 type LazySeriesIter struct {
 b *Block

@@ -69,11 +69,6 @@ func (mbq *HeapIterator[T]) At() T {
 return mbq.cache
 }
-func (mbq *HeapIterator[T]) push(x iter.PeekIterator[T]) {
-mbq.itrs = append(mbq.itrs, x)
-mbq.up(mbq.Len() - 1)
-}
 func (mbq *HeapIterator[T]) pop() (T, bool) {
 for {
 if mbq.Len() == 0 {

@@ -21,7 +21,7 @@ func TestMergeBlockQuerier_NonOverlapping(t *testing.T) {
 for j := 0; j < numSeries/numQueriers; j++ {
 ptrs = append(ptrs, &data[i*numSeries/numQueriers+j])
 }
-queriers = append(queriers, v2.NewPeekIter[*SeriesWithBlooms](v2.NewSliceIter[*SeriesWithBlooms](ptrs)))
+queriers = append(queriers, v2.NewPeekIter(v2.NewSliceIter(ptrs)))
 }
 mbq := NewHeapIterForSeriesWithBloom(queriers...)
@@ -46,11 +46,7 @@ func TestMergeBlockQuerier_Duplicate(t *testing.T) {
 for i := 0; i < numQueriers; i++ {
 queriers = append(
 queriers,
-v2.NewPeekIter[*SeriesWithBlooms](
-v2.NewSliceIter[*SeriesWithBlooms](
-PointerSlice[SeriesWithBlooms](data),
-),
-),
+v2.NewPeekIter(v2.NewSliceIter(PointerSlice(data))),
 )
 }
@@ -79,7 +75,7 @@ func TestMergeBlockQuerier_Overlapping(t *testing.T) {
 slices[i%numQueriers] = append(slices[i%numQueriers], &data[i])
 }
 for i := 0; i < numQueriers; i++ {
-queriers = append(queriers, v2.NewPeekIter[*SeriesWithBlooms](v2.NewSliceIter[*SeriesWithBlooms](slices[i])))
+queriers = append(queriers, v2.NewPeekIter(v2.NewSliceIter(slices[i])))
 }
 mbq := NewHeapIterForSeriesWithBloom(queriers...)

@@ -61,7 +61,7 @@ func (r *ByteReader) TarEntries() (iter.Iterator[TarEntry], error) {
 },
 }
-return iter.NewSliceIter[TarEntry](entries), err
+return iter.NewSliceIter(entries), err
 }
 func (r *ByteReader) Cleanup() error {
@@ -162,7 +162,7 @@ func (r *DirectoryBlockReader) TarEntries() (iter.Iterator[TarEntry], error) {
 },
 }
-return iter.NewSliceIter[TarEntry](entries), nil
+return iter.NewSliceIter(entries), nil
 }
 func (r *DirectoryBlockReader) Cleanup() error {

@@ -33,7 +33,6 @@ const (
 var (
 SupportedVersions = []Version{V3}
 ErrInvalidSchemaVersion = errors.New("invalid schema version")
-ErrUnsupportedSchemaVersion = errors.New("unsupported schema version")
 )
@@ -42,10 +41,10 @@ type Schema struct {
 encoding compression.Codec
 }
-func NewSchema() Schema {
+func NewSchema(version Version, encoding compression.Codec) Schema {
 return Schema{
-version: CurrentSchemaVersion,
-encoding: compression.None,
+version: version,
+encoding: encoding,
 }
 }
@@ -80,7 +79,6 @@ func (s *Schema) Encode(enc *encoding.Encbuf) {
 enc.PutBE32(magicNumber)
 enc.PutByte(byte(s.version))
 enc.PutByte(byte(s.encoding))
-
 }
 func (s *Schema) DecodeFrom(r io.ReadSeeker) error {
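
Every `Schema{version: ..., encoding: ...}` literal in the tests collapses into this constructor. The before/after call shape (a fragment that compiles only inside package v1, since the fields are unexported):

```go
// Before: a struct literal reaching into unexported fields, which is why
// every in-package test repeated it.
schema := Schema{version: CurrentSchemaVersion, encoding: compression.Snappy}

// After: one constructor, usable by any caller.
schema = NewSchema(CurrentSchemaVersion, compression.Snappy)
_ = schema
```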

@@ -28,17 +28,14 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT
 builder, err := NewBlockBuilder(
 BlockOptions{
-Schema: Schema{
-version: CurrentSchemaVersion,
-encoding: compression.Snappy,
-},
+Schema: NewSchema(CurrentSchemaVersion, compression.Snappy),
 SeriesPageSize: 100,
 BloomPageSize: 10 << 10,
 },
 writer,
 )
 require.Nil(t, err)
-itr := iter.NewSliceIter[SeriesWithBlooms](data)
+itr := iter.NewSliceIter(data)
 _, err = builder.BuildFrom(itr)
 require.Nil(t, err)
 block := NewBlock(reader, NewMetrics(nil))

@@ -26,10 +26,6 @@ var (
 SeriesPagePool = mempool.NewBytePoolAllocator(1<<10, 128<<10, 2)
 )
-func newCRC32() hash.Hash32 {
-return crc32.New(castagnoliTable)
-}
 type ChecksumPool struct {
 sync.Pool
 }

@@ -81,7 +81,7 @@ func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro
 return 0, errors.Wrap(err, "iterating blooms")
 }
-blockFull, err := b.AddSeries(at.Series.Series, offsets, at.Series.Meta.Fields)
+blockFull, err := b.AddSeries(at.Series.Series, offsets, at.Series.Fields)
 if err != nil {
 return 0, errors.Wrapf(err, "writing series")
 }

@@ -16,10 +16,7 @@ import (
 // characterized by small page sizes
 func smallBlockOpts(v Version, enc compression.Codec) BlockOptions {
 return BlockOptions{
-Schema: Schema{
-version: v,
-encoding: enc,
-},
+Schema: NewSchema(v, enc),
 SeriesPageSize: 100,
 BloomPageSize: 2 << 10,
 BlockSize: 0, // unlimited
@@ -55,11 +52,11 @@ func TestV3Roundtrip(t *testing.T) {
 block := NewBlock(reader, NewMetrics(nil))
 querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter()
-CompareIterators[SeriesWithBlooms, *SeriesWithBlooms](
+CompareIterators(
 t,
 func(t *testing.T, a SeriesWithBlooms, b *SeriesWithBlooms) {
-require.Equal(t, a.Series.Series.Fingerprint, b.Series.Series.Fingerprint)
-require.ElementsMatch(t, a.Series.Series.Chunks, b.Series.Series.Chunks)
+require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint)
+require.ElementsMatch(t, a.Series.Chunks, b.Series.Chunks)
 bloomsA, err := v2.Collect(a.Blooms)
 require.NoError(t, err)
 bloomsB, err := v2.Collect(b.Blooms)

@@ -289,10 +289,10 @@ func merge(extents []Extent, acc *accumulator) ([]Extent, error) {
 return nil, err
 }
 return append(extents, Extent{
-Start: acc.Extent.Start,
-End: acc.Extent.End,
+Start: acc.Start,
+End: acc.End,
 Response: anyResp,
-TraceId: acc.Extent.TraceId,
+TraceId: acc.TraceId,
 }), nil
 }
@@ -386,7 +386,7 @@ func (s ResultsCache) partition(req Request, extents []Extent) ([]Request, []Res
 // If start and end are the same (valid in promql), start == req.GetEnd() and we won't do the query.
 // But we should only do the request if we don't have a valid cached response for it.
-if req.GetStart() == req.GetEnd() && len(cachedResponses) == 0 {
+if req.GetStart().Equal(req.GetEnd()) && len(cachedResponses) == 0 {
 requests = append(requests, req)
 }
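
The switch to `req.GetStart().Equal(req.GetEnd())` matters if the getters return `time.Time`, which the `Equal` call implies: `==` on `time.Time` compares the monotonic reading and location along with the instant, so two representations of the same moment can compare unequal. A minimal demonstration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	t1 := time.Now()          // carries a monotonic clock reading
	t2 := t1.Round(0)         // same instant, monotonic reading stripped
	fmt.Println(t1 == t2)     // false: the struct fields differ
	fmt.Println(t1.Equal(t2)) // true: same instant in time
}
```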
