// Package workflow defines how to represent physical plans as distributed
// workflows.
package workflow

import (
	"context"
	"fmt"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/ulid/v2"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	"github.com/grafana/loki/v3/pkg/xcap"
)

var shortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
	Name: "loki_engine_v2_task_short_circuits_total",
	Help: "Total number of tasks preemptively canceled by short circuiting.",
})

// Options configures a [Workflow].
type Options struct {
	// MaxRunningScanTasks specifies the maximum number of scan tasks that may
	// run concurrently within a single workflow. 0 means no limit.
	MaxRunningScanTasks int

	// MaxRunningOtherTasks specifies the maximum number of non-scan tasks that
	// may run concurrently within a single workflow. 0 means no limit.
	MaxRunningOtherTasks int

	// DebugTasks toggles debug messages for a task. This is very verbose and
	// should only be enabled for debugging purposes.
	//
	// Regardless of the value of DebugTasks, workers still log when they
	// start and finish assigned tasks.
	DebugTasks bool

	// DebugStreams toggles debug messages for data streams. This is very
	// verbose and should only be enabled for debugging purposes.
	DebugStreams bool
}
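
// A minimal sketch of configuring a workflow; the limits below are
// illustrative values, not recommended defaults:
//
//	opts := Options{
//		MaxRunningScanTasks:  16, // cap concurrent scan tasks
//		MaxRunningOtherTasks: 0,  // leave non-scan tasks unlimited
//	}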

// Workflow represents a physical plan that has been partitioned into
// parallelizable tasks.
type Workflow struct {
	opts            Options
	logger          log.Logger
	runner          Runner
	graph           dag.Graph[*Task]
	resultsStream   *Stream
	resultsPipeline *streamPipe
	manifest        *Manifest

	statsMut sync.Mutex
	stats    stats.Result

	captureMut sync.Mutex
	capture    *xcap.Capture

	tasksMut   sync.RWMutex
	taskStates map[*Task]TaskState

	streamsMut sync.RWMutex
	streamStates map[*Stream]StreamState

	admissionControl *admissionControl
}
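
// Locking note: following the usual Go convention, each mutex above guards
// only the fields declared immediately after it, so stats, capture, task
// state, and stream state bookkeeping can be updated independently.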

// New creates a new Workflow from a physical plan. New returns an error if the
// physical plan does not have exactly one root node, or if the physical plan
// cannot be partitioned into a Workflow.
//
// The provided Runner will be used for Workflow execution.
func New(opts Options, logger log.Logger, tenantID string, runner Runner, plan *physical.Plan) (*Workflow, error) {
	graph, err := planWorkflow(tenantID, plan)
	if err != nil {
		return nil, err
	}

	// Inject a stream for final task results.
	results, err := injectResultsStream(tenantID, &graph)
	if err != nil {
		return nil, err
	}

	wf := &Workflow{
		opts:            opts,
		logger:          logger,
		runner:          runner,
		graph:           graph,
		resultsStream:   results,
		resultsPipeline: newStreamPipe(),

		taskStates:   make(map[*Task]TaskState),
		streamStates: make(map[*Stream]StreamState),
	}
	if err := wf.init(context.Background()); err != nil {
		wf.Close()
		return nil, err
	}
	return wf, nil
}
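
// A minimal sketch of the workflow lifecycle; the runner and plan are assumed
// to be provided by the caller:
//
//	wf, err := New(Options{}, logger, tenantID, runner, plan)
//	if err != nil {
//		return err
//	}
//	defer wf.Close()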

// injectResultsStream injects a new stream into the sinks of the root task for
// the workflow to receive final results.
func injectResultsStream(tenantID string, graph *dag.Graph[*Task]) (*Stream, error) {
	results := &Stream{ULID: ulid.Make(), TenantID: tenantID}

	// Inject a stream for final task results.
	rootTask, err := graph.Root()
	if err != nil {
		return nil, err
	}

	rootNode, err := rootTask.Fragment.Root()
	if err != nil {
		return nil, err
	}

	rootTask.Sinks[rootNode] = append(rootTask.Sinks[rootNode], results)
	return results, nil
}
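
// After injection, final results flow out of the task graph roughly as
// follows (illustrative):
//
//	root fragment node --sink--> results stream --> resultsPipeline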

// init initializes the workflow.
func (wf *Workflow) init(ctx context.Context) error {
	wf.manifest = &Manifest{
		Streams: wf.allStreams(),
		Tasks:   wf.allTasks(),

		StreamEventHandler: wf.onStreamChange,
		TaskEventHandler:   wf.onTaskChange,
	}
	if err := wf.runner.RegisterManifest(ctx, wf.manifest); err != nil {
		return err
	}
	return wf.runner.Listen(ctx, wf.resultsPipeline, wf.resultsStream)
}

// Close releases resources associated with the workflow.
func (wf *Workflow) Close() {
	if err := wf.runner.UnregisterManifest(context.Background(), wf.manifest); err != nil {
		level.Warn(wf.logger).Log("msg", "failed to unregister workflow manifest", "err", err)
	}
}

// Run executes the workflow, returning a pipeline to read results from. The
// provided context is used for the lifetime of the workflow execution.
//
// The returned pipeline must be closed when the workflow is complete to
// release resources.
func (wf *Workflow) Run(ctx context.Context) (pipeline executor.Pipeline, err error) {
	wf.capture = xcap.CaptureFromContext(ctx)

	wrapped := &wrappedPipeline{
		inner: wf.resultsPipeline,
		onClose: func() {
			// Merge final stats results back into the caller.
			wf.statsMut.Lock()
			defer wf.statsMut.Unlock()
			stats.JoinResults(ctx, wf.stats)
		},
	}

	// Start dispatching tasks in a background goroutine.
	go func() {
		err := wf.dispatchTasks(ctx, wf.manifest.Tasks)
		if err != nil {
			wf.resultsPipeline.SetError(err)
			wrapped.Close()
		}
	}()

	return wrapped, nil
}
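
// A sketch of consuming results from Run; the exact end-of-stream error is
// defined by the executor package, so treating any error as termination here
// is an assumption:
//
//	pipeline, err := wf.Run(ctx)
//	if err != nil {
//		return err
//	}
//	defer pipeline.Close() // merges stats back into the caller's context
//	for {
//		batch, err := pipeline.Read(ctx)
//		if err != nil {
//			break // end of stream or task failure
//		}
//		_ = batch // consume the arrow.RecordBatch
//	}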

// dispatchTasks groups the slice of tasks by their associated "admission
// lane" (token bucket) and dispatches them to the runner, acquiring one
// token per task before starting it. It returns the first error encountered,
// either from acquiring tokens (e.g. context cancellation) or from starting
// tasks.
func (wf *Workflow) dispatchTasks(ctx context.Context, tasks []*Task) error {
	wf.admissionControl = newAdmissionControl(
		int64(wf.opts.MaxRunningScanTasks),
		int64(wf.opts.MaxRunningOtherTasks),
	)

	groups := wf.admissionControl.groupByType(tasks)
	for _, taskType := range []taskType{
		taskTypeOther,
		taskTypeScan,
	} {
		lane := wf.admissionControl.get(taskType)
		tasks := groups[taskType]

		var offset, batchSize int64
		total := int64(len(tasks))

		for ; offset < total; offset += batchSize {
			// Acquire one token per task before handing it to the runner.
			batchSize = int64(1)
			if err := lane.Acquire(ctx, batchSize); err != nil {
				return fmt.Errorf("failed to acquire tokens from admission lane %s: %w", taskType, err)
			}
			if err := wf.runner.Start(ctx, tasks[offset:offset+batchSize]...); err != nil {
				return fmt.Errorf("failed to start tasks: %w", err)
			}
		}
	}

	return nil
}
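
// Token accounting note: dispatchTasks acquires one token per task before
// starting it; the matching Release(1) happens in handleTerminalStateChange
// once the task reaches a terminal state, freeing capacity for the next
// queued task.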

// allStreams returns every stream used by the workflow, including the
// results stream.
func (wf *Workflow) allStreams() []*Stream {
	var (
		result      []*Stream
		seenStreams = map[*Stream]struct{}{}
	)

	// We only iterate over sources below (for convenience), and since
	// wf.resultsStream is only used as a sink, we need to manually add it
	// here.
	result = append(result, wf.resultsStream)
	seenStreams[wf.resultsStream] = struct{}{}

	for _, root := range wf.graph.Roots() {
		_ = wf.graph.Walk(root, func(t *Task) error {
			// Task construction guarantees that there is a sink for each
			// source (minus the results stream we generate), so there's no
			// point in iterating over both Sources and Sinks.
			for _, streams := range t.Sources {
				for _, stream := range streams {
					if _, seen := seenStreams[stream]; seen {
						continue
					}
					seenStreams[stream] = struct{}{}
					result = append(result, stream)
				}
			}

			return nil
		}, dag.PreOrderWalk)
	}

	return result
}

// allTasks returns every task in the workflow graph.
func (wf *Workflow) allTasks() []*Task {
	var tasks []*Task

	for _, root := range wf.graph.Roots() {
		// [dag.Graph.Walk] guarantees that each node is only visited once, so
		// we can safely append the task to the list without checking if it's
		// already been seen.
		_ = wf.graph.Walk(root, func(t *Task) error {
			tasks = append(tasks, t)
			return nil
		}, dag.PreOrderWalk)
	}

	return tasks
}

// onStreamChange records stream state changes and closes the results
// pipeline once the results stream has closed.
func (wf *Workflow) onStreamChange(_ context.Context, stream *Stream, newState StreamState) {
	if wf.opts.DebugStreams {
		level.Debug(wf.logger).Log("msg", "stream state change", "stream_id", stream.ULID, "new_state", newState)
	}

	wf.streamsMut.Lock()
	defer wf.streamsMut.Unlock()

	wf.streamStates[stream] = newState

	if newState == StreamStateClosed && stream.ULID == wf.resultsStream.ULID {
		// Close the results pipeline once the results stream has closed.
		wf.resultsPipeline.Close()
	}
}

// onTaskChange records task state changes and reacts to them, canceling
// tasks whose output can no longer be consumed.
func (wf *Workflow) onTaskChange(ctx context.Context, task *Task, newStatus TaskStatus) {
	if wf.opts.DebugTasks {
		level.Debug(wf.logger).Log("msg", "task state change", "task_id", task.ULID, "new_state", newStatus.State)
	}

	wf.tasksMut.Lock()
	oldState := wf.taskStates[task]
	wf.taskStates[task] = newStatus.State
	wf.tasksMut.Unlock()

	if newStatus.State.Terminal() {
		wf.handleTerminalStateChange(ctx, task, oldState, newStatus)
	} else {
		wf.handleNonTerminalStateChange(ctx, task, newStatus)
	}
}

func (wf *Workflow) handleTerminalStateChange(ctx context.Context, task *Task, oldState TaskState, newStatus TaskStatus) {
	// The state has not changed; nothing to do.
	if oldState == newStatus.State {
		return
	}

	if newStatus.State == TaskStateFailed {
		// Use the first failure from a task as the failure for the entire
		// workflow.
		wf.resultsPipeline.SetError(newStatus.Error)
	}

	if wf.admissionControl == nil {
		level.Warn(wf.logger).Log("msg", "admission control was not initialized")
	} else if oldState == TaskStatePending || oldState == TaskStateRunning {
		// Release tokens only if the task was already enqueued and therefore
		// either pending or running.
		defer wf.admissionControl.laneFor(task).Release(1)
	}

	if newStatus.Capture != nil {
		wf.mergeCapture(newStatus.Capture)
	}

	if newStatus.Statistics != nil {
		wf.mergeResults(*newStatus.Statistics)
	}

	// The task reached a terminal state. We need to detect whether the task's
	// immediate children should be canceled. We only look at immediate
	// unterminated children, since canceling them will trigger onTaskChange
	// to process indirect children.
	var tasksToCancel []*Task

	wf.tasksMut.RLock()
	{
	NextChild:
		for _, child := range wf.graph.Children(task) {
			// Ignore children in terminal states.
			if childState := wf.taskStates[child]; childState.Terminal() {
				continue
			}

			// Cancel the child if and only if all of the child's parents
			// (which includes the task that just updated) are in a terminal
			// state.
			for _, parent := range wf.graph.Parents(child) {
				parentState := wf.taskStates[parent]
				if !parentState.Terminal() {
					continue NextChild
				}
			}

			tasksToCancel = append(tasksToCancel, child)
		}
	}
	wf.tasksMut.RUnlock()

	wf.cancelTasks(ctx, tasksToCancel)
}
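
// Cancellation example (illustrative): in the fragment below, child C feeds
// parents A and B. Once both A and B are terminal, C's output can no longer
// be consumed and C is canceled; while either parent is still running, C is
// left alone.
//
//	A   B   (parents, consume C's output)
//	 \ /
//	  C     (child)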

func (wf *Workflow) handleNonTerminalStateChange(ctx context.Context, task *Task, newStatus TaskStatus) {
	// If the task is running but its contributing time range has changed, we
	// need to detect whether the task's immediate children should be canceled
	// because they can no longer contribute to the state of the running task.
	// We only look at immediate unterminated children, since canceling them
	// will trigger onTaskChange to process indirect children.
	if newStatus.State == TaskStateRunning && !newStatus.ContributingTimeRange.Timestamp.IsZero() {
		var tasksToCancel []*Task

		ts := newStatus.ContributingTimeRange.Timestamp
		lessThan := newStatus.ContributingTimeRange.LessThan

		wf.tasksMut.RLock()
		{
			for _, child := range wf.graph.Children(task) {
				// Ignore children in terminal states.
				if childState := wf.taskStates[child]; childState.Terminal() {
					continue
				}

				// Skip children whose time ranges still intersect the
				// contributing time range, since they can still contribute.
				if lessThan && child.MaxTimeRange.Start.Before(ts) ||
					!lessThan && child.MaxTimeRange.End.After(ts) {
					continue
				}

				// TODO(spiridonov): We do not check parents here right now,
				// as there is only one parent, but in general a task can be
				// canceled only if all its parents are in terminal states OR
				// have a non-intersecting contributing time range.
				tasksToCancel = append(tasksToCancel, child)
				shortCircuitsTotal.Inc()
			}
		}
		wf.tasksMut.RUnlock()

		wf.cancelTasks(ctx, tasksToCancel)
	}
}
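
// Short-circuit example (illustrative): suppose a running task reports
// ContributingTimeRange{Timestamp: T, LessThan: true}, meaning it only needs
// data with timestamps before T. A child whose MaxTimeRange starts at or
// after T can no longer contribute and is canceled (counted by
// shortCircuitsTotal); a child whose range starts before T keeps running.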

// cancelTasks asks the runner to cancel the given tasks. Runners may
// re-invoke onTaskChange, so we don't want to hold the tasks mutex when
// calling this.
func (wf *Workflow) cancelTasks(ctx context.Context, tasks []*Task) {
	if err := wf.runner.Cancel(ctx, tasks...); err != nil {
		level.Warn(wf.logger).Log("msg", "failed to cancel tasks", "err", err)
	}
}

// mergeCapture merges a task's capture into the workflow's capture.
func (wf *Workflow) mergeCapture(capture *xcap.Capture) {
	wf.captureMut.Lock()
	defer wf.captureMut.Unlock()

	if wf.capture == nil || capture == nil {
		return
	}

	// Merge all regions from the task's capture into the workflow's capture.
	for _, region := range capture.Regions() {
		wf.capture.AddRegion(region)
	}
}

// mergeResults merges a task's statistics into the workflow's statistics.
func (wf *Workflow) mergeResults(results stats.Result) {
	wf.statsMut.Lock()
	defer wf.statsMut.Unlock()

	wf.stats.Merge(results)
}

// wrappedPipeline wraps an [executor.Pipeline] to run a hook when the
// pipeline is closed.
type wrappedPipeline struct {
	initOnce sync.Once

	inner   executor.Pipeline
	onClose func()
}

// Read reads the next record batch from the inner pipeline.
func (p *wrappedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	return p.inner.Read(ctx)
}

// Close closes the inner pipeline and runs the onClose hook. Close is
// guarded by initOnce so the hook runs at most once, even if Close is called
// from both Run's error path and the caller.
func (p *wrappedPipeline) Close() {
	p.initOnce.Do(func() {
		p.inner.Close()
		p.onClose()
	})
}