From efd8f5dc1b3bb3313de1ed6b26750d5bd5632b16 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 22 May 2024 10:43:32 +0200 Subject: [PATCH] refactor(blooms): Add queue to bloom planner and enqueue tasks (#13005) --- docs/sources/shared/configuration.md | 9 ++++ pkg/bloombuild/planner/config.go | 32 ++++++++++-- pkg/bloombuild/planner/metrics.go | 40 ++++++++++++++- pkg/bloombuild/planner/planner.go | 73 +++++++++++++++++++++------- pkg/bloombuild/planner/task.go | 7 +++ pkg/validation/limits.go | 6 +++ 6 files changed, 145 insertions(+), 22 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d30dce2f77..acf11102be 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -351,6 +351,10 @@ bloom_build: # CLI flag: -bloom-build.planner.max-table-offset [max_table_offset: | default = 2] + # Maximum number of tasks to queue per tenant. + # CLI flag: -bloom-build.planner.max-tasks-per-tenant + [max_queued_tasks_per_tenant: | default = 30000] + builder: # Experimental: The bloom_gateway block configures the Loki bloom gateway @@ -3409,6 +3413,11 @@ shard_streams: # CLI flag: -bloom-build.split-keyspace-by [bloom_split_series_keyspace_by: | default = 256] +# Experimental. Maximum number of builders to use when building blooms. 0 allows +# unlimited builders. +# CLI flag: -bloom-build.max-builders +[bloom_build_max_builders: | default = 0] + # Experimental. Length of the n-grams created when computing blooms from log # lines. # CLI flag: -bloom-compactor.ngram-length diff --git a/pkg/bloombuild/planner/config.go b/pkg/bloombuild/planner/config.go index 47b01c0b28..aff25873b1 100644 --- a/pkg/bloombuild/planner/config.go +++ b/pkg/bloombuild/planner/config.go @@ -8,9 +8,10 @@ import ( // Config configures the bloom-planner component. type Config struct { - PlanningInterval time.Duration `yaml:"planning_interval"` - MinTableOffset int `yaml:"min_table_offset"` - MaxTableOffset int `yaml:"max_table_offset"` + PlanningInterval time.Duration `yaml:"planning_interval"` + MinTableOffset int `yaml:"min_table_offset"` + MaxTableOffset int `yaml:"max_table_offset"` + MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. @@ -24,6 +25,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // dynamically reloaded. // I'm doing it the simple way for now. f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.") + f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.") } func (cfg *Config) Validate() error { @@ -37,4 +39,28 @@ func (cfg *Config) Validate() error { type Limits interface { BloomCreationEnabled(tenantID string) bool BloomSplitSeriesKeyspaceBy(tenantID string) int + BloomBuildMaxBuilders(tenantID string) int +} + +type QueueLimits struct { + limits Limits +} + +func NewQueueLimits(limits Limits) *QueueLimits { + return &QueueLimits{limits: limits} +} + +// MaxConsumers is used to compute how many of the available builders are allowed to handle tasks for a given tenant. +// 0 is returned when neither limits are applied. 0 means all builders can be used. +func (c *QueueLimits) MaxConsumers(tenantID string, allConsumers int) int { + if c == nil || c.limits == nil { + return 0 + } + + maxBuilders := c.limits.BloomBuildMaxBuilders(tenantID) + if maxBuilders == 0 { + return 0 + } + + return min(allConsumers, maxBuilders) } diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index c0028237d9..347af19266 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -1,8 +1,12 @@ package planner import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/queue" ) const ( @@ -16,6 +20,11 @@ const ( type Metrics struct { running prometheus.Gauge + // Extra Queue metrics + connectedBuilders prometheus.GaugeFunc + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + buildStarted prometheus.Counter buildCompleted *prometheus.CounterVec buildTime *prometheus.HistogramVec @@ -23,7 +32,10 @@ type Metrics struct { tenantsDiscovered prometheus.Counter } -func NewMetrics(r prometheus.Registerer) *Metrics { +func NewMetrics( + r prometheus.Registerer, + getConnectedBuilders func() float64, +) *Metrics { return &Metrics{ running: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, @@ -31,6 +43,28 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "running", Help: "Value will be 1 if bloom planner is currently running on this instance", }), + connectedBuilders: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "connected_builders", + Help: "Number of builders currently connected to the planner.", + }, getConnectedBuilders), + queueDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "queue_duration_seconds", + Help: "Time spend by tasks in queue before getting picked up by a builder.", + Buckets: prometheus.DefBuckets, + }), + inflightRequests: promauto.With(r).NewSummary(prometheus.SummaryOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "inflight_tasks", + Help: "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.", + Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + MaxAge: time.Minute, + AgeBuckets: 6, + }), buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, @@ -60,3 +94,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics { }), } } + +func NewQueueMetrics(r prometheus.Registerer) *queue.Metrics { + return queue.NewMetrics(r, metricsNamespace, metricsSubsystem) +} diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 0be853a2f6..9a5b9f6dc2 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -12,16 +12,21 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/queue" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/util" utillog "github.com/grafana/loki/v3/pkg/util/log" ) type Planner struct { services.Service + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher cfg Config limits Limits @@ -30,6 +35,9 @@ type Planner struct { tsdbStore TSDBStore bloomStore bloomshipper.Store + tasksQueue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + metrics *Metrics logger log.Logger } @@ -51,15 +59,34 @@ func New( return nil, fmt.Errorf("error creating TSDB store: %w", err) } + // Queue to manage tasks + queueMetrics := NewQueueMetrics(r) + tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics) + + // Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour + activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) { + queueMetrics.Cleanup(user) + }) + p := &Planner{ - cfg: cfg, - limits: limits, - schemaCfg: schemaCfg, - tsdbStore: tsdbStore, - bloomStore: bloomStore, - metrics: NewMetrics(r), - logger: logger, + cfg: cfg, + limits: limits, + schemaCfg: schemaCfg, + tsdbStore: tsdbStore, + bloomStore: bloomStore, + tasksQueue: tasksQueue, + activeUsers: activeUsers, + metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric), + logger: logger, + } + + svcs := []services.Service{p.tasksQueue, p.activeUsers} + p.subservices, err = services.NewManager(svcs...) + if err != nil { + return nil, fmt.Errorf("error creating subservices manager: %w", err) } + p.subservicesWatcher = services.NewFailureWatcher() + p.subservicesWatcher.WatchManager(p.subservices) p.Service = services.NewBasicService(p.starting, p.running, p.stopping) return p, nil @@ -91,6 +118,7 @@ func (p *Planner) running(ctx context.Context) error { return err case <-ticker.C: + level.Info(p.logger).Log("msg", "starting bloom build iteration") if err := p.runOne(ctx); err != nil { level.Error(p.logger).Log("msg", "bloom build iteration failed", "err", err) } @@ -109,44 +137,53 @@ func (p *Planner) runOne(ctx context.Context) error { }() p.metrics.buildStarted.Inc() - level.Info(p.logger).Log("msg", "running bloom build planning") tables := p.tables(time.Now()) level.Debug(p.logger).Log("msg", "loaded tables", "tables", tables.TotalDays()) work, err := p.loadWork(ctx, tables) if err != nil { - level.Error(p.logger).Log("msg", "error loading work", "err", err) return fmt.Errorf("error loading work: %w", err) } - // TODO: Enqueue instead of buffering here - // This is just a placeholder for now - var tasks []Task - + var totalTasks int for _, w := range work { + logger := log.With(p.logger, "tenant", w.tenant, "table", w.table.Addr(), "ownership", w.ownershipRange.String()) + gaps, err := p.findGapsForBounds(ctx, w.tenant, w.table, w.ownershipRange) if err != nil { - level.Error(p.logger).Log("msg", "error finding gaps", "err", err, "tenant", w.tenant, "table", w.table, "ownership", w.ownershipRange.String()) - return fmt.Errorf("error finding gaps for tenant (%s) in table (%s) for bounds (%s): %w", w.tenant, w.table, w.ownershipRange, err) + level.Error(logger).Log("msg", "error finding gaps", "err", err) + continue } + now := time.Now() for _, gap := range gaps { - tasks = append(tasks, Task{ + totalTasks++ + task := Task{ table: w.table.Addr(), tenant: w.tenant, OwnershipBounds: w.ownershipRange, tsdb: gap.tsdb, gaps: gap.gaps, - }) + + queueTime: now, + ctx: ctx, + } + + p.activeUsers.UpdateUserTimestamp(task.tenant, now) + if err := p.tasksQueue.Enqueue(task.tenant, nil, task, nil); err != nil { + level.Error(logger).Log("msg", "error enqueuing task", "err", err) + continue + } } } + level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks) + status = statusSuccess level.Info(p.logger).Log( "msg", "bloom build iteration completed", "duration", time.Since(start).Seconds(), - "tasks", len(tasks), ) return nil } diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index 80f730c4fb..bff459fe17 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -1,6 +1,9 @@ package planner import ( + "context" + "time" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" @@ -19,4 +22,8 @@ type Task struct { OwnershipBounds v1.FingerprintBounds tsdb tsdb.SingleTenantTSDBIdentifier gaps []GapWithBlocks + + // Tracking + queueTime time.Time + ctx context.Context } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index b0660686f5..148205b306 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -207,6 +207,7 @@ type Limits struct { BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` + BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"` BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"` BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"` @@ -385,6 +386,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.BloomCreationEnabled, "bloom-build.enable", false, "Experimental. Whether to create blooms for the tenant.") f.IntVar(&l.BloomSplitSeriesKeyspaceBy, "bloom-build.split-keyspace-by", 256, "Experimental. Number of splits to create for the series keyspace when building blooms. The series keyspace is split into this many parts to parallelize bloom creation.") + f.IntVar(&l.BloomBuildMaxBuilders, "bloom-build.max-builders", 0, "Experimental. Maximum number of builders to use when building blooms. 0 allows unlimited builders.") _ = l.BloomCompactorMaxBloomSize.Set(defaultBloomCompactorMaxBloomSize) f.Var(&l.BloomCompactorMaxBloomSize, "bloom-compactor.max-bloom-size", @@ -987,6 +989,10 @@ func (o *Overrides) BloomSplitSeriesKeyspaceBy(userID string) int { return o.getOverridesForUser(userID).BloomSplitSeriesKeyspaceBy } +func (o *Overrides) BloomBuildMaxBuilders(userID string) int { + return o.getOverridesForUser(userID).BloomBuildMaxBuilders +} + func (o *Overrides) BloomNGramLength(userID string) int { return o.getOverridesForUser(userID).BloomNGramLength }