|
|
|
@ -3,14 +3,17 @@ package v1 |
|
|
|
|
import ( |
|
|
|
|
"bytes" |
|
|
|
|
"context" |
|
|
|
|
"fmt" |
|
|
|
|
"sync" |
|
|
|
|
"testing" |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
|
"github.com/grafana/dskit/concurrency" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/v3/pkg/chunkenc" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
|
|
|
|
@ -44,6 +47,18 @@ func TestFusedQuerier(t *testing.T) { |
|
|
|
|
numSeries := 1000 |
|
|
|
|
data, keys := MkBasicSeriesWithBlooms(numSeries, 0, 0x0000, 0xffff, 0, 10000) |
|
|
|
|
|
|
|
|
|
// Make the first and third series blooms too big to fit into a single page so we skip them while reading
|
|
|
|
|
for i := 0; i < 10000; i++ { |
|
|
|
|
tokenizer := NewNGramTokenizer(4, 0) |
|
|
|
|
line := fmt.Sprintf("%04x:%04x", i, i+1) |
|
|
|
|
it := tokenizer.Tokens(line) |
|
|
|
|
for it.Next() { |
|
|
|
|
key := it.At() |
|
|
|
|
data[0].Bloom.Add(key) |
|
|
|
|
data[2].Bloom.Add(key) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
builder, err := NewBlockBuilder( |
|
|
|
|
BlockOptions{ |
|
|
|
|
Schema: Schema{ |
|
|
|
@ -130,6 +145,92 @@ func TestFusedQuerier(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestLazyBloomIter_Seek_ResetError(t *testing.T) { |
|
|
|
|
// references for linking in memory reader+writer
|
|
|
|
|
indexBuf := bytes.NewBuffer(nil) |
|
|
|
|
bloomsBuf := bytes.NewBuffer(nil) |
|
|
|
|
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) |
|
|
|
|
reader := NewByteReader(indexBuf, bloomsBuf) |
|
|
|
|
|
|
|
|
|
numSeries := 4 |
|
|
|
|
data := make([]SeriesWithBloom, 0, numSeries) |
|
|
|
|
tokenizer := NewNGramTokenizer(4, 0) |
|
|
|
|
for i := 0; i < numSeries; i++ { |
|
|
|
|
var series Series |
|
|
|
|
series.Fingerprint = model.Fingerprint(i) |
|
|
|
|
series.Chunks = []ChunkRef{ |
|
|
|
|
{ |
|
|
|
|
From: 0, |
|
|
|
|
Through: 100, |
|
|
|
|
Checksum: uint32(i), |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var bloom Bloom |
|
|
|
|
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8) |
|
|
|
|
|
|
|
|
|
nLines := 10 |
|
|
|
|
if i == 0 || i == 2 { |
|
|
|
|
// Add enough lines to make the bloom page too large for series 1
|
|
|
|
|
nLines = 10000 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for j := 0; j < nLines; j++ { |
|
|
|
|
line := fmt.Sprintf("%04x:%04x", i, j) |
|
|
|
|
it := tokenizer.Tokens(line) |
|
|
|
|
for it.Next() { |
|
|
|
|
key := it.At() |
|
|
|
|
bloom.Add(key) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
data = append(data, SeriesWithBloom{ |
|
|
|
|
Series: &series, |
|
|
|
|
Bloom: &bloom, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
builder, err := NewBlockBuilder( |
|
|
|
|
BlockOptions{ |
|
|
|
|
Schema: Schema{ |
|
|
|
|
version: DefaultSchemaVersion, |
|
|
|
|
encoding: chunkenc.EncSnappy, |
|
|
|
|
}, |
|
|
|
|
SeriesPageSize: 100, |
|
|
|
|
BloomPageSize: 10, // So we force one series per page
|
|
|
|
|
}, |
|
|
|
|
writer, |
|
|
|
|
) |
|
|
|
|
require.Nil(t, err) |
|
|
|
|
itr := NewSliceIter[SeriesWithBloom](data) |
|
|
|
|
_, err = builder.BuildFrom(itr) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
require.False(t, itr.Next()) |
|
|
|
|
block := NewBlock(reader, NewMetrics(nil)) |
|
|
|
|
|
|
|
|
|
querier := NewBlockQuerier(block, true, 1000) |
|
|
|
|
|
|
|
|
|
for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ { |
|
|
|
|
err := querier.Seek(fp) |
|
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
require.True(t, querier.series.Next()) |
|
|
|
|
series := querier.series.At() |
|
|
|
|
require.Equal(t, fp, series.Fingerprint) |
|
|
|
|
|
|
|
|
|
querier.blooms.Seek(series.Offset) |
|
|
|
|
|
|
|
|
|
if fp == 0 || fp == 2 { |
|
|
|
|
require.False(t, querier.blooms.Next()) |
|
|
|
|
require.Error(t, querier.blooms.Err()) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
require.True(t, querier.blooms.Next()) |
|
|
|
|
require.NoError(t, querier.blooms.Err()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Output) { |
|
|
|
|
indexBuf := bytes.NewBuffer(nil) |
|
|
|
|
bloomsBuf := bytes.NewBuffer(nil) |
|
|
|
|