loki/pkg/engine/internal/workflow/workflow.go

// Package workflow defines how to represent physical plans as distributed
// workflows.
package workflow

import (
	"context"
	"fmt"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	"github.com/grafana/loki/v3/pkg/xcap"
)

var shortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
	Name: "loki_engine_v2_task_short_circuits_total",
	Help: "Total number of tasks preemptively canceled by short circuiting.",
})

// Options configures a [Workflow].
type Options struct {
	// MaxRunningScanTasks specifies the maximum number of scan tasks that may
	// run concurrently within a single workflow. 0 means no limit.
	MaxRunningScanTasks int

	// MaxRunningOtherTasks specifies the maximum number of non-scan tasks that
	// may run concurrently within a single workflow. 0 means no limit.
	MaxRunningOtherTasks int

	// DebugTasks toggles debug messages for tasks. This is very verbose and
	// should only be enabled for debugging purposes.
	//
	// Regardless of the value of DebugTasks, workers still log when
	// they start and finish assigned tasks.
	DebugTasks bool

	// DebugStreams toggles debug messages for data streams. This is very
	// verbose and should only be enabled for debugging purposes.
	DebugStreams bool
}

// Workflow represents a physical plan that has been partitioned into
// parallelizable tasks.
type Workflow struct {
	opts   Options
	logger log.Logger
	runner Runner

	graph           dag.Graph[*Task]
	resultsStream   *Stream
	resultsPipeline *streamPipe
	manifest        *Manifest

	statsMut sync.Mutex
	stats    stats.Result

	captureMut sync.Mutex
	capture    *xcap.Capture

	tasksMut   sync.RWMutex
	taskStates map[*Task]TaskState

	streamsMut   sync.RWMutex
	streamStates map[*Stream]StreamState

	admissionControl *admissionControl
}

// New creates a new Workflow from a physical plan. New returns an error if the
// physical plan does not have exactly one root node, or if the physical plan
// cannot be partitioned into a Workflow.
//
// The provided Runner will be used for Workflow execution.
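//
// A minimal usage sketch (the Runner implementation r, the logger, and the
// physical plan are assumed to be prepared by the caller):
//
//	wf, err := workflow.New(workflow.Options{}, logger, "tenant-1", r, plan)
//	if err != nil {
//		return err
//	}
//	defer wf.Close()
//
//	pipeline, err := wf.Run(ctx)
//	if err != nil {
//		return err
//	}
//	defer pipeline.Close()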
func New(opts Options, logger log.Logger, tenantID string, runner Runner, plan *physical.Plan) (*Workflow, error) {
	graph, err := planWorkflow(tenantID, plan)
	if err != nil {
		return nil, err
	}

	// Inject a stream for final task results.
	results, err := injectResultsStream(tenantID, &graph)
	if err != nil {
		return nil, err
	}

	wf := &Workflow{
		opts:   opts,
		logger: logger,
		runner: runner,

		graph:           graph,
		resultsStream:   results,
		resultsPipeline: newStreamPipe(),

		taskStates:   make(map[*Task]TaskState),
		streamStates: make(map[*Stream]StreamState),
	}
	if err := wf.init(context.Background()); err != nil {
		wf.Close()
		return nil, err
	}
	return wf, nil
}

// injectResultsStream injects a new stream into the sinks of the root task for
// the workflow to receive final results.
func injectResultsStream(tenantID string, graph *dag.Graph[*Task]) (*Stream, error) {
	results := &Stream{ULID: ulid.Make(), TenantID: tenantID}

	// Inject a stream for final task results.
	rootTask, err := graph.Root()
	if err != nil {
		return nil, err
	}
	rootNode, err := rootTask.Fragment.Root()
	if err != nil {
		return nil, err
	}

	rootTask.Sinks[rootNode] = append(rootTask.Sinks[rootNode], results)
	return results, nil
}

// init initializes the workflow.
func (wf *Workflow) init(ctx context.Context) error {
	wf.manifest = &Manifest{
		Streams: wf.allStreams(),
		Tasks:   wf.allTasks(),

		StreamEventHandler: wf.onStreamChange,
		TaskEventHandler:   wf.onTaskChange,
	}
	if err := wf.runner.RegisterManifest(ctx, wf.manifest); err != nil {
		return err
	}
	return wf.runner.Listen(ctx, wf.resultsPipeline, wf.resultsStream)
}

// Close releases resources associated with the workflow.
func (wf *Workflow) Close() {
	if err := wf.runner.UnregisterManifest(context.Background(), wf.manifest); err != nil {
		level.Warn(wf.logger).Log("msg", "failed to unregister workflow manifest", "err", err)
	}
}

// Run executes the workflow, returning a pipeline to read results from. The
// provided context is used for the lifetime of the workflow execution.
//
// The returned pipeline must be closed when the workflow is complete to release
// resources.
func (wf *Workflow) Run(ctx context.Context) (pipeline executor.Pipeline, err error) {
	wf.capture = xcap.CaptureFromContext(ctx)

	wrapped := &wrappedPipeline{
		inner: wf.resultsPipeline,
		onClose: func() {
			// Merge final stats results back into the caller.
			wf.statsMut.Lock()
			defer wf.statsMut.Unlock()
			stats.JoinResults(ctx, wf.stats)
		},
	}

	// Start dispatching tasks in a background goroutine.
	go func() {
		err := wf.dispatchTasks(ctx, wf.manifest.Tasks)
		if err != nil {
			wf.resultsPipeline.SetError(err)
			wrapped.Close()
		}
	}()

	return wrapped, nil
}

// dispatchTasks groups the slice of tasks by their associated "admission lane"
// (token bucket) and dispatches them to the runner, acquiring a token from the
// lane before starting each task. It blocks until every task has been handed
// to the runner or the context is canceled.
func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error {
	wf.admissionControl = newAdmissionControl(
		int64(wf.opts.MaxRunningScanTasks),
		int64(wf.opts.MaxRunningOtherTasks),
	)

	groups := wf.admissionControl.groupByType(tasks)
	for _, taskType := range []taskType{
		taskTypeOther,
		taskTypeScan,
	} {
		lane := wf.admissionControl.get(taskType)
		tasks := groups[taskType]

		var offset, batchSize int64
		total := int64(len(tasks))
		for ; offset < total; offset += batchSize {
			batchSize = int64(1)
			if err := lane.Acquire(ctx, batchSize); err != nil {
				return fmt.Errorf("failed to acquire tokens from admission lane %s: %w", taskType, err)
			}
			if err := wf.runner.Start(ctx, tasks[offset:offset+batchSize]...); err != nil {
				return fmt.Errorf("failed to start tasks: %w", err)
			}
		}
	}
	return nil
}

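// allStreams returns every stream referenced by the workflow, including the
// injected results stream.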
func (wf *Workflow) allStreams() []*Stream {
	var (
		result      []*Stream
		seenStreams = map[*Stream]struct{}{}
	)

	// We only iterate over sources below (for convenience), and since
	// wf.resultsStream is only used as a sink, we need to manually add it here.
	result = append(result, wf.resultsStream)
	seenStreams[wf.resultsStream] = struct{}{}

	for _, root := range wf.graph.Roots() {
		_ = wf.graph.Walk(root, func(t *Task) error {
			// Task construction guarantees that there is a sink for each source
			// (minus the results stream we generate), so there's no point in
			// iterating over both Sources and Sinks.
			for _, streams := range t.Sources {
				for _, stream := range streams {
					if _, seen := seenStreams[stream]; seen {
						continue
					}

					seenStreams[stream] = struct{}{}
					result = append(result, stream)
				}
			}
			return nil
		}, dag.PreOrderWalk)
	}

	return result
}

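// allTasks returns every task in the workflow graph.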
func (wf *Workflow) allTasks() []*Task {
	var tasks []*Task
	for _, root := range wf.graph.Roots() {
		// [dag.Graph.Walk] guarantees that each node is only visited once, so
		// we can safely append the task to the list without checking if it's
		// already been seen.
		_ = wf.graph.Walk(root, func(t *Task) error {
			tasks = append(tasks, t)
			return nil
		}, dag.PreOrderWalk)
	}
	return tasks
}

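// onStreamChange records the new state of a stream and closes the results
// pipeline once the results stream itself has closed.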
func (wf *Workflow) onStreamChange(_ context.Context, stream *Stream, newState StreamState) {
	if wf.opts.DebugStreams {
		level.Debug(wf.logger).Log("msg", "stream state change", "stream_id", stream.ULID, "new_state", newState)
	}

	wf.streamsMut.Lock()
	defer wf.streamsMut.Unlock()
	wf.streamStates[stream] = newState

	if newState == StreamStateClosed && stream.ULID == wf.resultsStream.ULID {
		// Close the results pipeline once the results stream has closed.
		wf.resultsPipeline.Close()
	}
}

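// onTaskChange records the new state of a task and delegates to the terminal
// or non-terminal state handler.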
func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newStatus TaskStatus) {
	if wf.opts.DebugTasks {
		level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State)
	}

	wf.tasksMut.Lock()
	oldState := wf.taskStates[task]
	wf.taskStates[task] = newStatus.State
	wf.tasksMut.Unlock()

	if newStatus.State.Terminal() {
		wf.handleTerminalStateChange(ctx, task, oldState, newStatus)
	} else {
		wf.handleNonTerminalStateChange(ctx, task, newStatus)
	}
}

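// handleTerminalStateChange processes a task that entered a terminal state: it
// propagates failures to the results pipeline, releases admission tokens,
// merges captures and statistics, and cancels immediate children whose parents
// have all terminated.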
func (wf *Workflow) handleTerminalStateChange(ctx context.Context, task *Task, oldState TaskState, newStatus TaskStatus) {
	// State has not changed.
	if oldState == newStatus.State {
		return
	}

	if newStatus.State == TaskStateFailed {
		// Use the first failure from a task as the failure for the entire
		// workflow.
		wf.resultsPipeline.SetError(newStatus.Error)
	}

	if wf.admissionControl == nil {
		level.Warn(wf.logger).Log("msg", "admission control was not initialized")
	} else if oldState == TaskStatePending || oldState == TaskStateRunning {
		// Release tokens only if the task was already enqueued and therefore
		// either pending or running.
		defer wf.admissionControl.laneFor(task).Release(1)
	}

	if newStatus.Capture != nil {
		wf.mergeCapture(newStatus.Capture)
	}
	if newStatus.Statistics != nil {
		wf.mergeResults(*newStatus.Statistics)
	}

	// The task reached a terminal state. We need to detect if the task's
	// immediate children should be canceled. We only look at immediate
	// unterminated children, since canceling them will trigger onTaskChange to
	// process indirect children.
	var tasksToCancel []*Task

	wf.tasksMut.RLock()
	{
	NextChild:
		for _, child := range wf.graph.Children(task) {
			// Ignore children in terminal states.
			if childState := wf.taskStates[child]; childState.Terminal() {
				continue
			}

			// Cancel the child if and only if all of the child's parents (which
			// includes the task that just updated) are in a terminal state.
			for _, parent := range wf.graph.Parents(child) {
				parentState := wf.taskStates[parent]
				if !parentState.Terminal() {
					continue NextChild
				}
			}

			tasksToCancel = append(tasksToCancel, child)
		}
	}
	wf.tasksMut.RUnlock()

	wf.cancelTasks(ctx, tasksToCancel)
}

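// handleNonTerminalStateChange short-circuits children of a running task that
// can no longer contribute to its result because their time range falls
// outside the task's contributing time range.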
func (wf *Workflow) handleNonTerminalStateChange(ctx context.Context, task *Task, newStatus TaskStatus) {
	// The task is still running, but its contributing time range has changed.
	if newStatus.State == TaskStateRunning && !newStatus.ContributingTimeRange.Timestamp.IsZero() {
		// We need to detect if the task's immediate children should be
		// canceled because they can no longer contribute to the state of the
		// running task. We only look at immediate unterminated children, since
		// canceling them will trigger onTaskChange to process indirect
		// children.
		var tasksToCancel []*Task

		ts := newStatus.ContributingTimeRange.Timestamp
		lessThan := newStatus.ContributingTimeRange.LessThan

		wf.tasksMut.RLock()
		{
			for _, child := range wf.graph.Children(task) {
				// Ignore children in terminal states.
				if childState := wf.taskStates[child]; childState.Terminal() {
					continue
				}

				// Skip children whose time range still intersects the
				// contributing time range, since they can still contribute.
				if lessThan && child.MaxTimeRange.Start.Before(ts) ||
					!lessThan && child.MaxTimeRange.End.After(ts) {
					continue
				}

				// TODO(spiridonov): We do not check parents here right now; there
				// is only one parent today, but in general a task can only be
				// canceled if all of its parents are in terminal states OR have a
				// non-intersecting contributing time range.
				tasksToCancel = append(tasksToCancel, child)
				shortCircuitsTotal.Inc()
			}
		}
		wf.tasksMut.RUnlock()

		wf.cancelTasks(ctx, tasksToCancel)
	}
}

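// cancelTasks asks the runner to cancel the given tasks. It must be called
// without holding tasksMut, since runners may re-enter onTaskChange.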
func (wf *Workflow) cancelTasks(ctx context.Context, tasks []*Task) {
	// Runners may re-invoke onTaskChange, so we don't want to hold the mutex
	// when calling this.
	if err := wf.runner.Cancel(ctx, tasks...); err != nil {
		level.Warn(wf.logger).Log("msg", "failed to cancel tasks", "err", err)
	}
}

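// mergeCapture merges the regions of a task's capture into the workflow's
// capture, if both exist.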
func (wf *Workflow) mergeCapture(capture *xcap.Capture) {
	wf.captureMut.Lock()
	defer wf.captureMut.Unlock()

	if wf.capture == nil || capture == nil {
		return
	}

	// Merge all regions from the task's capture into the workflow's capture.
	for _, region := range capture.Regions() {
		wf.capture.AddRegion(region)
	}
}

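// mergeResults merges a task's statistics into the workflow's accumulated
// statistics.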
func (wf *Workflow) mergeResults(results stats.Result) {
	wf.statsMut.Lock()
	defer wf.statsMut.Unlock()
	wf.stats.Merge(results)
}

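// wrappedPipeline wraps an executor.Pipeline and invokes a callback when the
// pipeline is closed.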
type wrappedPipeline struct {
	initOnce sync.Once
	inner    executor.Pipeline
	onClose  func()
}

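// Read reads the next record batch from the inner pipeline.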
func (p *wrappedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	return p.inner.Read(ctx)
}

// Close closes the inner pipeline and then invokes the onClose callback.
func (p *wrappedPipeline) Close() {
	p.inner.Close()
	p.onClose()
}