[bloom-compactor] downloading chunks in batches (#11649)

**What this PR does / why we need it**:
Added a chunks batches iterator so that chunks are downloaded in batches rather
than all at once. Previously, when a stream contained a lot of chunks,
downloading all of them at once could lead to OOM.
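
For reviewers, a minimal sketch of the intended consumption pattern. The `processSeriesChunks` helper below is hypothetical and for illustration only; `newChunkBatchesIterator` and the `chunkClient` interface are the identifiers added/used by this PR (see the diff):

```go
// Sketch only (not part of the diff): drain the iterator so that at most
// one batch of chunks is resident in memory at a time.
package bloomcompactor

import (
    "context"
    "fmt"

    "github.com/grafana/loki/pkg/storage/chunk"
)

// processSeriesChunks is a hypothetical helper; the real call sites are
// lazyBloomBuilder.Next() in chunkcompactor.go and createPopulateFunc in
// mergecompactor.go.
func processSeriesChunks(ctx context.Context, client chunkClient, refs []chunk.Chunk, batchSize int) error {
    it, err := newChunkBatchesIterator(ctx, client, refs, batchSize)
    if err != nil {
        return fmt.Errorf("error creating chunks batches iterator: %w", err)
    }
    for it.Next() {
        batch := it.At() // at most batchSize chunks, downloaded on demand
        _ = batch        // tokenize / build blooms from this batch here
    }
    return it.Err() // surfaces a failed GetChunks call for any batch
}
```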

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)

---------

Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
Vladyslav Diachenko committed via GitHub (commit a5aa8b315d, parent 6ae46dc6ef)
**Files changed (13):**

- `docs/sources/configure/_index.md` (4)
- `pkg/bloomcompactor/bloomcompactor.go` (5)
- `pkg/bloomcompactor/chunkcompactor.go` (53)
- `pkg/bloomcompactor/chunkcompactor_test.go` (12)
- `pkg/bloomcompactor/chunksbatchesiterator.go` (48)
- `pkg/bloomcompactor/chunksbatchesiterator_test.go` (96)
- `pkg/bloomcompactor/config.go` (1)
- `pkg/bloomcompactor/mergecompactor.go` (10)
- `pkg/bloomcompactor/sharding_test.go` (15)
- `pkg/storage/bloom/v1/bloom_tokenizer.go` (120)
- `pkg/storage/bloom/v1/bloom_tokenizer_test.go` (4)
- `pkg/validation/limits.go` (6)
- `pkg/validation/limits_test.go` (9)

@@ -3094,6 +3094,10 @@ shard_streams:
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]
# The batch size of the chunks the bloom-compactor downloads at once.
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]
# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]

@@ -535,8 +535,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}
fpRate := c.limits.BloomFalsePositiveRate(job.tenantID)
resultingBlock, err = compactNewChunks(ctx, logger, job, fpRate, bt, storeClient.chunk, builder)
resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
@@ -545,7 +544,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
// When already compacted metas exist, we need to merge all blocks, amending blooms with new series
level.Info(logger).Log("msg", "already compacted metas exists, use mergeBlockBuilder")
var populate = createPopulateFunc(ctx, logger, job, storeClient, bt)
var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)
seriesIter := makeSeriesIterFromSeriesMeta(job)

@@ -22,7 +22,7 @@ import (
)
type compactorTokenizer interface {
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunks []chunk.Chunk) error
PopulateSeriesWithBloom(bloom *v1.SeriesWithBloom, chunkBatchesIterator v1.Iterator[[]chunk.Chunk]) error
}
type chunkClient interface {
@@ -86,7 +86,7 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
return chunkRefs
}
func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks []chunk.Chunk) (v1.SeriesWithBloom, error) {
func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
@@ -155,21 +155,20 @@ func createLocalDirName(workingDir string, job Job) string {
}
// Compacts given list of chunks, uploads them to storage and returns a list of bloomBlocks
func compactNewChunks(
ctx context.Context,
func compactNewChunks(ctx context.Context,
logger log.Logger,
job Job,
fpRate float64,
bt compactorTokenizer,
storeClient chunkClient,
builder blockBuilder,
limits Limits,
) (bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}
bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, fpRate, logger)
bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)
// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
@@ -182,13 +181,14 @@ func compactNewChunks(
}
type lazyBloomBuilder struct {
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
ctx context.Context
metas v1.Iterator[seriesMeta]
tenant string
client chunkClient
bt compactorTokenizer
fpRate float64
logger log.Logger
chunksBatchSize int
cur v1.SeriesWithBloom // returned by At()
err error // returned by Err()
@@ -198,15 +198,16 @@ type lazyBloomBuilder struct {
// which are used by the blockBuilder to write a bloom block.
// We use an iterator to avoid loading all blooms into memory first, before
// building the block.
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, fpRate float64, logger log.Logger) *lazyBloomBuilder {
func newLazyBloomBuilder(ctx context.Context, job Job, client chunkClient, bt compactorTokenizer, logger log.Logger, limits Limits) *lazyBloomBuilder {
return &lazyBloomBuilder{
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: fpRate,
logger: logger,
ctx: ctx,
metas: v1.NewSliceIter(job.seriesMetas),
client: client,
tenant: job.tenantID,
bt: bt,
fpRate: limits.BloomFalsePositiveRate(job.tenantID),
logger: logger,
chunksBatchSize: limits.BloomCompactorChunksBatchSize(job.tenantID),
}
}
@@ -218,20 +219,18 @@ func (it *lazyBloomBuilder) Next() bool {
}
meta := it.metas.At()
// Get chunks data from list of chunkRefs
chks, err := it.client.GetChunks(it.ctx, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP))
batchesIterator, err := newChunkBatchesIterator(it.ctx, it.client, makeChunkRefs(meta.chunkRefs, it.tenant, meta.seriesFP), it.chunksBatchSize)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in getChunks", err)
level.Debug(it.logger).Log("msg", "err creating chunks batches iterator", "err", err)
return false
}
it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, chks)
it.cur, err = buildBloomFromSeries(meta, it.fpRate, it.bt, batchesIterator)
if err != nil {
it.err = err
it.cur = v1.SeriesWithBloom{}
level.Debug(it.logger).Log("err in buildBloomFromSeries", err)
level.Debug(it.logger).Log("msg", "err in buildBloomFromSeries", "err", err)
return false
}
return true

@@ -59,7 +59,7 @@ func TestChunkCompactor_BuildBloomFromSeries(t *testing.T) {
chunks := []chunk.Chunk{createTestChunk(fp, label)}
mbt := mockBloomTokenizer{}
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, chunks)
bloom, err := buildBloomFromSeries(seriesMeta, fpRate, &mbt, v1.NewSliceIter([][]chunk.Chunk{chunks}))
require.NoError(t, err)
require.Equal(t, seriesMeta.seriesFP, bloom.Series.Fingerprint)
require.Equal(t, chunks, mbt.chunks)
@@ -110,7 +110,7 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
pbb := mockPersistentBlockBuilder{}
// Run Compaction
compactedBlock, err := compactNewChunks(context.Background(), logger, job, fpRate, &mbt, &mcc, &pbb)
compactedBlock, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, &pbb, mockLimits{fpRate: fpRate})
// Validate Compaction Succeeds
require.NoError(t, err)
@@ -169,7 +169,7 @@ func TestLazyBloomBuilder(t *testing.T) {
mbt := &mockBloomTokenizer{}
mcc := &mockChunkClient{}
it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, fpRate, logger)
it := newLazyBloomBuilder(context.Background(), job, mcc, mbt, logger, mockLimits{chunksDownloadingBatchSize: 10, fpRate: fpRate})
// first seriesMeta has 1 chunk
require.True(t, it.Next())
@@ -199,8 +199,10 @@ type mockBloomTokenizer struct {
chunks []chunk.Chunk
}
func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c []chunk.Chunk) error {
mbt.chunks = append(mbt.chunks, c...)
func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error {
for c.Next() {
mbt.chunks = append(mbt.chunks, c.At()...)
}
return nil
}

@@ -0,0 +1,48 @@
package bloomcompactor
import (
"context"
"errors"
"github.com/grafana/loki/pkg/storage/chunk"
)
type chunksBatchesIterator struct {
context context.Context
client chunkClient
chunksToDownload []chunk.Chunk
batchSize int
currentBatch []chunk.Chunk
err error
}
func newChunkBatchesIterator(context context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize int) (*chunksBatchesIterator, error) {
if batchSize <= 0 {
return nil, errors.New("batchSize must be greater than 0")
}
return &chunksBatchesIterator{context: context, client: client, chunksToDownload: chunksToDownload, batchSize: batchSize}, nil
}
func (c *chunksBatchesIterator) Next() bool {
if len(c.chunksToDownload) == 0 {
return false
}
batchSize := c.batchSize
chunksToDownloadCount := len(c.chunksToDownload)
if chunksToDownloadCount < batchSize {
batchSize = chunksToDownloadCount
}
chunksToDownload := c.chunksToDownload[:batchSize]
c.chunksToDownload = c.chunksToDownload[batchSize:]
c.currentBatch, c.err = c.client.GetChunks(c.context, chunksToDownload)
return c.err == nil
}
func (c *chunksBatchesIterator) Err() error {
return c.err
}
func (c *chunksBatchesIterator) At() []chunk.Chunk {
return c.currentBatch
}

@@ -0,0 +1,96 @@
package bloomcompactor
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/chunk"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
func Test_chunksBatchesIterator(t *testing.T) {
tests := map[string]struct {
batchSize int
chunksToDownload []chunk.Chunk
constructorError error
hadNextCount int
}{
"expected error if batch size is set to 0": {
batchSize: 0,
constructorError: errors.New("batchSize must be greater than 0"),
},
"expected no error if there are no chunks": {
hadNextCount: 0,
batchSize: 10,
},
"expected 1 call to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 1,
batchSize: 20,
},
"expected 1 call to the client(2)": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 1,
batchSize: 10,
},
"expected 2 calls to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 2,
batchSize: 6,
},
"expected 10 calls to the client": {
chunksToDownload: createFakeChunks(10),
hadNextCount: 10,
batchSize: 1,
},
}
for name, data := range tests {
t.Run(name, func(t *testing.T) {
client := &fakeClient{}
iterator, err := newChunkBatchesIterator(context.Background(), client, data.chunksToDownload, data.batchSize)
if data.constructorError != nil {
require.Equal(t, err, data.constructorError)
return
}
hadNextCount := 0
var downloadedChunks []chunk.Chunk
for iterator.Next() {
hadNextCount++
downloaded := iterator.At()
downloadedChunks = append(downloadedChunks, downloaded...)
require.LessOrEqual(t, len(downloaded), data.batchSize)
}
require.NoError(t, iterator.Err())
require.Equal(t, data.chunksToDownload, downloadedChunks)
require.Equal(t, data.hadNextCount, client.callsCount)
require.Equal(t, data.hadNextCount, hadNextCount)
})
}
}
func createFakeChunks(count int) []chunk.Chunk {
metas := make([]tsdbindex.ChunkMeta, 0, count)
for i := 0; i < count; i++ {
metas = append(metas, tsdbindex.ChunkMeta{
Checksum: uint32(i),
MinTime: int64(i),
MaxTime: int64(i + 100),
KB: uint32(i * 100),
Entries: uint32(i * 10),
})
}
return makeChunkRefs(metas, "fake", 0xFFFF)
}
type fakeClient struct {
callsCount int
}
func (f *fakeClient) GetChunks(_ context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
f.callsCount++
return chunks, nil
}

@@ -41,6 +41,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorChunksBatchSize(userID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int

@@ -2,6 +2,7 @@ package bloomcompactor
import (
"context"
"fmt"
"github.com/grafana/dskit/concurrency"
@@ -74,7 +75,7 @@ func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger,
return blockIters, blockPaths, nil
}
func createPopulateFunc(ctx context.Context, logger log.Logger, job Job, storeClient storeClient, bt *v1.BloomTokenizer) func(series *v1.Series, bloom *v1.Bloom) error {
func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, bt *v1.BloomTokenizer, limits Limits) func(series *v1.Series, bloom *v1.Bloom) error {
return func(series *v1.Series, bloom *v1.Bloom) error {
bloomForChks := v1.SeriesWithBloom{
Series: series,
@@ -95,12 +96,11 @@ func createPopulateFunc(ctx context.Context, logger log.Logger, job Job, storeCl
}
}
chks, err := storeClient.chunk.GetChunks(ctx, chunkRefs)
batchesIterator, err := newChunkBatchesIterator(ctx, storeClient.chunk, chunkRefs, limits.BloomCompactorChunksBatchSize(job.tenantID))
if err != nil {
level.Error(logger).Log("msg", "failed downloading chunks", "err", err)
return err
return fmt.Errorf("error creating chunks batches iterator: %w", err)
}
err = bt.PopulateSeriesWithBloom(&bloomForChks, chks)
err = bt.PopulateSeriesWithBloom(&bloomForChks, batchesIterator)
if err != nil {
return err
}

@@ -128,9 +128,22 @@ func TestShuffleSharding(t *testing.T) {
type mockLimits struct {
*validation.Overrides
bloomCompactorShardSize int
bloomCompactorShardSize int
chunksDownloadingBatchSize int
fpRate float64
}
func (m mockLimits) BloomFalsePositiveRate(_ string) float64 {
return m.fpRate
}
func (m mockLimits) BloomCompactorShardSize(_ string) int {
return m.bloomCompactorShardSize
}
func (m mockLimits) BloomCompactorChunksBatchSize(_ string) int {
if m.chunksDownloadingBatchSize != 0 {
return m.chunksDownloadingBatchSize
}
return 1
}

@@ -2,6 +2,7 @@ package v1
import (
"context"
"fmt"
"math"
"time"
@@ -82,75 +83,82 @@ func prefixedToken(ngram int, chk logproto.ChunkRef) ([]byte, int) {
}
// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) error {
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks Iterator[[]chunk.Chunk]) error {
startTime := time.Now().UnixMilli()
level.Debug(util_log.Logger).Log("msg", "PopulateSeriesWithBloom")
clearCache(bt.cache)
chunkTotalUncompressedSize := 0
for idx := range chunks {
lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk()
tokenBuf, prefixLn := prefixedToken(bt.lineTokenizer.N, chunks[idx].ChunkRef)
chunkTotalUncompressedSize += lc.UncompressedSize()
itr, err := lc.Iterator(
context.Background(),
time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps?
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
log.NewNoopPipeline().ForStream(chunks[idx].Metric),
)
if err != nil {
level.Error(util_log.Logger).Log("msg", "chunk iterator cannot be created", "err", err)
return err
}
defer itr.Close()
for itr.Next() && itr.Error() == nil {
chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(itr.Entry().Line))
for chunkTokenizer.Next() {
tok := chunkTokenizer.At()
if tok != nil {
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if !found {
bt.cache[str] = nil
seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
for chunks.Next() {
chunksBatch := chunks.At()
for idx := range chunksBatch {
lc := chunksBatch[idx].Data.(*chunkenc.Facade).LokiChunk()
tokenBuf, prefixLn := prefixedToken(bt.lineTokenizer.N, chunksBatch[idx].ChunkRef)
chunkTotalUncompressedSize += lc.UncompressedSize()
itr, err := lc.Iterator(
context.Background(),
time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps?
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
log.NewNoopPipeline().ForStream(chunksBatch[idx].Metric),
)
if err != nil {
level.Error(util_log.Logger).Log("msg", "chunk iterator cannot be created", "err", err)
return err
}
defer itr.Close()
for itr.Next() && itr.Error() == nil {
chunkTokenizer := NewPrefixedTokenIter(tokenBuf, prefixLn, bt.lineTokenizer.Tokens(itr.Entry().Line))
for chunkTokenizer.Next() {
tok := chunkTokenizer.At()
if tok != nil {
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if !found {
bt.cache[str] = nil
seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
}
}
}
lineTokenizer := bt.lineTokenizer.Tokens(itr.Entry().Line)
for lineTokenizer.Next() {
tok := lineTokenizer.At()
if tok != nil {
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if !found {
bt.cache[str] = nil
seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
lineTokenizer := bt.lineTokenizer.Tokens(itr.Entry().Line)
for lineTokenizer.Next() {
tok := lineTokenizer.At()
if tok != nil {
str := string(tok)
_, found := bt.cache[str] // A cache is used ahead of the SBF, as it cuts out the costly operations of scaling bloom filters
if !found {
bt.cache[str] = nil
seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok)
if len(bt.cache) >= cacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
}
}
}
}
seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, ChunkRef{
Start: chunks[idx].From,
End: chunks[idx].Through,
Checksum: chunks[idx].Checksum,
})
} // for each chunk
}
seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, ChunkRef{
Start: chunksBatch[idx].From,
End: chunksBatch[idx].Through,
Checksum: chunksBatch[idx].Checksum,
})
} // for each chunk
}
if err := chunks.Err(); err != nil {
level.Error(util_log.Logger).Log("msg", "error downloading chunks batch", "err", err)
return fmt.Errorf("error downloading chunks batch: %w", err)
}
endTime := time.Now().UnixMilli()

@@ -123,7 +123,7 @@ func TestPopulateSeriesWithBloom(t *testing.T) {
Series: &series,
}
err := bt.PopulateSeriesWithBloom(&swb, chunks)
err := bt.PopulateSeriesWithBloom(&swb, NewSliceIter([][]chunk.Chunk{chunks}))
require.NoError(t, err)
tokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramSkip)
itr := tokenizer.Tokens(testLine)
@@ -171,7 +171,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
Series: &series,
}
err := bt.PopulateSeriesWithBloom(&swb, chunks)
err := bt.PopulateSeriesWithBloom(&swb, NewSliceIter([][]chunk.Chunk{chunks}))
require.NoError(b, err)
}
}

@@ -188,6 +188,7 @@ type Limits struct {
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
@@ -316,6 +317,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the configured time. Defaults to 7 days. 0s means no limit.")
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
@@ -838,6 +840,10 @@ func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}
func (o *Overrides) BloomCompactorChunksBatchSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorChunksBatchSize
}
func (o *Overrides) BloomCompactorShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorShardSize
}

@@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -290,7 +289,7 @@ query_timeout: 5m
}
}
func TestLimitsValidation(t *testing.T) {
func TestLimitsValidation_deletionMode(t *testing.T) {
for _, tc := range []struct {
mode string
expected error
@@ -300,7 +299,9 @@ func TestLimitsValidation(t *testing.T) {
{mode: "filter-and-delete", expected: nil},
{mode: "something-else", expected: deletionmode.ErrUnknownMode},
} {
limits := Limits{DeletionMode: tc.mode}
require.True(t, errors.Is(limits.Validate(), tc.expected))
t.Run(tc.mode, func(t *testing.T) {
limits := Limits{DeletionMode: tc.mode}
require.ErrorIs(t, limits.Validate(), tc.expected)
})
}
}
