Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/workflow/runner.go

196 lines
6.1 KiB

package workflow
import (
"context"
"fmt"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/xcap"
)
// A Manifest is a collection of related Tasks and Streams. A manifest is given
// to a [Runner] (see [Runner.RegisterManifest]) before tasks can run.
type Manifest struct {
// Streams are the collection of streams within a manifest.
Streams []*Stream
// Tasks are the collection of Tasks within a manifest. Tasks only reference
// Streams within the same manifest.
Tasks []*Task
// The handlers below are invoked by the [Runner] whenever the corresponding
// resources (streams or tasks) of the manifest change state.
StreamEventHandler StreamEventHandler // Handler for stream events.
TaskEventHandler TaskEventHandler // Handler for task events.
}
// A RecordWriter is used to write records to a stream. A RecordWriter is
// bound to a stream by passing it to [Runner.Listen].
type RecordWriter interface {
// Write writes a new record to the stream.
//
// NOTE(review): whether the callee may retain record after Write returns is
// not specified here; confirm against implementations.
Write(ctx context.Context, record arrow.RecordBatch) error
}
// A Runner can asynchronously execute a workflow.
//
// Streams and tasks become known to a Runner by registering the [Manifest]
// that contains them. Listen and Start return an error when given a stream
// or task that was not registered through a manifest.
type Runner interface {
// RegisterManifest registers a Manifest to use with the runner. Registering
// a manifest records all of the streams and tasks inside of it for use.
//
// Tasks within the manifest must only reference streams within the same
// manifest.
//
// RegisterManifest returns an error if a stream or task is already
// associated with a different manifest.
//
// The handlers defined by the manifest will be invoked whenever the
// corresponding resources of the manifest change state.
RegisterManifest(ctx context.Context, manifest *Manifest) error
// UnregisterManifest unregisters a Manifest from the runner. Unregistering
// a manifest forcibly cancels any tasks associated with it.
//
// UnregisterManifest returns an error if the manifest is not registered or
// if it contains unregistered streams or tasks.
UnregisterManifest(ctx context.Context, manifest *Manifest) error
// Listen binds the stream to write to the provided writer, so that records
// sent over the stream are delivered to writer. Listen returns an error if
// the stream was not defined in a registered manifest, or if a task is
// already bound to the stream as a reader.
Listen(ctx context.Context, writer RecordWriter, stream *Stream) error
// Start begins executing the provided tasks in the background. Start
// returns an error if any of the Tasks were not registered through a
// manifest.
//
// The provided context is used for the lifetime of the Start call only; it
// does not govern the lifetime of the started tasks. To cancel started
// tasks, use [Runner.Cancel].
Start(ctx context.Context, tasks ...*Task) error
// Cancel requests cancellation of the specified tasks. Cancel returns an
// error if any of the tasks were not found.
Cancel(ctx context.Context, tasks ...*Task) error
}
// StreamEventHandler is a function that handles events for changed streams.
// It is called with the stream s that changed and the state it changed to,
// newState. A StreamEventHandler is supplied through
// [Manifest.StreamEventHandler].
type StreamEventHandler func(ctx context.Context, s *Stream, newState StreamState)
// StreamState describes the state of a stream. A [Runner] sends it as an
// event each time a stream associated with a task changes its state.
//
// The zero value of StreamState is [StreamStateIdle].
type StreamState int

const (
	// StreamStateIdle is a stream that is still waiting for both its sender
	// and its receiver to become available.
	StreamStateIdle StreamState = iota

	// StreamStateOpen is a stream that is open and transmitting data.
	StreamStateOpen

	// StreamStateBlocked is a stream that is blocked (by backpressure) on
	// sending data.
	StreamStateBlocked

	// StreamStateClosed is a stream that has been closed and no longer
	// transmits data.
	StreamStateClosed
)

// streamStates holds the display name of every known StreamState, indexed by
// the state's integer value.
var streamStates = [...]string{
	StreamStateIdle:    "Idle",
	StreamStateOpen:    "Open",
	StreamStateBlocked: "Blocked",
	StreamStateClosed:  "Closed",
}

// String returns the display name of the StreamState. Values outside the set
// of declared states are rendered as "StreamState(N)".
func (s StreamState) String() string {
	idx := int(s)
	if idx < 0 || idx >= len(streamStates) {
		return fmt.Sprintf("StreamState(%d)", s)
	}
	return streamStates[idx]
}
// TaskEventHandler is a function that handles events for changed tasks. It
// is called with the task t that changed and its new status, newStatus. A
// TaskEventHandler is supplied through [Manifest.TaskEventHandler].
type TaskEventHandler func(ctx context.Context, t *Task, newStatus TaskStatus)
// TaskStatus holds the state of a task and additional information about that
// state. It is the payload delivered to a [TaskEventHandler].
type TaskStatus struct {
// State is the state of the task that this status describes.
State TaskState
// Error holds the error that occurred during task execution, if any. Only
// set when State is [TaskStateFailed].
Error error
// Capture contains observations about the execution of the task.
Capture *xcap.Capture
// Statistics report analytics about the lifetime of a task. Only set
// for terminal task states (see [TaskState.Terminal]).
Statistics *stats.Result
// ContributingTimeRange is the contributing time range of a running task.
// Only set for non-terminal states.
ContributingTimeRange ContributingTimeRange
}
// ContributingTimeRange represents a time range of input data that can change the
// current state of a running task. Anything outside of this range cannot meaningfully
// contribute to the task state.
type ContributingTimeRange struct {
// Timestamp is the end of the range.
Timestamp time.Time
// LessThan qualifies the bound as "less than Timestamp".
//
// NOTE(review): presumably true means contributing data lies before
// Timestamp; the meaning of false is not visible here. Confirm against
// callers.
LessThan bool
}
// TaskState describes the state of a Task. A [Runner] sends it as an event
// each time a Task changes its state.
type TaskState int

const (
	// TaskStateCreated is the initial state of a Task: it has been created
	// but not yet given to a [Runner].
	TaskStateCreated TaskState = iota

	// TaskStatePending is a Task that is pending execution by a [Runner].
	TaskStatePending

	// TaskStateRunning is a Task that is currently being executed by a
	// [Runner].
	TaskStateRunning

	// TaskStateCompleted is a Task that has completed successfully.
	TaskStateCompleted

	// TaskStateCancelled is a Task that has been cancelled, either by a
	// [Runner] or the owning [Workflow].
	TaskStateCancelled

	// TaskStateFailed is a Task that has failed during execution.
	TaskStateFailed
)

// TaskStates holds the display name of every known TaskState, indexed by the
// state's integer value.
var TaskStates = [...]string{
	TaskStateCreated:   "Created",
	TaskStatePending:   "Pending",
	TaskStateRunning:   "Running",
	TaskStateCompleted: "Completed",
	TaskStateCancelled: "Cancelled",
	TaskStateFailed:    "Failed",
}

// Terminal reports whether the TaskState is terminal, i.e. one of
// [TaskStateCompleted], [TaskStateCancelled] or [TaskStateFailed].
func (s TaskState) Terminal() bool {
	switch s {
	case TaskStateCompleted, TaskStateCancelled, TaskStateFailed:
		return true
	default:
		return false
	}
}

// String returns the display name of the TaskState. Values outside the set of
// declared states are rendered as "TaskState(N)".
func (s TaskState) String() string {
	idx := int(s)
	if idx < 0 || idx >= len(TaskStates) {
		return fmt.Sprintf("TaskState(%d)", s)
	}
	return TaskStates[idx]
}