loki/pkg/engine/internal/worker/thread.go

package worker

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/user"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
	"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
	"github.com/grafana/loki/v3/pkg/storage/bucket"
	utillog "github.com/grafana/loki/v3/pkg/util/log"
	"github.com/grafana/loki/v3/pkg/xcap"
)
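
// threadState describes the lifecycle state of a worker thread.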
type threadState int

const (
	// threadStateIdle reports that a thread is not running.
	threadStateIdle threadState = iota
	// threadStateReady reports that a thread is ready to run a task.
	threadStateReady
	// threadStateBusy reports that a thread is currently running a task.
	threadStateBusy
)

func (s threadState) String() string {
	switch s {
	case threadStateIdle:
		return "idle"
	case threadStateReady:
		return "ready"
	case threadStateBusy:
		return "busy"
	default:
		return fmt.Sprintf("threadState(%d)", s)
	}
}

// thread represents a worker thread that executes one task at a time.
type thread struct {
	BatchSize  int64
	Bucket     objstore.Bucket
	Metastore  metastore.Metastore
	Logger     log.Logger
	Metrics    *metrics
	JobManager *jobManager

	stateMut sync.RWMutex
	state    threadState
}

// State returns the current state of the thread.
func (t *thread) State() threadState {
	t.stateMut.RLock()
	defer t.stateMut.RUnlock()
	return t.state
}

// Run starts the thread. Run will request and run tasks in a loop until the
// context is canceled.
func (t *thread) Run(ctx context.Context) error {
	defer t.setState(threadStateIdle)

	for {
		level.Debug(t.Logger).Log("msg", "requesting task")

		t.setState(threadStateReady)
		job, err := t.JobManager.Recv(ctx)
		if err != nil {
			// An error from Recv is treated as a signal to stop; Run itself
			// always returns nil.
			return nil
		}

		t.setState(threadStateBusy)
		t.runJob(job.Context, job)
	}
}
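
// setState updates the thread's state while holding stateMut.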
func (t *thread) setState(state threadState) {
	t.stateMut.Lock()
	defer t.stateMut.Unlock()
	t.state = state
}
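
// runJob executes a single task: it builds an executor pipeline for the task's
// fragment, streams the results to the job's sinks, and reports task status
// (running, failed, or completed) back to the scheduler.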
func (t *thread) runJob(ctx context.Context, job *threadJob) {
	defer job.Close()

	logger := log.With(t.Logger, "task_id", job.Task.ULID)
	logger = utillog.WithContext(ctx, logger) // Extract trace ID

	startTime := time.Now()
	level.Info(logger).Log("msg", "starting task")

	cfg := executor.Config{
		BatchSize: t.BatchSize,
		Bucket:    bucket.NewXCapBucket(t.Bucket),
		Metastore: t.Metastore,

		// GetExternalInputs resolves the pipelines that feed a physical plan
		// node from the task's source streams.
		GetExternalInputs: func(_ context.Context, node physical.Node) []executor.Pipeline {
			streams := job.Task.Sources[node]
			if len(streams) == 0 {
				return nil
			}

			// Create a single nodeSource for all streams. Having streams
			// forward to a node source allows for backpressure to be applied
			// based on the number of nodeSource (typically 1) rather than the
			// number of source streams (unbounded).
			//
			// Binding the [streamSource] to the input in the loop below
			// increases the reference count. As sources are closed, the
			// reference count decreases. Once the reference count reaches 0,
			// the nodeSource is closed, and reads return EOF.
			input := new(nodeSource)

			var errs []error
			for _, stream := range streams {
				source, found := job.Sources[stream.ULID]
				if !found {
					level.Warn(logger).Log("msg", "no source found for input stream", "stream_id", stream.ULID)
					continue
				} else if err := source.Bind(input); err != nil {
					level.Error(logger).Log("msg", "failed to bind source", "err", err)
					errs = append(errs, fmt.Errorf("binding source %s: %w", stream.ULID, err))
					continue
				}
			}

			if len(errs) > 0 {
				// Since we're returning an error pipeline, we need to close
				// input so that writing to any already bound streams doesn't
				// block.
				input.Close()
				return []executor.Pipeline{errorPipeline(errs)}
			}

			return []executor.Pipeline{input}
		},
	}

	ctx = user.InjectOrgID(ctx, job.Task.TenantID)

	ctx, capture := xcap.NewCapture(ctx, nil)
	defer capture.End()

	pipeline := executor.Run(ctx, cfg, job.Task.Fragment, logger)

	// If the root pipeline supports reporting changes to its contributing time
	// range, subscribe to those changes.
	// TODO(spiridonov): find a way to subscribe on non-root pipelines.
	notifier, ok := executor.Unwrap(pipeline).(executor.ContributingTimeRangeChangedNotifier)
	if ok {
		notifier.SubscribeToTimeRangeChanges(func(ts time.Time, lessThan bool) {
			// Send a Running task status update with the current time range.
			err := job.Scheduler.SendMessage(ctx, wire.TaskStatusMessage{
				ID: job.Task.ULID,
				Status: workflow.TaskStatus{
					State: workflow.TaskStateRunning,
					ContributingTimeRange: workflow.ContributingTimeRange{
						Timestamp: ts,
						LessThan:  lessThan,
					},
				},
			})
			if err != nil {
				level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err)
			}
		})
	}

	err := job.Scheduler.SendMessageAsync(ctx, wire.TaskStatusMessage{
		ID:     job.Task.ULID,
		Status: workflow.TaskStatus{State: workflow.TaskStateRunning},
	})
	if err != nil {
		// For now, we'll continue even if the scheduler didn't get the message
		// about the task status.
		level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err)
	}

	_, err = t.drainPipeline(ctx, pipeline, job, logger)
	if err != nil {
		level.Warn(logger).Log("msg", "task failed", "err", err)

		_ = job.Scheduler.SendMessageAsync(ctx, wire.TaskStatusMessage{
			ID: job.Task.ULID,
			Status: workflow.TaskStatus{
				State: workflow.TaskStateFailed,
				Error: err,
			},
		})

		pipeline.Close()
		return
	}

	// Finally, close all sinks.
	for _, sink := range job.Sinks {
		err := sink.Close(ctx)
		if err != nil {
			level.Warn(logger).Log("msg", "failed to close sink", "err", err)
		}
	}

	// Close before ending capture to ensure all observations are recorded.
	pipeline.Close()

	// Explicitly call End() here (even though we have a defer statement)
	// to finalize the capture before it's included in the TaskStatusMessage.
	capture.End()

	duration := time.Since(startTime)
	level.Info(logger).Log("msg", "task completed", "duration", duration)
	t.Metrics.taskExecSeconds.Observe(duration.Seconds())

	// Wait for the scheduler to confirm the task has completed before
	// requesting a new one. This allows the scheduler to update its bookkeeping
	// for how many threads have capacity for requesting tasks.
	err = job.Scheduler.SendMessage(ctx, wire.TaskStatusMessage{
		ID:     job.Task.ULID,
		Status: workflow.TaskStatus{State: workflow.TaskStateCompleted, Capture: capture},
	})
	if err != nil {
		level.Warn(logger).Log("msg", "failed to inform scheduler of task status", "err", err)
	}
}
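
// drainPipeline reads records from pipeline until EOF, forwarding each
// non-empty record to every sink of the job. It returns the total number of
// rows read and the first non-EOF read error, if any.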
func (t *thread) drainPipeline(ctx context.Context, pipeline executor.Pipeline, job *threadJob, logger log.Logger) (int, error) {
	var totalRows int

	for {
		rec, err := pipeline.Read(ctx)
		if errors.Is(err, executor.EOF) {
			break
		} else if err != nil {
			return totalRows, err
		}
		totalRows += int(rec.NumRows())

		// Don't bother writing empty records to our peers.
		if rec.NumRows() == 0 {
			continue
		}

		for _, sink := range job.Sinks {
			err := sink.Send(ctx, rec)
			if err != nil {
				// If a sink doesn't accept the result, we'll continue
				// best-effort processing our task. It's possible that one of
				// the receiving sinks got canceled, and other sinks may still
				// need data.
				level.Warn(logger).Log("msg", "failed to send result", "err", err)
				continue
			}
		}
	}

	return totalRows, nil
}
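
// Illustrative sketch (an assumption, not part of the original file): a worker
// pool would be expected to construct one thread per execution slot with its
// shared dependencies and run it until shutdown. The variable names below are
// hypothetical placeholders.
//
//	t := &thread{
//		BatchSize:  batchSize,
//		Bucket:     objectStoreBucket,
//		Metastore:  metastoreClient,
//		Logger:     logger,
//		Metrics:    workerMetrics,
//		JobManager: jobs,
//	}
//	go func() { _ = t.Run(ctx) }()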