refactor(blooms): Extract planner queue into pkg (backport k227) (#14925)

Co-authored-by: Salva Corts <salva.corts@grafana.com>
pull/14927/head
loki-gh-app[bot] committed 6 months ago via GitHub
parent dc36a1e128
commit 98ad8c8b4a
Files changed (7):
  1. docs/sources/shared/configuration.md (9 changed lines)
  2. pkg/bloombuild/planner/config.go (17 changed lines)
  3. pkg/bloombuild/planner/planner.go (84 changed lines)
  4. pkg/bloombuild/planner/planner_test.go (31 changed lines)
  5. pkg/bloombuild/planner/queue/config.go (22 changed lines, new file)
  6. pkg/bloombuild/planner/queue/queue.go (171 changed lines, new file)
  7. pkg/bloombuild/planner/task.go (12 changed lines)

@@ -1277,15 +1277,16 @@ planner:
# CLI flag: -bloom-build.planner.max-table-offset
[max_table_offset: <int> | default = 2]
# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]
retention:
# Enable bloom retention.
# CLI flag: -bloom-build.planner.retention.enabled
[enabled: <boolean> | default = false]
queue:
# Maximum number of tasks to queue per tenant.
# CLI flag: -bloom-build.planner.queue.max-tasks-per-tenant
[max_queued_tasks_per_tenant: <int> | default = 30000]
builder:
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
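The same move shows up programmatically in the updated planner config and tests below: the per-tenant task limit now lives on an embedded queue block instead of a top-level planner field. A minimal Go sketch of constructing the new shape inside the planner package (values are illustrative, mirroring the test fixtures rather than recommended settings):

cfg := Config{
	PlanningInterval: 1 * time.Hour,
	Queue: queue.Config{
		// Formerly the planner-level max_queued_tasks_per_tenant
		// (-bloom-build.planner.max-tasks-per-tenant).
		MaxQueuedTasksPerTenant: 30000,
	},
}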

@@ -5,16 +5,17 @@ import (
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
)
// Config configures the bloom-planner component.
type Config struct {
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
RetentionConfig RetentionConfig `yaml:"retention"`
PlanningInterval time.Duration `yaml:"planning_interval"`
MinTableOffset int `yaml:"min_table_offset"`
MaxTableOffset int `yaml:"max_table_offset"`
RetentionConfig RetentionConfig `yaml:"retention"`
Queue queue.Config `yaml:"queue"`
}
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
@@ -28,8 +29,8 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// dynamically reloaded.
// I'm doing it the simple way for now.
f.IntVar(&cfg.MaxTableOffset, prefix+".max-table-offset", 2, "Oldest day-table offset (from today, inclusive) to compact. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
cfg.RetentionConfig.RegisterFlagsWithPrefix(prefix+".retention", f)
cfg.Queue.RegisterFlagsWithPrefix(prefix+".queue", f)
}
func (cfg *Config) Validate() error {
@@ -41,6 +42,10 @@ func (cfg *Config) Validate() error {
return err
}
if err := cfg.Queue.Validate(); err != nil {
return err
}
return nil
}
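To make the wiring concrete, here is a minimal sketch (inside the planner package; the flag-set name and sample arguments are invented for illustration, and the prefix is taken from the CLI flag names documented above) of how registration and validation now compose:

func buildPlannerConfig(args []string) (Config, error) {
	fs := flag.NewFlagSet("bloom-planner", flag.ContinueOnError)

	var cfg Config
	// Besides the planner flags, this now also registers the queue flags under the
	// ".queue" sub-prefix, e.g. -bloom-build.planner.queue.max-tasks-per-tenant (default 30000).
	cfg.RegisterFlagsWithPrefix("bloom-build.planner", fs)

	if err := fs.Parse(args); err != nil {
		return Config{}, err
	}
	// Validate cascades into cfg.Queue.Validate() as shown above.
	if err := cfg.Validate(); err != nil {
		return Config{}, err
	}
	return cfg, nil
}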

@@ -17,16 +17,15 @@ import (
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
@@ -50,10 +49,7 @@ type Planner struct {
tsdbStore common.TSDBStore
bloomStore bloomshipper.StoreBase
tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
pendingTasks sync.Map
tasksQueue *queue.Queue
metrics *Metrics
logger log.Logger
@@ -83,23 +79,21 @@ func New(
// Queue to manage tasks
queueMetrics := queue.NewMetrics(r, metricsNamespace, metricsSubsystem)
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, NewQueueLimits(limits), queueMetrics)
// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
queueMetrics.Cleanup(user)
})
queueLimits := NewQueueLimits(limits)
tasksQueue, err := queue.NewQueue(logger, cfg.Queue, queueLimits, queueMetrics)
if err != nil {
return nil, fmt.Errorf("error creating tasks queue: %w", err)
}
p := &Planner{
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
activeUsers: activeUsers,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
cfg: cfg,
limits: limits,
schemaCfg: schemaCfg,
tsdbStore: tsdbStore,
bloomStore: bloomStore,
tasksQueue: tasksQueue,
metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric),
logger: logger,
}
p.retentionManager = NewRetentionManager(
@@ -110,7 +104,7 @@ func New(
p.logger,
)
svcs := []services.Service{p.tasksQueue, p.activeUsers}
svcs := []services.Service{p.tasksQueue}
if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
@@ -204,7 +198,7 @@ func (p *Planner) trackInflightRequests(ctx context.Context) {
return
case <-inflightTasksTicker.C:
inflight := p.totalPendingTasks()
inflight := p.tasksQueue.TotalPending()
p.metrics.inflightRequests.Observe(float64(inflight))
}
}
@@ -708,27 +702,9 @@ func (p *Planner) tenants(ctx context.Context, table config.DayTable) (*iter.Sli
return iter.NewSliceIter(tenants), nil
}
func (p *Planner) addPendingTask(task *QueueTask) {
p.pendingTasks.Store(task.ID, task)
}
func (p *Planner) removePendingTask(task *QueueTask) {
p.pendingTasks.Delete(task.ID)
}
func (p *Planner) totalPendingTasks() (total int) {
p.pendingTasks.Range(func(_, _ interface{}) bool {
total++
return true
})
return total
}
func (p *Planner) enqueueTask(task *QueueTask) error {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
return p.tasksQueue.Enqueue(task.Tenant(), task, func() {
task.timesEnqueued.Add(1)
p.addPendingTask(task)
})
}
@@ -773,11 +749,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
lastIndex = idx
if item == nil {
return fmt.Errorf("dequeue() call resulted in nil response. builder: %s", builderID)
}
task := item.(*QueueTask)
logger := log.With(logger, "task", task.ID)
logger := log.With(logger, "task", task.ID())
queueTime := time.Since(task.queueTime)
p.metrics.queueDuration.Observe(queueTime.Seconds())
@@ -785,15 +761,15 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
if task.ctx.Err() != nil {
level.Warn(logger).Log("msg", "task context done after dequeue", "err", task.ctx.Err())
lastIndex = lastIndex.ReuseLastIndex()
p.removePendingTask(task)
p.tasksQueue.Release(task)
continue
}
result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant())
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued.Load(),
@@ -801,7 +777,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"err", err,
)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("task failed after max retries (%d): %w", maxRetries, err),
}
continue
@@ -810,10 +786,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
// Re-queue the task if the builder is failing to process the tasks
if err := p.enqueueTask(task); err != nil {
p.metrics.taskLost.Inc()
p.removePendingTask(task)
p.tasksQueue.Release(task)
level.Error(logger).Log("msg", "error re-enqueuing task. this task will be lost", "err", err)
task.resultsChannel <- &protos.TaskResult{
TaskID: task.ID,
TaskID: task.ID(),
Error: fmt.Errorf("error re-enqueuing task: %w", err),
}
continue
@@ -833,7 +809,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.tasksQueue.Release(task)
// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
@@ -870,7 +846,7 @@ func (p *Planner) forwardTaskToBuilder(
}()
timeout := make(<-chan time.Time)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant)
taskTimeout := p.limits.BuilderResponseTimeout(task.Tenant())
if taskTimeout != 0 {
// If the timeout is not 0 (disabled), configure it
timeout = time.After(taskTimeout)
@@ -910,8 +886,8 @@ func (p *Planner) receiveResultFromBuilder(
if err != nil {
return nil, fmt.Errorf("error processing task result in builder (%s): %w", builderID, err)
}
if result.TaskID != task.ID {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID)
if result.TaskID != task.ID() {
return nil, fmt.Errorf("unexpected task ID (%s) in response from builder (%s). Expected task ID is %s", result.TaskID, builderID, task.ID())
}
return result, nil

@@ -18,6 +18,7 @@ import (
"google.golang.org/grpc"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/storage"
@@ -163,8 +164,10 @@ func Test_BuilderLoop(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)
cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, tc.limits, logger)
@@ -206,7 +209,7 @@ func Test_BuilderLoop(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)
// Finally, the queue should be empty
require.Equal(t, 0, planner.totalPendingTasks())
require.Equal(t, 0, planner.tasksQueue.TotalPending())
// consume all tasks result to free up the channel for the next round of tasks
for i := 0; i < nTasks; i++ {
@@ -228,15 +231,15 @@ func Test_BuilderLoop(t *testing.T) {
if tc.shouldConsumeAfterModify {
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
} else {
require.Neverf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"all tasks were consumed but they should not be",
@@ -254,10 +257,10 @@ func Test_BuilderLoop(t *testing.T) {
// Now all tasks should be consumed
require.Eventuallyf(
t, func() bool {
return planner.totalPendingTasks() == 0
return planner.tasksQueue.TotalPending() == 0
},
5*time.Second, 10*time.Millisecond,
"tasks not consumed, pending: %d", planner.totalPendingTasks(),
"tasks not consumed, pending: %d", planner.tasksQueue.TotalPending(),
)
}
})
@@ -384,8 +387,10 @@ func Test_processTenantTaskResults(t *testing.T) {
//logger := log.NewLogfmtLogger(os.Stdout)
cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)
@@ -544,8 +549,10 @@ func Test_deleteOutdatedMetas(t *testing.T) {
// logger := log.NewLogfmtLogger(os.Stdout)
cfg := Config{
PlanningInterval: 1 * time.Hour,
MaxQueuedTasksPerTenant: 10000,
PlanningInterval: 1 * time.Hour,
Queue: queue.Config{
MaxQueuedTasksPerTenant: 10000,
},
}
planner := createPlanner(t, cfg, &fakeLimits{}, logger)

@@ -0,0 +1,22 @@
package queue
import (
"flag"
"github.com/grafana/loki/v3/pkg/queue"
)
type Config struct {
MaxQueuedTasksPerTenant int `yaml:"max_queued_tasks_per_tenant"`
}
// RegisterFlagsWithPrefix registers flags for the bloom-planner task queue configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxQueuedTasksPerTenant, prefix+".max-tasks-per-tenant", 30000, "Maximum number of tasks to queue per tenant.")
}
func (cfg *Config) Validate() error {
return nil
}
type Limits = queue.Limits

@@ -0,0 +1,171 @@
package queue
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/v3/pkg/queue"
"github.com/grafana/loki/v3/pkg/util"
)
type Task interface {
Tenant() string
Table() string
ID() string
}
// Queue is a wrapper around queue.RequestQueue that uses the file system to store pending tasks.
// When a task is enqueued, it is stored in the file system and recorded as pending.
// When it is dequeued, it is removed from the queue but kept in the FS until released.
type Queue struct {
services.Service
queue *queue.RequestQueue
// pendingTasks is a map of task ID to the file where the task is stored.
pendingTasks sync.Map
activeUsers *util.ActiveUsersCleanupService
cfg Config
logger log.Logger
// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
}
func NewQueue(logger log.Logger, cfg Config, limits Limits, metrics *Metrics) (*Queue, error) {
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics)
// Clean up metrics for inactive users, i.e. tenants that have not added tasks to the queue in the last hour
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
metrics.Cleanup(user)
})
svcs := []services.Service{tasksQueue, activeUsers}
subservices, err := services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
}
subservicesWatcher := services.NewFailureWatcher()
subservicesWatcher.WatchManager(subservices)
q := &Queue{
queue: tasksQueue,
activeUsers: activeUsers,
cfg: cfg,
logger: logger,
subservices: subservices,
subservicesWatcher: subservicesWatcher,
}
q.Service = services.NewIdleService(q.starting, q.stopping)
return q, nil
}
func (q *Queue) starting(ctx context.Context) error {
if err := services.StartManagerAndAwaitHealthy(ctx, q.subservices); err != nil {
return fmt.Errorf("failed to start task queue subservices: %w", err)
}
return nil
}
func (q *Queue) stopping(_ error) error {
if err := services.StopManagerAndAwaitStopped(context.Background(), q.subservices); err != nil {
return fmt.Errorf("failed to stop task queue subservices: %w", err)
}
return nil
}
func (q *Queue) GetConnectedConsumersMetric() float64 {
return q.queue.GetConnectedConsumersMetric()
}
func (q *Queue) NotifyConsumerShutdown(consumer string) {
q.queue.NotifyConsumerShutdown(consumer)
}
func (q *Queue) RegisterConsumerConnection(consumer string) {
q.queue.RegisterConsumerConnection(consumer)
}
func (q *Queue) UnregisterConsumerConnection(consumer string) {
q.queue.UnregisterConsumerConnection(consumer)
}
// Enqueue adds a task to the queue.
func (q *Queue) Enqueue(tenant string, task Task, successFn func()) error {
q.activeUsers.UpdateUserTimestamp(tenant, time.Now())
return q.queue.Enqueue(tenant, nil, task, func() {
taskPath := getTaskPath(task)
_, existed := q.pendingTasks.LoadOrStore(task.ID(), taskPath)
if existed {
// Task already exists, so it's already in the FS
return
}
// TODO: Write to FS
_ = taskPath
if successFn != nil {
successFn()
}
})
}
// Dequeue takes a task from the queue. The task is not removed from the filesystem until Release is called.
func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (Task, Index, error) {
item, idx, err := q.queue.Dequeue(ctx, last, consumerID)
if err != nil {
return nil, idx, err
}
return item.(Task), idx, nil
}
// Release removes a task from the filesystem.
// Dequeue should be called before Release.
func (q *Queue) Release(task Task) {
taskPath, existed := q.pendingTasks.LoadAndDelete(task.ID())
if !existed {
// Task doesn't exist, so it's not in the FS
return
}
// TODO: Remove from FS
_ = taskPath
}
func (q *Queue) TotalPending() (total int) {
q.pendingTasks.Range(func(_, _ interface{}) bool {
total++
return true
})
return total
}
func getTaskPath(task Task) string {
return filepath.Join("tasks", task.Tenant(), task.Table(), task.ID())
}
// The following are aliases for the queue package types.
type Metrics = queue.Metrics
func NewMetrics(registerer prometheus.Registerer, metricsNamespace string, subsystem string) *Metrics {
return queue.NewMetrics(registerer, metricsNamespace, subsystem)
}
type Index = queue.QueueIndex
var StartIndex = queue.StartIndex
var ErrStopped = queue.ErrStopped
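Pieced together from the signatures above, a caller drives the new package roughly as follows. This is a hedged sketch written as if it lived in this queue package (identifiers unqualified, imports as in queue.go): the stub task type, tenant, table, consumer ID and metric namespace are invented for illustration, and the Limits value would in practice be the planner's NewQueueLimits result, as planner.go shows.

// exampleTask is a hypothetical Task implementation used only in this sketch.
type exampleTask struct{ tenant, table, id string }

func (t exampleTask) Tenant() string { return t.tenant }
func (t exampleTask) Table() string  { return t.table }
func (t exampleTask) ID() string     { return t.id }

func runQueueExample(ctx context.Context, logger log.Logger, limits Limits, reg prometheus.Registerer) error {
	metrics := NewMetrics(reg, "loki", "bloomplanner")
	q, err := NewQueue(logger, Config{MaxQueuedTasksPerTenant: 100}, limits, metrics)
	if err != nil {
		return err
	}

	// Queue is a dskit service: start it (and its subservices) before use.
	if err := services.StartAndAwaitRunning(ctx, q); err != nil {
		return err
	}
	defer func() { _ = services.StopAndAwaitTerminated(context.Background(), q) }()

	// Producer side: enqueue a task for a tenant; the queue records it as pending.
	task := exampleTask{tenant: "tenant-a", table: "table_19700", id: "task-1"}
	if err := q.Enqueue(task.Tenant(), task, nil); err != nil {
		return err
	}

	// Consumer side: register a consumer, dequeue, and release once the work is done.
	const consumerID = "builder-1"
	q.RegisterConsumerConnection(consumerID)
	defer q.UnregisterConsumerConnection(consumerID)

	dequeued, _, err := q.Dequeue(ctx, StartIndex, consumerID)
	if err != nil {
		return err
	}
	// The task keeps counting towards TotalPending() until it is released.
	q.Release(dequeued)
	return nil
}

The planner's BuilderLoop follows the same pattern: it releases on success, on exhausted retries, and when a task's context was cancelled while queued, and only re-enqueues (without releasing) on retriable forwarding failures.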

@@ -33,3 +33,15 @@ func NewQueueTask(
queueTime: queueTime,
}
}
func (t QueueTask) Tenant() string {
return t.Task.Tenant
}
func (t QueueTask) Table() string {
return t.Task.Table.String()
}
func (t QueueTask) ID() string {
return t.Task.ID
}
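Because these accessors use value receivers, both QueueTask and *QueueTask satisfy the new queue.Task interface, which is what lets the planner hand tasks straight to tasksQueue.Enqueue and Release. A compile-time assertion (a sketch, not part of the commit; it would live anywhere in the planner package that imports the queue package) makes the relationship explicit:

// Assert at compile time that QueueTask implements queue.Task (Tenant, Table, ID).
var _ queue.Task = QueueTask{}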
