refactor(blooms): Add queue to bloom planner and enqueue tasks (#13005)

Author: Salva Corts (committed via GitHub)
Commit: efd8f5dc1b (parent: d6f29fc789)
Changed files (number of changed lines in parentheses):
  1. docs/sources/shared/configuration.md (9)
  2. pkg/bloombuild/planner/config.go (32)
  3. pkg/bloombuild/planner/metrics.go (40)
  4. pkg/bloombuild/planner/planner.go (73)
  5. pkg/bloombuild/planner/task.go (7)
  6. pkg/validation/limits.go (6)

docs/sources/shared/configuration.md
@@ -351,6 +351,10 @@ bloom_build:
# CLI flag: -bloom-build.planner.max-table-offset
[max_table_offset: <int> | default = 2]
# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]
builder:
# Experimental: The bloom_gateway block configures the Loki bloom gateway
@@ -3409,6 +3413,11 @@ shard_streams:
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]
# Experimental. Maximum number of builders to use when building blooms. 0 allows
# unlimited builders.
# CLI flag: -bloom-build.max-builders
[bloom_build_max_builders: <int> | default = 0]
# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-compactor.ngram-length

pkg/bloombuild/planner/config.go
@@ -8,9 +8,10 @@ import (
// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
}
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -24,6 +25,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// dynamically reloaded.
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
}
func (cfg *Config) Validate() error {
@@ -37,4 +39,28 @@ func (cfg *Config) Validate() error {
type Limits interface {
BloomCreationEnabled(tenantID string) bool
BloomSplitSeriesKeyspaceBy(tenantID string) int
BloomBuildMaxBuilders(tenantID string) int
}
type QueueLimits struct {
limits Limits
}
func NewQueueLimits(limits Limits) *QueueLimits {
return &QueueLimits{limits: limits}
}
// MaxConsumers is used to compute how many of the available builders are allowed to handle tasks for a given tenant.
// 0 is returned when no limit applies, meaning all builders can be used.
func (c *QueueLimits) MaxConsumers(tenantID string, allConsumers int) int {
if c == nil || c.limits == nil {
return 0
}
maxBuilders := c.limits.BloomBuildMaxBuilders(tenantID)
if maxBuilders == 0 {
return 0
}
return min(allConsumers, maxBuilders)
}
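To make MaxConsumers concrete, here is a minimal test-style sketch of its semantics. It assumes the Limits interface is exactly the three methods shown above and that the file lives in package planner; stubLimits is a hypothetical stand-in, not something introduced by this change.

package planner

import "testing"

// stubLimits is a hypothetical Limits implementation used only for this sketch.
type stubLimits struct{ maxBuilders int }

func (s stubLimits) BloomCreationEnabled(_ string) bool      { return true }
func (s stubLimits) BloomSplitSeriesKeyspaceBy(_ string) int { return 256 }
func (s stubLimits) BloomBuildMaxBuilders(_ string) int      { return s.maxBuilders }

func TestQueueLimits_MaxConsumers(t *testing.T) {
	// bloom_build_max_builders = 0: no cap, so 0 is returned and all builders may consume.
	if got := NewQueueLimits(stubLimits{maxBuilders: 0}).MaxConsumers("tenant-a", 10); got != 0 {
		t.Fatalf("expected 0 (unlimited), got %d", got)
	}
	// bloom_build_max_builders = 4 with 10 connected builders: min(10, 4) = 4.
	if got := NewQueueLimits(stubLimits{maxBuilders: 4}).MaxConsumers("tenant-a", 10); got != 4 {
		t.Fatalf("expected 4, got %d", got)
	}
}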

pkg/bloombuild/planner/metrics.go
@@ -1,8 +1,12 @@
package planner
import (
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/queue"
)
const (
@@ -16,6 +20,11 @@ const (
type Metrics struct {
running prometheus.Gauge
// Extra Queue metrics
connectedBuilders prometheus.GaugeFunc
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
buildTime *prometheus.HistogramVec
@@ -23,7 +32,10 @@ type Metrics struct {
tenantsDiscovered prometheus.Counter
}
func NewMetrics(r prometheus.Registerer) *Metrics {
func NewMetrics(
r prometheus.Registerer,
getConnectedBuilders func() float64,
) *Metrics {
return &Metrics{
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
@@ -31,6 +43,28 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
Name: "running",
Help: "Value will be 1 if bloom planner is currently running on this instance",
}),
connectedBuilders: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "connected_builders",
Help: "Number of builders currently connected to the planner.",
}, getConnectedBuilders),
queueDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "queue_duration_seconds",
Help: "Time spend by tasks in queue before getting picked up by a builder.",
Buckets: prometheus.DefBuckets,
}),
inflightRequests: promauto.With(r).NewSummary(prometheus.SummaryOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "inflight_tasks",
Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
MaxAge: time.Minute,
AgeBuckets: 6,
}),
buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
@@ -60,3 +94,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
}),
}
}
func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics {
return queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
}
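queue_duration_seconds and inflight_tasks are only declared here; the code that feeds them lives on the task-consumption path. A rough, hypothetical sketch of how the summary could be sampled at a regular interval (pendingTasks is an assumed helper, not something defined in this diff):

// Hypothetical sampling loop: observe the number of queued + in-progress tasks
// at a fixed interval so the summary covers roughly the last minute (MaxAge above).
func (p *Planner) trackInflightRequests(ctx context.Context) {
	ticker := time.NewTicker(250 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// pendingTasks is an assumed helper returning queued + active task count.
			p.metrics.inflightRequests.Observe(float64(p.pendingTasks()))
		}
	}
}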

pkg/bloombuild/planner/planner.go
@@ -12,16 +12,21 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
)
type Planner struct {
services.Service
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
cfg Config
limits Limits
@@ -30,6 +35,9 @@ type Planner struct {
tsdbStore TSDBStore
bloomStore bloomshipper.Store
tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
metrics *Metrics
logger log.Logger
}
@@ -51,15 +59,34 @@ func New(
return nil, fmt.Errorf("error creating TSDB store: %w", err)
}
// Queue to manage tasks
queueMetrics := NewQueueMetrics(r)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)
// Clean up metrics for inactive users, i.e. those that have not added tasks to the queue in the last hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
queueMetrics.Cleanup(user)
})
p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
metrics: NewMetrics(r),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
activeUsers: activeUsers,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}
svcs := []services.Service{p.tasksQueue, p.activeUsers}
p.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("error creating subservices manager: %w", err)
}
p.subservicesWatcher = services.NewFailureWatcher()
p.subservicesWatcher.WatchManager(p.subservices)
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
return p, nil
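The subservices manager only takes effect once the planner's lifecycle hooks drive it. Those hooks sit outside this hunk; a typical dskit-style sketch of what they might look like (assumed, not the commit's actual bodies):

// Sketch: start the queue and active-users cleanup alongside the planner,
// and stop them when the planner stops. Failures surface via subservicesWatcher.
func (p *Planner) starting(ctx context.Context) error {
	if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
		return fmt.Errorf("error starting planner subservices: %w", err)
	}
	p.metrics.running.Set(1)
	return nil
}

func (p *Planner) stopping(_ error) error {
	defer p.metrics.running.Set(0)
	return services.StopManagerAndAwaitStopped(context.Background(), p.subservices)
}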
@@ -91,6 +118,7 @@ func (p *Planner) running(ctx context.Context) error {
return err
case <-ticker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err)
}
@@ -109,44 +137,53 @@ func (p *Planner) runOne(ctx context.Context) error {
}()
p.metrics.buildStarted.Inc()
level.Info(p.logger).Log("msg", "running bloom build planning")
tables := p.tables(time.Now())
level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())
work, err := p.loadWork(ctx, tables)
if err != nil {
level.Error(p.logger).Log("msg", "error loading work", "err", err)
return fmt.Errorf("error loading work: %w", err)
}
// TODO: Enqueue instead of buffering here
// This is just a placeholder for now
var tasks []Task
var totalTasks int
for _, w := range work {
logger := log.With(p.logger, "tenant", w.tenant, "table", w.table.Addr(), "ownership", w.ownershipRange.String())
gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange)
if err != nil {
level.Error(p.logger).Log("msg", "error finding gaps", "err", err, "tenant", w.tenant, "table", w.table, "ownership", w.ownershipRange.String())
return fmt.Errorf("error finding gaps for tenant (%s) in table (%s) for bounds (%s): %w", w.tenant, w.table, w.ownershipRange, err)
level.Error(logger).Log("msg", "error finding gaps", "err", err)
continue
}
now := time.Now()
for _, gap := range gaps {
tasks = append(tasks, Task{
totalTasks++
task := Task{
table: w.table.Addr(),
tenant: w.tenant,
OwnershipBounds: w.ownershipRange,
tsdb: gap.tsdb,
gaps: gap.gaps,
})
queueTime: now,
ctx: ctx,
}
p.activeUsers.UpdateUserTimestamp(task.tenant, now)
if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil {
level.Error(logger).Log("msg", "error enqueuing task", "err", err)
continue
}
}
}
level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks)
status = statusSuccess
level.Info(p.logger).Log(
"msg", "bloom build iteration completed",
"duration", time.Since(start).Seconds(),
"tasks", len(tasks),
)
return nil
}

pkg/bloombuild/planner/task.go
@@ -1,6 +1,9 @@
package planner
import (
"context"
"time"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
@@ -19,4 +22,8 @@ type Task struct {
OwnershipBounds v1.FingerprintBounds
tsdb tsdb.SingleTenantTSDBIdentifier
gaps []GapWithBlocks
// Tracking
queueTime time.Time
ctx context.Context
}
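The new queueTime and ctx fields exist so that, when a builder eventually picks a task up, the planner can account for how long the task waited and can skip work whose originating planning iteration was cancelled. A hypothetical consumer-side helper illustrating that intent (observeQueueWait is not part of this commit):

// Hypothetical helper: record how long a task sat in the queue once a builder
// picks it up, feeding the planner's queue_duration_seconds histogram.
func (p *Planner) observeQueueWait(task *Task) {
	if task.ctx.Err() != nil {
		return // the planning iteration that enqueued this task was cancelled
	}
	p.metrics.queueDuration.Observe(time.Since(task.queueTime).Seconds())
}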

pkg/validation/limits.go
@@ -207,6 +207,7 @@ type Limits struct {
BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"`
BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"`
BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"`
@@ -385,6 +386,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.")
f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.")
f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.")
_ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize)
f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size",
@@ -987,6 +989,10 @@ func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int {
return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy
}
func (o *Overrides) BloomBuildMaxBuilders(userID string) int {
return o.getOverridesForUser(userID).BloomBuildMaxBuilders
}
func (o *Overrides) BloomNGramLength(userID string) int {
return o.getOverridesForUser(userID).BloomNGramLength
}
