Bloom-compactor Sharding (#11154)

**What this PR does / why we need it**:
This PR adds tenant and fingerprint (FP) sharding to bloom compactors.
Note that the bloom-compactor doesn't perform any compaction yet: it
iterates through all tables, tenants, and series, checking whether the
compactor owns each tenant and series (by the series' FP). Actual
compaction will be implemented in
https://github.com/grafana/loki/pull/11115.

A new structure `Job` is added which carries all the context for a
compaction job, such as the tenant ID, the table name, and the series
FP. The sharding strategy has two methods (sketched below):
- `OwnsTenant(tenant string)`: Checks whether the compactor shard owns
the tenant.
- `OwnsJob(job Job)`: Checks (again) whether the compactor owns the
job's tenant, then checks whether the compactor owns the job's
fingerprint by looking inside the tenant subring.
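
A minimal sketch of how the two checks compose, assuming the simplified
signatures from this diff (`processTenant` is an illustrative helper, not
part of the PR):

```go
// Sketch only: mirrors the ShardingStrategy added in pkg/bloomcompactor/sharding.go.
type ShardingStrategy interface {
	OwnsTenant(tenant string) bool
	OwnsJob(job Job) (bool, error)
}

// processTenant shows the two-level check: first tenant ownership,
// then per-job fingerprint ownership within the tenant's subring.
func processTenant(s ShardingStrategy, tenant string, jobs []Job) error {
	if !s.OwnsTenant(tenant) {
		return nil // tenant is handled by other compactors in the ring
	}
	for _, job := range jobs {
		owns, err := s.OwnsJob(job)
		if err != nil {
			return err
		}
		if !owns {
			continue // another compactor within the tenant's shard owns this FP
		}
		// ... compact this job's series ...
	}
	return nil
}
```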

We add a new per-tenant limit: `bloom_compactor_shard_size`. If it's 0,
the tenant can use all compactors (i.e. `OwnsTenant` always returns
`true`); otherwise, only `bloom_compactor_shard_size` out of the total
number of compactors own the tenant. A given job's FP is owned by
exactly one compactor within the tenant's shard.
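
For example, using Loki's runtime overrides file, a single tenant could be
restricted to 3 compactors (the tenant name below is illustrative):

```yaml
# Hypothetical per-tenant override; "tenant-a" is a placeholder.
overrides:
  tenant-a:
    bloom_compactor_shard_size: 3
```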

**Special notes for your reviewer**:
- Added a bunch of metrics in `metrics.go`
- Added a test for the sharding strategy
Authored by Salva Corts. Commit 4248825ad2, parent 0bc38e5775.
13 changed files (lines changed in parentheses):
1. docs/sources/configure/_index.md (41)
2. pkg/bloomcompactor/bloomcompactor.go (572)
3. pkg/bloomcompactor/bloomcompactor_test.go (134)
4. pkg/bloomcompactor/config.go (26)
5. pkg/bloomcompactor/job.go (97)
6. pkg/bloomcompactor/metrics.go (105)
7. pkg/bloomcompactor/sharding.go (43)
8. pkg/bloomcompactor/sharding_test.go (145)
9. pkg/bloomgateway/sharding.go (49)
10. pkg/loki/modules.go (12)
11. pkg/util/limiter/combined_limits.go (2)
12. pkg/util/ring/sharding.go (85)
13. pkg/validation/limits.go (22)

@@ -2527,7 +2527,31 @@ ring:
# CLI flag: -bloom-compactor.enabled
[enabled: <boolean> | default = false]
# Directory where files can be downloaded for compaction.
# CLI flag: -bloom-compactor.working-directory
[working_directory: <string> | default = ""]
# Interval at which to re-run the compaction operation.
# CLI flag: -bloom-compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]
# Minimum backoff time between retries.
# CLI flag: -bloom-compactor.compaction-retries-min-backoff
[compaction_retries_min_backoff: <duration> | default = 10s]
# Maximum backoff time between retries.
# CLI flag: -bloom-compactor.compaction-retries-max-backoff
[compaction_retries_max_backoff: <duration> | default = 1m]
# Number of retries to perform when compaction fails.
# CLI flag: -bloom-compactor.compaction-retries
[compaction_retries: <int> | default = 3]
# Maximum number of tables to compact in parallel. When increasing this value,
# make sure the compactor has enough disk space to store and compact that many
# tables at once.
# CLI flag: -bloom-compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]
```
### limits_config
@@ -2914,6 +2938,23 @@ shard_streams:
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 1]
# The shard size defines how many bloom compactors should be used by a tenant
# when computing blooms. If it's set to 0, shuffle sharding is disabled.
# CLI flag: -bloom-compactor.shard-size
[bloom_compactor_shard_size: <int> | default = 1]
# The maximum age of a table before it is compacted. Do not compact tables
# older than the configured time. Defaults to 7 days. 0s means no limit.
# CLI flag: -bloom-compactor.max-table-age
[bloom_compactor_max_table_age: <duration> | default = 168h]
# The minimum age of a table before it is compacted. Do not compact tables
# newer than the configured time. Defaults to 1 hour. 0s means no limit. This
# is useful to avoid compacting tables that will be updated with out-of-order
# writes.
# CLI flag: -bloom-compactor.min-table-age
[bloom_compactor_min_table_age: <duration> | default = 1h]
# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]

@@ -30,36 +30,35 @@ import (
"math"
"os"
"path/filepath"
"strconv"
"strings"
"sort"
"time"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index"
index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/compactor/retention"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
"github.com/grafana/loki/pkg/storage/chunk"
chunk_client "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index"
index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/pkg/util"
)
const (
@@ -70,15 +69,20 @@ const (
type Compactor struct {
services.Service
cfg Config
logger log.Logger
bloomCompactorRing ring.ReadRing
cfg Config
logger log.Logger
schemaCfg config.SchemaConfig
limits Limits
// temporary workaround until store has implemented read/write shipper interface
bloomShipperClient bloomshipper.Client
// Client used to run operations on the bucket storing bloom blocks.
storeClients map[config.DayTime]storeClient
// temporary workaround until store has implemented read/write shipper interface
bloomShipperClient bloomshipper.Client
sharding ShardingStrategy
metrics *metrics
}
type storeClient struct {
@@ -88,21 +92,25 @@ type storeClient struct {
indexShipper indexshipper.IndexShipper
}
func New(cfg Config,
readRing ring.ReadRing,
func New(
cfg Config,
storageCfg storage.Config,
schemaConfig config.SchemaConfig,
limits downloads.Limits,
limits Limits,
logger log.Logger,
sharding ShardingStrategy,
clientMetrics storage.ClientMetrics,
_ prometheus.Registerer) (*Compactor, error) {
r prometheus.Registerer,
) (*Compactor, error) {
c := &Compactor{
cfg: cfg,
logger: logger,
bloomCompactorRing: readRing,
cfg: cfg,
logger: logger,
schemaCfg: schemaConfig,
sharding: sharding,
limits: limits,
}
//Configure BloomClient for meta.json management
// Configure BloomClient for meta.json management
bloomClient, err := bloomshipper.NewBloomClient(schemaConfig.Configs, storageCfg, clientMetrics)
if err != nil {
return nil, err
@@ -118,11 +126,11 @@ func New(cfg Config,
case config.BoltDBShipperType:
indexStorageCfg = storageCfg.BoltDBShipperConfig.Config
default:
level.Warn(util_log.Logger).Log("msg", "skipping period because index type is unsupported")
level.Warn(c.logger).Log("msg", "skipping period because index type is unsupported")
continue
}
//Configure ObjectClient and IndexShipper for series and chunk management
// Configure ObjectClient and IndexShipper for series and chunk management
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics)
if err != nil {
return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err)
@@ -157,33 +165,274 @@ func New(cfg Config,
chunk: chunk_client.NewClient(objectClient, nil, schemaConfig),
indexShipper: indexShipper,
}
}
// temporary workaround until store has implemented read/write shipper interface
c.bloomShipperClient = bloomClient
// TODO use a new service with a loop
c.Service = services.NewIdleService(c.starting, c.stopping)
c.metrics = newMetrics(r)
c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds())
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
return c, nil
}
func (c *Compactor) starting(_ context.Context) error {
return nil
func (c *Compactor) starting(_ context.Context) (err error) {
c.metrics.compactorRunning.Set(1)
return err
}
func (c *Compactor) running(ctx context.Context) error {
// Run an initial compaction before starting the interval.
if err := c.runCompaction(ctx); err != nil {
level.Error(c.logger).Log("msg", "failed to run compaction", "err", err)
}
ticker := time.NewTicker(util.DurationWithJitter(c.cfg.CompactionInterval, 0.05))
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.metrics.compactionRunsStarted.Inc()
if err := c.runCompaction(ctx); err != nil {
c.metrics.compactionRunsErred.Inc()
level.Error(c.logger).Log("msg", "failed to run compaction", "err", err)
continue
}
c.metrics.compactionRunsCompleted.Inc()
case <-ctx.Done():
return nil
}
}
}
func (c *Compactor) stopping(_ error) error {
c.metrics.compactorRunning.Set(0)
return nil
}
type Series struct { // TODO this can be replaced with Job struct based on Salva's ring work.
tableName, tenant string
labels labels.Labels
fingerPrint model.Fingerprint
chunks []chunk.Chunk
from, through model.Time
indexPath string
func (c *Compactor) runCompaction(ctx context.Context) error {
var tables []string
for _, sc := range c.storeClients {
// refresh index list cache since previous compaction would have changed the index files in the object store
sc.index.RefreshIndexTableNamesCache(ctx)
tbls, err := sc.index.ListTables(ctx)
if err != nil {
return fmt.Errorf("failed to list tables: %w", err)
}
tables = append(tables, tbls...)
}
// process most recent tables first
tablesIntervals := getIntervalsForTables(tables)
sortTablesByRange(tables, tablesIntervals)
parallelism := c.cfg.MaxCompactionParallelism
if parallelism == 0 {
parallelism = len(tables)
}
// TODO(salvacorts): We currently parallelize at the table level. We may want to parallelize at the tenant and job level as well.
// To do that, we should create a worker pool with c.cfg.MaxCompactionParallelism number of workers.
errs := multierror.New()
_ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
tableName := tables[i]
logger := log.With(c.logger, "table", tableName)
level.Info(logger).Log("msg", "compacting table")
err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName])
if err != nil {
errs.Add(err)
return nil
}
level.Info(logger).Log("msg", "finished compacting table")
return nil
})
return errs.Err()
}
func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableName string, tableInterval model.Interval) error {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return fmt.Errorf("interrupting compaction of table: %w", err)
}
schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName)
if !ok {
level.Error(logger).Log("msg", "skipping compaction since we can't find schema for table")
return nil
}
sc, ok := c.storeClients[schemaCfg.From]
if !ok {
return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String())
}
_, tenants, err := sc.index.ListFiles(ctx, tableName, false)
if err != nil {
return fmt.Errorf("failed to list files for table %s: %w", tableName, err)
}
c.metrics.compactionRunDiscoveredTenants.Add(float64(len(tenants)))
level.Info(logger).Log("msg", "discovered tenants from bucket", "users", len(tenants))
return c.compactUsers(ctx, logger, sc, tableName, tableInterval, tenants)
}
// See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689
func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tableInterval model.Interval, tenants []string) error {
// Keep track of tenants owned by this shard, so that we can delete the local files for all other users.
errs := multierror.New()
ownedTenants := make(map[string]struct{}, len(tenants))
for _, tenant := range tenants {
tenantLogger := log.With(logger, "tenant", tenant)
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return fmt.Errorf("interrupting compaction of tenants: %w", err)
}
// Skip this table if it is too new/old for the tenant limits.
now := model.Now()
tableMinAge := c.limits.BloomCompactorMinTableAge(tenant)
tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant)
if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now)
continue
}
if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now)
continue
}
// Ensure the tenant ID belongs to our shard.
if !c.sharding.OwnsTenant(tenant) {
c.metrics.compactionRunSkippedTenants.Inc()
level.Debug(tenantLogger).Log("msg", "skipping tenant because it is not owned by this shard")
continue
}
ownedTenants[tenant] = struct{}{}
if err := c.compactTenantWithRetries(ctx, tenantLogger, sc, tableName, tenant); err != nil {
switch {
case errors.Is(err, context.Canceled):
// We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart.
level.Info(tenantLogger).Log("msg", "compaction for tenant was interrupted by a shutdown")
return nil
default:
c.metrics.compactionRunFailedTenants.Inc()
level.Error(tenantLogger).Log("msg", "failed to compact tenant", "err", err)
errs.Add(err)
}
continue
}
c.metrics.compactionRunSucceededTenants.Inc()
level.Info(tenantLogger).Log("msg", "successfully compacted tenant")
}
return errs.Err()
// TODO: Delete local files for unowned tenants, if there are any.
}
func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error {
level.Info(logger).Log("msg", "starting compaction of tenant")
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return err
}
// Tokenizer is not thread-safe so we need one per goroutine.
bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer)
// TODO: Use ForEachConcurrent?
errs := multierror.New()
if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
return fmt.Errorf("unexpected multi-tenant")
}
// TODO: Make these casts safely
if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(
ctx, nil,
0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod
func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
job := NewJob(tenant, tableName, idx.Path(), fingerprint, labels, chksMetas)
jobLogger := log.With(logger, "job", job.String())
ownsJob, err := c.sharding.OwnsJob(job)
if err != nil {
c.metrics.compactionRunUnownedJobs.Inc()
level.Error(jobLogger).Log("msg", "failed to check if compactor owns job", "err", err)
errs.Add(err)
return
}
if !ownsJob {
c.metrics.compactionRunUnownedJobs.Inc()
level.Debug(jobLogger).Log("msg", "skipping job because it is not owned by this shard")
return
}
if err := c.runCompact(ctx, jobLogger, job, c.bloomShipperClient, bt, sc); err != nil {
c.metrics.compactionRunFailedJobs.Inc()
errs.Add(errors.Wrap(err, "runBloomCompact"))
return
}
c.metrics.compactionRunSucceededJobs.Inc()
},
); err != nil {
errs.Add(err)
}
return nil
}); err != nil {
errs.Add(err)
}
return errs.Err()
}
func runWithRetries(
ctx context.Context,
minBackoff, maxBackoff time.Duration,
maxRetries int,
f func(ctx context.Context) error,
) error {
var lastErr error
retries := backoff.New(ctx, backoff.Config{
MinBackoff: minBackoff,
MaxBackoff: maxBackoff,
MaxRetries: maxRetries,
})
for retries.Ongoing() {
lastErr = f(ctx)
if lastErr == nil {
return nil
}
retries.Wait()
}
return lastErr
}
func (c *Compactor) compactTenantWithRetries(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error {
return runWithRetries(
ctx,
c.cfg.RetryMinBackoff,
c.cfg.RetryMaxBackoff,
c.cfg.CompactionRetries,
func(ctx context.Context) error {
return c.compactTenant(ctx, logger, sc, tableName, tenant)
},
)
}
func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fingerprint) []chunk.Chunk {
@@ -204,39 +453,44 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
}
// TODO Revisit this step once v1/bloom lib updated to combine blooms in the same series
func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir string) (bloomshipper.Block, error) {
localDst := createLocalDirName(workingDir, series)
func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.SeriesWithBloom, job Job, workingDir string) (bloomshipper.Block, error) {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}
//write bloom to a local dir
localDst := createLocalDirName(workingDir, job)
// write bloom to a local dir
builder, err := v1.NewBlockBuilder(v1.NewBlockOptions(), v1.NewDirectoryBlockWriter(localDst))
if err != nil {
level.Info(util_log.Logger).Log("creating builder", err)
level.Error(logger).Log("creating builder", err)
return bloomshipper.Block{}, err
}
checksum, err := builder.BuildFrom(v1.NewSliceIter([]v1.SeriesWithBloom{bloomForChks}))
if err != nil {
level.Info(util_log.Logger).Log("writing bloom", err)
level.Error(logger).Log("writing bloom", err)
return bloomshipper.Block{}, err
}
blockFile, err := os.Open(filepath.Join(localDst, bloomFileName))
if err != nil {
level.Info(util_log.Logger).Log("reading bloomBlock", err)
level.Error(logger).Log("reading bloomBlock", err)
}
blocks := bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: series.tenant,
TableName: series.tableName,
MinFingerprint: uint64(series.fingerPrint), //TODO will change once we compact multiple blooms into a block
MaxFingerprint: uint64(series.fingerPrint),
StartTimestamp: series.from.Unix(),
EndTimestamp: series.through.Unix(),
TenantID: job.Tenant(),
TableName: job.TableName(),
MinFingerprint: uint64(job.Fingerprint()), // TODO will change once we compact multiple blooms into a block
MaxFingerprint: uint64(job.Fingerprint()),
StartTimestamp: job.From().Unix(),
EndTimestamp: job.Through().Unix(),
Checksum: checksum,
},
IndexPath: series.indexPath,
IndexPath: job.IndexPath(),
},
Data: blockFile,
}
@@ -244,50 +498,21 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir
return blocks, nil
}
// TODO Will be replaced with ring implementation in https://github.com/grafana/loki/pull/11154/
func listSeriesForBlooms(ctx context.Context, objectClient storeClient) ([]Series, error) {
// Returns all the TSDB files, including subdirectories
prefix := "index/"
indices, _, err := objectClient.object.List(ctx, prefix, "")
if err != nil {
return nil, err
}
var result []Series
for _, index := range indices {
s := strings.Split(index.Key, "/")
if len(s) > 3 {
tableName := s[1]
if !strings.HasPrefix(tableName, "loki_") || strings.Contains(tableName, "backup") {
continue
}
userID := s[2]
_, err := strconv.Atoi(userID)
if err != nil {
continue
}
result = append(result, Series{tableName: tableName, tenant: userID, indexPath: index.Key})
}
}
return result, nil
}
func createLocalDirName(workingDir string, series Series) string {
dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", series.tableName, series.tenant, series.fingerPrint, series.fingerPrint, series.from, series.through)
func createLocalDirName(workingDir string, job Job) string {
dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.TableName(), job.Tenant(), job.Fingerprint(), job.Fingerprint(), job.From(), job.Through())
return filepath.Join(workingDir, dir)
}
func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return err
}
// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
Fingerprint: series.fingerPrint,
Fingerprint: job.Fingerprint(),
},
Bloom: &v1.Bloom{
ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate),
@@ -295,19 +520,18 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer,
}
// Tokenize data into n-grams
bt.PopulateSeriesWithBloom(&bloomForChks, series.chunks)
bt.PopulateSeriesWithBloom(&bloomForChks, chunks)
// Build and upload bloomBlock to storage
blocks, err := buildBloomBlock(bloomForChks, series, dst)
blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst)
if err != nil {
level.Info(util_log.Logger).Log("building bloomBlocks", err)
level.Error(logger).Log("building bloomBlocks", err)
return
}
storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks})
if err != nil {
level.Info(util_log.Logger).Log("putting blocks to storage", err)
level.Error(logger).Log("putting blocks to storage", err)
return
}
@@ -319,108 +543,80 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer,
Blocks: storedBlockRefs,
}
//TODO move this to an outer layer, otherwise creates a meta per block
// TODO move this to an outer layer, otherwise creates a meta per block
err = bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Info(util_log.Logger).Log("putting meta.json to storage", err)
level.Error(logger).Log("putting meta.json to storage", err)
return
}
return nil
}
func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomshipper.Client, storeClient storeClient) error {
series, err := listSeriesForBlooms(ctx, storeClient)
// TODO tokenizer is not thread-safe
// consider moving to Job/worker level with https://github.com/grafana/loki/pull/11154/
// create a tokenizer
bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer)
if err != nil {
func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error {
// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return err
}
for _, s := range series {
err := storeClient.indexShipper.ForEach(ctx, s.tableName, s.tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
return nil
}
// TODO call bloomShipperClient.GetMetas to get existing meta.json
var metas []bloomshipper.Meta
// TODO make this casting safe
_ = idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(
ctx,
nil, // Process all shards
0, math.MaxInt64, // Replace with MaxLookBackPeriod
// Get chunks for a series label and a fp
func(ls labels.Labels, fp model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
// TODO call bloomShipperClient.GetMetas to get existing meta.json
var metas []bloomshipper.Meta
if len(metas) == 0 {
// Get chunks data from list of chunkRefs
chks, err := storeClient.chunk.GetChunks(
ctx,
makeChunkRefs(chksMetas, s.tenant, fp),
)
if err != nil {
level.Info(util_log.Logger).Log("getting chunks", err)
return
}
// effectively get min and max of timestamps of the list of chunks in a series
// There must be a better way to get this, ordering chunkRefs by timestamp doesn't fully solve it
// chunk files name have this info in ObjectStore, but it's not really exposed
minFrom := model.Latest
maxThrough := model.Earliest
for _, c := range chks {
if minFrom > c.From {
minFrom = c.From
}
if maxThrough < c.From {
maxThrough = c.Through
}
}
series := Series{
tableName: s.tableName,
tenant: s.tenant,
labels: ls,
fingerPrint: fp,
chunks: chks,
from: minFrom,
through: maxThrough,
indexPath: s.indexPath,
}
err = CompactNewChunks(ctx, series, bt, bloomShipperClient, c.cfg.WorkingDirectory)
if err != nil {
return
}
} else {
// TODO complete part 2 - periodic compaction for delta from previous period
// When already compacted metas exists
// Deduplicate index paths
uniqueIndexPaths := make(map[string]struct{})
for _, meta := range metas {
for _, blockRef := range meta.Blocks {
uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
//...
}
}
}
})
return nil
})
if len(metas) == 0 {
// Get chunks data from list of chunkRefs
chks, err := storeClient.chunk.GetChunks(
ctx,
makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()),
)
if err != nil {
return err
}
err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
if err != nil {
return errors.Wrap(err, "getting each series")
return err
}
} else {
// TODO complete part 2 - periodic compaction for delta from previous period
// When already compacted metas exists
// Deduplicate index paths
uniqueIndexPaths := make(map[string]struct{})
for _, meta := range metas {
for _, blockRef := range meta.Blocks {
uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
// ...
}
}
}
return nil
}
func getIntervalsForTables(tables []string) map[string]model.Interval {
tablesIntervals := make(map[string]model.Interval, len(tables))
for _, table := range tables {
tablesIntervals[table] = retention.ExtractIntervalFromTableName(table)
}
return tablesIntervals
}
func sortTablesByRange(tables []string, intervals map[string]model.Interval) {
sort.Slice(tables, func(i, j int) bool {
// Treating a later start time as "less than" produces a most-recent-first sort order.
return intervals[tables[i]].Start.After(intervals[tables[j]].Start)
})
}
// TODO: comes from pkg/compactor/compactor.go
func schemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool) {
tableInterval := retention.ExtractIntervalFromTableName(tableName)
schemaCfg, err := cfg.SchemaForTime(tableInterval.Start)
if err != nil || schemaCfg.IndexTables.TableFor(tableInterval.Start) != tableName {
return config.PeriodConfig{}, false
}
return schemaCfg, true
}

@@ -0,0 +1,134 @@
package bloomcompactor
import (
"context"
"flag"
"fmt"
"path/filepath"
"testing"
"time"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/server"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/compactor"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
)
const (
indexTablePrefix = "table_"
workingDirName = "working-dir"
)
func TestCompactor_RunCompaction(t *testing.T) {
servercfg := &server.Config{}
require.Nil(t, servercfg.LogLevel.Set("debug"))
util_log.InitLogger(servercfg, nil, false)
tempDir := t.TempDir()
indexDir := filepath.Join(tempDir, "index")
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: model.Time(0)},
IndexType: "tsdb",
ObjectType: "filesystem",
Schema: "v12",
IndexTables: config.IndexPeriodicTableConfig{
PathPrefix: "index/",
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: indexTablePrefix,
Period: config.ObjectStorageIndexRequiredPeriod,
}},
},
},
}
daySeconds := int64(24 * time.Hour / time.Second)
tableNumEnd := time.Now().Unix() / daySeconds
tableNumStart := tableNumEnd - 5
for i := tableNumStart; i <= tableNumEnd; i++ {
compactor.SetupTable(
t,
filepath.Join(indexDir, fmt.Sprintf("%s%d", indexTablePrefix, i)),
compactor.IndexesConfig{
NumUnCompactedFiles: 5,
NumCompactedFiles: 5,
},
compactor.PerUserIndexesConfig{
NumUsers: 5,
IndexesConfig: compactor.IndexesConfig{
NumUnCompactedFiles: 5,
NumCompactedFiles: 5,
},
},
)
}
kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), util_log.Logger, nil)
t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
var cfg Config
flagext.DefaultValues(&cfg)
cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName)
cfg.Ring.KVStore.Mock = kvStore
cfg.Ring.ListenPort = 0
cfg.Ring.InstanceAddr = "bloomcompactor"
cfg.Ring.InstanceID = "bloomcompactor"
storageConfig := storage.Config{
FSConfig: local.FSConfig{Directory: tempDir},
TSDBShipperConfig: indexshipper.Config{
ActiveIndexDirectory: indexDir,
ResyncInterval: 1 * time.Minute,
Mode: indexshipper.ModeReadWrite,
CacheLocation: filepath.Join(tempDir, "cache"),
},
}
var limits validation.Limits
limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError))
overrides, _ := validation.NewOverrides(limits, nil)
clientMetrics := storage.NewClientMetrics()
t.Cleanup(clientMetrics.Unregister)
ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, cfg.Ring, 1, 1, util_log.Logger, prometheus.DefaultRegisterer)
require.NoError(t, err)
err = ringManager.StartAsync(context.Background())
require.NoError(t, err)
require.Eventually(t, func() bool {
return ringManager.State() == services.Running
}, 1*time.Minute, 100*time.Millisecond)
defer func() {
ringManager.StopAsync()
require.Eventually(t, func() bool {
return ringManager.State() == services.Terminated
}, 1*time.Minute, 100*time.Millisecond)
}()
shuffleSharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, overrides)
c, err := New(cfg, storageConfig, schemaCfg, overrides, util_log.Logger, shuffleSharding, clientMetrics, nil)
require.NoError(t, err)
err = c.runCompaction(context.Background())
require.NoError(t, err)
// TODO: Once compaction is implemented, verify compaction here.
}

@@ -2,7 +2,9 @@ package bloomcompactor
import (
"flag"
"time"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)
@@ -13,12 +15,32 @@ type Config struct {
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
WorkingDirectory string `yaml:"working_directory"`
Enabled bool `yaml:"enabled"`
WorkingDirectory string `yaml:"working_directory"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
// RegisterFlags registers flags for the Bloom-Compactor configuration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f)
f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.")
f.StringVar(&cfg.WorkingDirectory, "bloom-compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
}
type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorMinTableAge(tenantID string) time.Duration
}

@@ -0,0 +1,97 @@
package bloomcompactor
import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
type Job struct {
tableName, tenantID, indexPath string
seriesLbs labels.Labels
seriesFP model.Fingerprint
chunks []index.ChunkMeta
// We compute them lazily.
from, through *model.Time
}
// NewJob returns a new compaction Job.
func NewJob(
tenantID string,
tableName string,
indexPath string,
seriesFP model.Fingerprint,
seriesLbs labels.Labels,
chunks []index.ChunkMeta,
) Job {
return Job{
tenantID: tenantID,
tableName: tableName,
indexPath: indexPath,
seriesFP: seriesFP,
seriesLbs: seriesLbs,
chunks: chunks,
}
}
func (j *Job) String() string {
return j.tableName + "_" + j.tenantID + "_" + j.seriesFP.String()
}
func (j *Job) TableName() string {
return j.tableName
}
func (j *Job) Tenant() string {
return j.tenantID
}
func (j *Job) Fingerprint() model.Fingerprint {
return j.seriesFP
}
func (j *Job) Chunks() []index.ChunkMeta {
return j.chunks
}
func (j *Job) Labels() labels.Labels {
return j.seriesLbs
}
func (j *Job) IndexPath() string {
return j.indexPath
}
func (j *Job) From() model.Time {
if j.from == nil {
j.computeFromThrough()
}
return *j.from
}
func (j *Job) Through() model.Time {
if j.through == nil {
j.computeFromThrough()
}
return *j.through
}
func (j *Job) computeFromThrough() {
minFrom := model.Latest
maxThrough := model.Earliest
for _, chunk := range j.chunks {
from, through := chunk.Bounds()
if minFrom > from {
minFrom = from
}
if maxThrough < through {
maxThrough = through
}
}
j.from = &minFrom
j.through = &maxThrough
}

@@ -0,0 +1,105 @@
package bloomcompactor
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
metricsNamespace = "loki"
metricsSubsystem = "bloomcompactor"
)
type metrics struct {
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionRunsErred prometheus.Counter
compactionRunDiscoveredTenants prometheus.Counter
compactionRunSkippedTenants prometheus.Counter
compactionRunSucceededTenants prometheus.Counter
compactionRunFailedTenants prometheus.Counter
compactionRunUnownedJobs prometheus.Counter
compactionRunSucceededJobs prometheus.Counter
compactionRunFailedJobs prometheus.Counter
compactionRunInterval prometheus.Gauge
compactorRunning prometheus.Gauge
}
func newMetrics(r prometheus.Registerer) *metrics {
m := metrics{
compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_started_total",
Help: "Total number of compactions started",
}),
compactionRunsCompleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_completed_total",
Help: "Total number of compactions completed successfully",
}),
compactionRunsErred: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_failed_total",
Help: "Total number of compaction runs failed",
}),
compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_discovered",
Help: "Number of tenants discovered during the current compaction run",
}),
compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_skipped",
Help: "Number of tenants skipped during the current compaction run",
}),
compactionRunSucceededTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_succeeded",
Help: "Number of tenants successfully processed during the current compaction run",
}),
compactionRunFailedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_failed",
Help: "Number of tenants failed processing during the current compaction run",
}),
compactionRunUnownedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "jobs_unowned",
Help: "Number of unowned jobs skipped during the current compaction run",
}),
compactionRunSucceededJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "jobs_succeeded",
Help: "Number of jobs successfully processed during the current compaction run",
}),
compactionRunFailedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "jobs_failed",
Help: "Number of jobs failed processing during the current compaction run",
}),
compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "compaction_interval_seconds",
Help: "The configured interval on which compaction is run in seconds",
}),
compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if compactor is currently running on this instance",
}),
}
return &m
}

@@ -0,0 +1,43 @@
package bloomcompactor
import (
"github.com/grafana/dskit/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
)
var (
// TODO: Should we include LEAVING instances in the replication set?
RingOp = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE}, nil)
)
// ShardingStrategy describes whether the compactor "owns" a given user or job.
type ShardingStrategy interface {
util_ring.TenantSharding
OwnsJob(job Job) (bool, error)
}
type ShuffleShardingStrategy struct {
util_ring.TenantSharding
ringLifeCycler *ring.BasicLifecycler
}
func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy {
s := ShuffleShardingStrategy{
TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomCompactorShardSize),
ringLifeCycler: ringLifecycler,
}
return &s
}
// OwnsJob reports whether this compactor owns the given job; exactly one compactor executes each job.
func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) {
if !s.OwnsTenant(job.Tenant()) {
return false, nil
}
tenantRing := s.GetTenantSubRing(job.Tenant())
fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, RingOp)
return fpSharding.OwnsFingerprint(uint64(job.Fingerprint()))
}

@@ -0,0 +1,145 @@
package bloomcompactor
import (
"context"
"flag"
"fmt"
"testing"
"time"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
util_log "github.com/grafana/loki/pkg/util/log"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
)
func TestShuffleSharding(t *testing.T) {
const shardSize = 2
const rings = 4
const tenants = 2000
const jobsPerTenant = 200
var limits validation.Limits
limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError))
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)
var ringManagers []*lokiring.RingManager
var shards []*ShuffleShardingStrategy
for i := 0; i < rings; i++ {
var ringCfg lokiring.RingConfig
ringCfg.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("ring", flag.PanicOnError))
ringCfg.KVStore.Store = "inmemory"
ringCfg.InstanceID = fmt.Sprintf("bloom-compactor-%d", i)
ringCfg.InstanceAddr = fmt.Sprintf("localhost-%d", i)
ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, ringCfg, 1, 1, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, ringManager.StartAsync(context.Background()))
sharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, mockLimits{
Limits: overrides,
bloomCompactorShardSize: shardSize,
})
ringManagers = append(ringManagers, ringManager)
shards = append(shards, sharding)
}
// Wait for all rings to see each other.
for i := 0; i < rings; i++ {
require.Eventually(t, func() bool {
running := ringManagers[i].State() == services.Running
discovered := ringManagers[i].Ring.InstancesCount() == rings
return running && discovered
}, 1*time.Minute, 100*time.Millisecond)
}
// This test is somewhat non-deterministic: sharding is random and the seed
// is initialized by the ring lib.
// We generate a bunch of tenants and verify that whenever this shard doesn't
// own a tenant, the tenant is owned by the other ring instances.
shard := shards[0]
otherShards := shards[1:]
var ownedTenants, ownedJobs int
for i := 0; i < tenants; i++ {
tenant := fmt.Sprintf("tenant-%d", i)
ownsTenant := shard.OwnsTenant(tenant)
var tenantOwnedByOther int
for _, other := range otherShards {
otherOwns := other.OwnsTenant(tenant)
if otherOwns {
tenantOwnedByOther++
}
}
// If this shard owns the tenant, shardSize-1 other members should also own the tenant.
// Otherwise, shardSize other members should own the tenant.
if ownsTenant {
require.Equal(t, shardSize-1, tenantOwnedByOther)
ownedTenants++
} else {
require.Equal(t, shardSize, tenantOwnedByOther)
}
for j := 0; j < jobsPerTenant; j++ {
lbls := labels.FromStrings("namespace", fmt.Sprintf("namespace-%d", j))
job := NewJob(tenant, "", "", model.Fingerprint(lbls.Hash()), lbls, nil)
ownsJob, err := shard.OwnsJob(job)
require.NoError(t, err)
var jobOwnedByOther int
for _, other := range otherShards {
otherOwns, err := other.OwnsJob(job)
require.NoError(t, err)
if otherOwns {
jobOwnedByOther++
}
}
// If this shard owns the job, no one else should own the job.
// And if this shard doesn't own the job, only one of the other shards should own the job.
if ownsJob {
require.Equal(t, 0, jobOwnedByOther)
ownedJobs++
} else {
require.Equal(t, 1, jobOwnedByOther)
}
}
}
t.Logf("owned tenants: %d (out of %d)", ownedTenants, tenants)
t.Logf("owned jobs: %d (out of %d)", ownedJobs, tenants*jobsPerTenant)
// Stop all rings and wait for them to stop.
for i := 0; i < rings; i++ {
ringManagers[i].StopAsync()
require.Eventually(t, func() bool {
return ringManagers[i].State() == services.Terminated
}, 1*time.Minute, 100*time.Millisecond)
}
}
type mockLimits struct {
downloads.Limits
bloomCompactorShardSize int
}
func (m mockLimits) BloomCompactorShardSize(_ string) int {
return m.bloomCompactorShardSize
}
func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration {
return 0
}
func (m mockLimits) BloomCompactorMinTableAge(_ string) time.Duration {
return 0
}

@@ -5,6 +5,8 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
)
// TODO(chaudum): Replace this placeholder with actual BlockRef struct.
@@ -45,20 +47,17 @@ type ShardingStrategy interface {
}
type ShuffleShardingStrategy struct {
r ring.ReadRing
limits Limits
instanceAddr string
instanceID string
logger log.Logger
util_ring.TenantSharding
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
logger log.Logger
}
func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string, logger log.Logger) *ShuffleShardingStrategy {
func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
r: r,
limits: l,
instanceAddr: instanceAddr,
instanceID: instanceID,
logger: logger,
TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize),
ringLifeCycler: ringLifecycler,
logger: logger,
}
}
@@ -69,17 +68,15 @@ func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []s
// instance, because of the auto-forget feature.
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil {
return nil, err
} else if !set.Includes(s.instanceAddr) {
} else if !set.Includes(s.ringLifeCycler.GetInstanceID()) {
return nil, errGatewayUnhealthy
}
var filteredIDs []string
for _, tenantID := range tenantIDs {
subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits)
// Include the user only if it belongs to this bloom gateway shard.
if subRing.HasInstance(s.instanceID) {
if s.OwnsTenant(tenantID) {
filteredIDs = append(filteredIDs, tenantID)
}
}
@@ -94,35 +91,35 @@ func getBucket(rangeMin, rangeMax, pos uint64) int {
// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) {
filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))
if !s.OwnsTenant(tenantID) {
return nil, nil
}
subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits)
filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
var rs ring.ReplicationSet
var err error
tenantRing := s.GetTenantSubRing(tenantID)
fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync)
for _, blockRef := range blockRefs {
rs, err = subRing.Get(uint32(blockRef.FromFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones)
owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp)
if err != nil {
return nil, err
}
// Include the block only if it belongs to this bloom gateway shard.
if rs.Includes(s.instanceID) {
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}
rs, err = subRing.Get(uint32(blockRef.ThroughFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones)
owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp)
if err != nil {
return nil, err
}
// Include the block only if it belongs to this bloom gateway shard.
if rs.Includes(s.instanceID) {
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}
}
return filteredBlockRefs, nil
}

@@ -1260,9 +1260,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
func (t *Loki) initBloomGateway() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-gateway")
instanceAddr := t.bloomGatewayRingManager.RingLifecycler.GetInstanceAddr()
instanceID := t.bloomGatewayRingManager.RingLifecycler.GetInstanceID()
shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.Overrides, instanceAddr, instanceID, logger)
shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)
gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
@@ -1399,12 +1397,16 @@ func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) {
func (t *Loki) initBloomCompactor() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-compactor")
compactor, err := bloomcompactor.New(t.Cfg.BloomCompactor,
t.ring,
shuffleSharding := bloomcompactor.NewShuffleShardingStrategy(t.bloomCompactorRingManager.Ring, t.bloomCompactorRingManager.RingLifecycler, t.Overrides)
compactor, err := bloomcompactor.New(
t.Cfg.BloomCompactor,
t.Cfg.StorageConfig,
t.Cfg.SchemaConfig,
t.Overrides,
logger,
shuffleSharding,
t.clientMetrics,
prometheus.DefaultRegisterer)

@@ -1,6 +1,7 @@ package limiter
package limiter
import (
"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/bloomgateway"
"github.com/grafana/loki/pkg/compactor"
"github.com/grafana/loki/pkg/distributor"
@@ -24,4 +25,5 @@ type CombinedLimits interface {
storage.StoreLimits
indexgateway.Limits
bloomgateway.Limits
bloomcompactor.Limits
}

@@ -0,0 +1,85 @@
package ring
import (
"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
)
type TenantSharding interface {
GetTenantSubRing(tenantID string) ring.ReadRing
OwnsTenant(tenantID string) bool
}
type TenantShuffleSharding struct {
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
shardSizeForTenant func(tenantID string) int
}
func NewTenantShuffleSharding(
r ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
shardSizeForTenant func(tenantID string) int,
) *TenantShuffleSharding {
return &TenantShuffleSharding{
r: r,
ringLifeCycler: ringLifeCycler,
shardSizeForTenant: shardSizeForTenant,
}
}
func (s *TenantShuffleSharding) GetTenantSubRing(tenantID string) ring.ReadRing {
shardSize := s.shardSizeForTenant(tenantID)
// A shard size of 0 means shuffle sharding is disabled for this specific user; use the whole ring.
if shardSize <= 0 {
return s.r
}
return s.r.ShuffleShard(tenantID, shardSize)
}
func (s *TenantShuffleSharding) OwnsTenant(tenantID string) bool {
subRing := s.GetTenantSubRing(tenantID)
return subRing.HasInstance(s.ringLifeCycler.GetInstanceID())
}
type FingerprintSharding interface {
OwnsFingerprint(fp uint64) (bool, error)
}
// FingerprintShuffleSharding is not thread-safe.
type FingerprintShuffleSharding struct {
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
ringOp ring.Operation
// Buffers for ring.Get() calls.
bufDescs []ring.InstanceDesc
bufHosts, bufZones []string
}
func NewFingerprintShuffleSharding(
r ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
ringOp ring.Operation,
) *FingerprintShuffleSharding {
s := FingerprintShuffleSharding{
r: r,
ringLifeCycler: ringLifeCycler,
ringOp: ringOp,
}
s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet()
return &s
}
func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) {
rs, err := s.r.Get(uint32(fp), s.ringOp, s.bufDescs, s.bufHosts, s.bufZones)
if err != nil {
return false, err
}
return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil
}

@@ -177,8 +177,11 @@ type Limits struct {
RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."`
RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -289,6 +292,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
@@ -768,6 +774,18 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}
func (o *Overrides) BloomCompactorShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorShardSize
}
func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMaxTableAge
}
func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMinTableAge
}
func (o *Overrides) AllowStructuredMetadata(userID string) bool {
return o.getOverridesForUser(userID).AllowStructuredMetadata
}
