mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
312 lines
9.1 KiB
312 lines
9.1 KiB
package queue
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/log"
|
|
"github.com/go-kit/log/level"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/grafana/dskit/services"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
|
|
"github.com/grafana/loki/v3/pkg/queue"
|
|
"github.com/grafana/loki/v3/pkg/storage"
|
|
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
|
|
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
|
|
"github.com/grafana/loki/v3/pkg/storage/types"
|
|
"github.com/grafana/loki/v3/pkg/util"
|
|
)
|
|
|
|
// Queue is a wrapper of queue.RequestQueue that uses the file system to store the pending tasks.
|
|
// The queue also allows to store metadata with the task. This metadata can be anything. Metadata is stored in memory.
|
|
// When a task is enqueued (Enqueue), it's stored in the file system and recorded as pending.
|
|
// When it's dequeued (Dequeue), it's removed from the queue but kept in FS until released (Release).
|
|
//
|
|
// TODO(salvacorts): In the future we may reuse this queue to store any proto message. We would need to use generics for that.
|
|
type Queue struct {
|
|
services.Service
|
|
|
|
// queue is the in-memory queue where the tasks are stored.
|
|
// If cfg.StoreTasksOnDisk is false, we store the whole task here and the metadata.
|
|
// Otherwise, we store the task path and the metadata.
|
|
queue *queue.RequestQueue
|
|
// pendingTasks is a map of task ID to the file where the task is stored.
|
|
// If cfg.StoreTasksOnDisk is false, the value is empty.
|
|
pendingTasks sync.Map
|
|
activeUsers *util.ActiveUsersCleanupService
|
|
|
|
cfg Config
|
|
logger log.Logger
|
|
disk client.ObjectClient
|
|
|
|
// Subservices manager.
|
|
subservices *services.Manager
|
|
subservicesWatcher *services.FailureWatcher
|
|
}
|
|
|
|
func NewQueue(
|
|
logger log.Logger,
|
|
cfg Config,
|
|
limits Limits,
|
|
metrics *Metrics,
|
|
storeMetrics storage.ClientMetrics,
|
|
) (*Queue, error) {
|
|
// Configure the filesystem client if we are storing tasks on disk.
|
|
var diskClient client.ObjectClient
|
|
if cfg.StoreTasksOnDisk {
|
|
storeCfg := storage.Config{
|
|
FSConfig: local.FSConfig{Directory: cfg.TasksDiskDirectory},
|
|
}
|
|
var err error
|
|
diskClient, err = storage.NewObjectClient(types.StorageTypeFileSystem, "bloom-planner-queue", storeCfg, storeMetrics)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create object client: %w", err)
|
|
}
|
|
}
|
|
|
|
tasksQueue := queue.NewRequestQueue(cfg.MaxQueuedTasksPerTenant, 0, limits, metrics)
|
|
|
|
// Clean metrics for inactive users: do not have added tasks to the queue in the last 1 hour
|
|
activeUsers := util.NewActiveUsersCleanupService(5*time.Minute, 1*time.Hour, func(user string) {
|
|
metrics.Cleanup(user)
|
|
})
|
|
|
|
svcs := []services.Service{tasksQueue, activeUsers}
|
|
subservices, err := services.NewManager(svcs...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create subservices manager: %w", err)
|
|
}
|
|
subservicesWatcher := services.NewFailureWatcher()
|
|
subservicesWatcher.WatchManager(subservices)
|
|
|
|
q := &Queue{
|
|
queue: tasksQueue,
|
|
activeUsers: activeUsers,
|
|
cfg: cfg,
|
|
logger: logger,
|
|
disk: diskClient,
|
|
subservices: subservices,
|
|
subservicesWatcher: subservicesWatcher,
|
|
}
|
|
|
|
q.Service = services.NewIdleService(q.starting, q.stopping)
|
|
|
|
return q, nil
|
|
}
|
|
|
|
func (q *Queue) starting(ctx context.Context) error {
|
|
// TODO(salvacorts): We do not recover the queue from the disk yet.
|
|
// Until then, we just remove all the files in the directory so the disk
|
|
// doesn't grow indefinitely.
|
|
if q.cfg.StoreTasksOnDisk && q.cfg.CleanTasksDirectory {
|
|
if err := q.cleanTasksOnDisk(ctx); err != nil {
|
|
return fmt.Errorf("failed to clean tasks on disk during startup: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := services.StartManagerAndAwaitHealthy(ctx, q.subservices); err != nil {
|
|
return fmt.Errorf("failed to start task queue subservices: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) cleanTasksOnDisk(ctx context.Context) error {
|
|
objects, _, err := q.disk.List(ctx, "", "")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list tasks: %w", err)
|
|
}
|
|
|
|
for _, o := range objects {
|
|
if err = q.disk.DeleteObject(ctx, o.Key); err != nil {
|
|
return fmt.Errorf("failed to delete task (%s)", o.Key)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) stopping(_ error) error {
|
|
if err := services.StopManagerAndAwaitStopped(context.Background(), q.subservices); err != nil {
|
|
return fmt.Errorf("failed to stop task queue subservices: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) GetConnectedConsumersMetric() float64 {
|
|
return q.queue.GetConnectedConsumersMetric()
|
|
}
|
|
|
|
func (q *Queue) NotifyConsumerShutdown(consumer string) {
|
|
q.queue.NotifyConsumerShutdown(consumer)
|
|
}
|
|
|
|
func (q *Queue) RegisterConsumerConnection(consumer string) {
|
|
q.queue.RegisterConsumerConnection(consumer)
|
|
}
|
|
func (q *Queue) UnregisterConsumerConnection(consumer string) {
|
|
q.queue.UnregisterConsumerConnection(consumer)
|
|
}
|
|
|
|
type metaWithPath struct {
|
|
taskPath string
|
|
metadata any
|
|
}
|
|
|
|
type metaWithTask struct {
|
|
task *protos.ProtoTask
|
|
metadata any
|
|
}
|
|
|
|
// Enqueue adds a task to the queue.
|
|
// The task is enqueued only if it doesn't already exist in the queue.
|
|
func (q *Queue) Enqueue(task *protos.ProtoTask, metadata any, successFn func()) error {
|
|
tenant := task.Tenant
|
|
|
|
q.activeUsers.UpdateUserTimestamp(tenant, time.Now())
|
|
|
|
var taskPath string
|
|
var enqueuedValue any = metaWithTask{task: task, metadata: metadata}
|
|
if q.cfg.StoreTasksOnDisk {
|
|
taskPath = getTaskPath(task)
|
|
if err := q.writeTask(task, taskPath); err != nil {
|
|
return err
|
|
}
|
|
|
|
// If we're storing tasks on disk, we don't want to store the task in the queue but just the path (and the metadata)
|
|
enqueuedValue = metaWithPath{taskPath: taskPath, metadata: metadata}
|
|
}
|
|
|
|
return q.queue.Enqueue(tenant, nil, enqueuedValue, func() {
|
|
// Note: If we are storing tasks in-mem, taskPath is empty
|
|
q.pendingTasks.Store(task.Id, taskPath)
|
|
if successFn != nil {
|
|
successFn()
|
|
}
|
|
})
|
|
}
|
|
|
|
// Dequeue takes a task from the queue. The task is not removed from the filesystem until Release is called.
|
|
func (q *Queue) Dequeue(ctx context.Context, last Index, consumerID string) (*protos.ProtoTask, any, Index, error) {
|
|
item, idx, err := q.queue.Dequeue(ctx, last, consumerID)
|
|
if err != nil {
|
|
return nil, nil, idx, err
|
|
}
|
|
|
|
if !q.cfg.StoreTasksOnDisk {
|
|
val := item.(metaWithTask)
|
|
return val.task, val.metadata, idx, nil
|
|
}
|
|
|
|
// Read task from disk
|
|
meta := item.(metaWithPath)
|
|
task, err := q.readTask(meta.taskPath)
|
|
if err != nil {
|
|
return nil, nil, idx, err
|
|
}
|
|
|
|
return task, meta.metadata, idx, nil
|
|
}
|
|
|
|
// Release removes a task from the filesystem.
|
|
// Dequeue should be called before Remove.
|
|
func (q *Queue) Release(task *protos.ProtoTask) {
|
|
val, existed := q.pendingTasks.LoadAndDelete(task.Id)
|
|
if !existed {
|
|
// Task doesn't exist, so it's not in the FS
|
|
return
|
|
}
|
|
|
|
if !q.cfg.StoreTasksOnDisk {
|
|
return
|
|
}
|
|
|
|
// Remove task from FS
|
|
taskPath := val.(string)
|
|
if err := q.deleteTask(taskPath); err != nil {
|
|
level.Error(q.logger).Log("msg", "failed to remove task from disk", "task", task.Id, "err", err)
|
|
}
|
|
}
|
|
|
|
func (q *Queue) writeTask(task *protos.ProtoTask, taskPath string) error {
|
|
// Do not write the task if it's already pending. I.e. was already enqueued thus written to disk.
|
|
if _, exists := q.pendingTasks.Load(task.Id); exists {
|
|
return nil
|
|
}
|
|
|
|
taskSer, err := proto.Marshal(task)
|
|
if err != nil {
|
|
q.logger.Log("msg", "failed to serialize task", "task", task.Id, "err", err)
|
|
return fmt.Errorf("failed to serialize task: %w", err)
|
|
}
|
|
|
|
reader := bytes.NewReader(taskSer)
|
|
if err = q.disk.PutObject(context.Background(), taskPath, reader); err != nil {
|
|
q.logger.Log("msg", "failed to write task to disk", "task", task.Id, "err", err)
|
|
return fmt.Errorf("failed to write task to disk: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) readTask(taskPath string) (*protos.ProtoTask, error) {
|
|
reader, _, err := q.disk.GetObject(context.Background(), taskPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read task from disk: %w", err)
|
|
}
|
|
|
|
taskSer, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read task from disk: %w", err)
|
|
}
|
|
|
|
task := &protos.ProtoTask{}
|
|
if err = proto.Unmarshal(taskSer, task); err != nil {
|
|
return nil, fmt.Errorf("failed to deserialize task: %w", err)
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
func (q *Queue) deleteTask(taskPath string) error {
|
|
if err := q.disk.DeleteObject(context.Background(), taskPath); err != nil {
|
|
return fmt.Errorf("failed to delete task from disk: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) TotalPending() (total int) {
|
|
q.pendingTasks.Range(func(_, _ interface{}) bool {
|
|
total++
|
|
return true
|
|
})
|
|
return total
|
|
}
|
|
|
|
func getTaskPath(task *protos.ProtoTask) string {
|
|
table := protos.FromProtoDayTableToDayTable(task.Table)
|
|
taskFile := task.Id + ".protobuf"
|
|
return filepath.Join("tasks", task.Tenant, table.String(), taskFile)
|
|
}
|
|
|
|
// The following are aliases for the queue package types.
|
|
|
|
type Metrics = queue.Metrics
|
|
|
|
func NewMetrics(registerer prometheus.Registerer, metricsNamespace string, subsystem string) *Metrics {
|
|
return queue.NewMetrics(registerer, metricsNamespace, subsystem)
|
|
}
|
|
|
|
type Index = queue.QueueIndex
|
|
|
|
var StartIndex = queue.StartIndex
|
|
|
|
var ErrStopped = queue.ErrStopped
|
|
|