loki/pkg/engine/internal/workflow/workflow_test.go

package workflow

import (
	"context"
	"errors"
	"fmt"
	"maps"
	"sync"
	"testing"
	"testing/synctest"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/go-kit/log"
	"github.com/oklog/ulid/v2"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
)

// Test performs an end-to-end test of a workflow, asserting that:
//
// 1. All streams get registered before running tasks.
// 2. There is never more than one sender/receiver for a stream.
// 3. The root task has a pipeline listener.
// 4. Closing the pipeline properly cleans up tasks and streams.
//
// Some of these assertions are handled by [fakeRunner], which returns an error
// when used improperly.
func Test(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		var physicalGraph dag.Graph[physical.Node]

		var (
			scan      = physicalGraph.Add(&physical.DataObjScan{})
			rangeAgg  = physicalGraph.Add(&physical.RangeAggregation{})
			vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
		)

		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})

		physicalPlan := physical.FromGraph(physicalGraph)

		fr := newFakeRunner()

		wf, err := New(Options{}, log.NewNopLogger(), "", fr, physicalPlan)
		require.NoError(t, err, "workflow should construct properly")
		require.NotNil(t, wf.resultsStream, "workflow should have created results stream")

		defer func() {
			wf.Close()

			// Closing the workflow should remove all remaining streams and tasks.
			require.Len(t, fr.streams, 0, "all streams should be removed after closing the workflow")
			require.Len(t, fr.tasks, 0, "all tasks should be removed after closing the workflow")
		}()

		defer func() {
			if !t.Failed() {
				return
			}

			t.Log("Failing workflow:")
			t.Log(Sprint(wf))
		}()

		// Run returns an error if any of the methods in our fake runner failed.
		p, err := wf.Run(t.Context())
		require.NoError(t, err, "Workflow should start properly")
		defer p.Close()

		synctest.Wait()

		require.Equal(t, len(wf.taskStates), 2, "workflow should have enqueued two tasks")

		rs, ok := fr.streams[wf.resultsStream.ULID]
		require.True(t, ok, "results stream should be registered in runner")
		require.NotEqual(t, ulid.Zero, rs.Sender, "results stream should have a sender")
		require.Equal(t, ulid.Zero, rs.TaskReceiver, "results stream should not have a task receiver")
		require.NotNil(t, rs.Listener, "results stream should have a listener")

		// Check to make sure all known tasks have been given to the runner.
		for _, task := range wf.allTasks() {
			_, exist := fr.tasks[task.ULID]
			require.True(t, exist, "workflow should give all tasks to runner (task %s is missing)", task.ULID)
		}
	})
}

// TestCancellation tests that a task entering a terminal state cancels all
// downstream tasks.
func TestCancellation(t *testing.T) {
	var physicalGraph dag.Graph[physical.Node]

	var (
		scan        = physicalGraph.Add(&physical.DataObjScan{})
		parallelize = physicalGraph.Add(&physical.Parallelize{})
		rangeAgg    = physicalGraph.Add(&physical.RangeAggregation{})
		vectorAgg   = physicalGraph.Add(&physical.VectorAggregation{})
	)

	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: scan})
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: parallelize})
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})

	physicalPlan := physical.FromGraph(physicalGraph)

	terminalStates := []TaskState{TaskStateCancelled, TaskStateCompleted, TaskStateFailed}

	for _, state := range terminalStates {
		t.Run(state.String(), func(t *testing.T) {
			synctest.Test(t, func(t *testing.T) {
				fr := newFakeRunner()

				wf, err := New(Options{}, log.NewNopLogger(), "", fr, physicalPlan)
				require.NoError(t, err, "workflow should construct properly")
				require.NotNil(t, wf.resultsStream, "workflow should have created results stream")

				defer func() {
					if !t.Failed() {
						return
					}

					t.Log("Failing workflow:")
					t.Log(Sprint(wf))
				}()

				// Run returns an error if any of the methods in our fake runner failed.
				p, err := wf.Run(t.Context())
				require.NoError(t, err, "Workflow should start properly")
				defer p.Close()

				// Wait for the workflow to register all tasks with the runner.
				synctest.Wait()

				rootTask, err := wf.graph.Root()
				require.NoError(t, err, "should be able to retrieve singular root task")

				rt, ok := fr.tasks[rootTask.ULID]
				require.True(t, ok, "root task should be registered with runner")

				// Notify the workflow that the root task has entered a terminal
				// state.
				rt.handler(t.Context(), rootTask, TaskStatus{State: state})

				// Wait for the workflow to process all status updates.
				synctest.Wait()

				_ = wf.graph.Walk(rootTask, func(n *Task) error {
					if n == rootTask {
						return nil
					}

					require.Equal(t, TaskStateCancelled, wf.taskStates[n], "downstream task %s should be canceled", n.ULID)
					return nil
				}, dag.PreOrderWalk)
			})
		})
	}
}

// TestShortCircuiting tests that a running task updating its contributing time range cancels irrelevant
// downstream tasks.
func TestShortCircuiting(t *testing.T) {
	var physicalGraph dag.Graph[physical.Node]

	now := time.Now()

	var (
		scan = physicalGraph.Add(&physical.ScanSet{
			Targets: []*physical.ScanTarget{
				{
					Type: physical.ScanTypeDataObject,
					DataObject: &physical.DataObjScan{
						MaxTimeRange: physical.TimeRange{
							Start: now.Add(-time.Hour),
							End:   now,
						},
					},
				},
				{
					Type: physical.ScanTypeDataObject,
					DataObject: &physical.DataObjScan{
						MaxTimeRange: physical.TimeRange{
							Start: now.Add(-2 * time.Hour),
							End:   now.Add(-time.Hour),
						},
					},
				},
				{
					Type: physical.ScanTypeDataObject,
					DataObject: &physical.DataObjScan{
						MaxTimeRange: physical.TimeRange{
							Start: now.Add(-3 * time.Hour),
							End:   now.Add(-2 * time.Hour),
						},
					},
				},
			},
		})
		parallelize = physicalGraph.Add(&physical.Parallelize{})
		topk        = physicalGraph.Add(&physical.TopK{
			SortBy:    &physical.ColumnExpr{Ref: semconv.ColumnIdentTimestamp.ColumnRef()},
			Ascending: false,
			K:         100,
		})
	)

	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: scan})
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: topk, Child: parallelize})

	physicalPlan := physical.FromGraph(physicalGraph)

	synctest.Test(t, func(t *testing.T) {
		fr := newFakeRunner()

		wf, err := New(Options{}, log.NewNopLogger(), "", fr, physicalPlan)
		require.NoError(t, err, "workflow should construct properly")
		require.NotNil(t, wf.resultsStream, "workflow should have created results stream")

		defer func() {
			if !t.Failed() {
				return
			}

			t.Log("Failing workflow:")
			t.Log(Sprint(wf))
		}()

		// Run returns an error if any of the methods in our fake runner failed.
		p, err := wf.Run(t.Context())
		require.NoError(t, err, "Workflow should start properly")
		defer p.Close()

		// Wait for the workflow to register all tasks with the runner.
		synctest.Wait()

		rootTask, err := wf.graph.Root()
		require.NoError(t, err, "should be able to retrieve singular root task")

		rt, ok := fr.tasks[rootTask.ULID]
		require.True(t, ok, "root task should be registered with runner")

		// Notify the workflow that the root task has updated its contributing time range.
		rt.handler(t.Context(), rootTask, TaskStatus{
			State: TaskStateRunning,
			ContributingTimeRange: ContributingTimeRange{
				Timestamp: now.Add(-45 * time.Minute),
				LessThan:  false,
			},
		})

		// Wait for the workflow to process all status updates.
		synctest.Wait()

		_ = wf.graph.Walk(rootTask, func(n *Task) error {
			// Skip root task
			if n == rootTask {
				return nil
			}

			// Skip tasks that have intersecting time range
			if n.MaxTimeRange.End.After(now.Add(-45 * time.Minute)) {
				return nil
			}

			// Tasks should be canceled
			require.Equal(t, TaskStateCancelled, wf.taskStates[n], "downstream task %s should be canceled", n.ULID)
			return nil
		}, dag.PreOrderWalk)
	})
}

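// TestAdmissionControl tests that the workflow limits how many scan tasks run
// concurrently: only up to MaxRunningScanTasks scan tasks are enqueued at
// once, and the remaining scan tasks are dispatched as earlier ones complete.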
func TestAdmissionControl(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		numScanTasks := 100 // more tasks than the capacity of the token bucket

		var physicalGraph dag.Graph[physical.Node]

		scanSet := &physical.ScanSet{}
		for i := range numScanTasks {
			scanSet.Targets = append(scanSet.Targets, &physical.ScanTarget{
				Type: physical.ScanTypeDataObject,
				DataObject: &physical.DataObjScan{
					Section: i,
				},
			})
		}

		var (
			rangeAgg = physicalGraph.Add(&physical.RangeAggregation{})
			parallel = physicalGraph.Add(&physical.Parallelize{})
			_        = physicalGraph.Add(scanSet)
		)

		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: parallel})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallel, Child: scanSet})

		physicalPlan := physical.FromGraph(physicalGraph)

		fr := newFakeRunner()

		opts := Options{
			MaxRunningScanTasks:  32, // less than numScanTasks
			MaxRunningOtherTasks: 0,  // unlimited
		}

		wf, err := New(opts, log.NewNopLogger(), "tenant", fr, physicalPlan)
		require.NoError(t, err, "workflow should construct properly")
		require.NotNil(t, wf.resultsStream, "workflow should have created results stream")

		defer func() {
			if !t.Failed() {
				return
			}

			t.Log("Failing workflow:")
			t.Log(Sprint(wf))
		}()

		// Run returns an error if any of the methods in our fake runner failed.
		ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
		t.Cleanup(cancel)

		// Run dispatches tasks in a background goroutine, so it does not block.
		p, err := wf.Run(ctx)
		require.NoError(t, err, "Workflow should start properly")
		defer p.Close()

		// Wait for the first "batch" of scan tasks to be enqueued.
		synctest.Wait()
		require.Equal(t, opts.MaxRunningScanTasks+1, len(wf.taskStates), "expected all tasks up to batch to be enqueued") // 32 scan tasks + 1 other task

		// Simulate scan tasks being completed.
		for _, task := range wf.allTasks() {
			if !isScanTask(task) {
				continue
			}

			time.Sleep(10 * time.Millisecond) // need to make sure that we don't "finish" tasks before they are dispatched
			wf.onTaskChange(ctx, task, TaskStatus{State: TaskStateCompleted})
		}

		// Wait for all other tasks to be enqueued.
		synctest.Wait()
		require.Equal(t, numScanTasks+1, len(wf.taskStates), "expected all tasks up to batch to be enqueued") // 100 scan tasks + 1 other task
	})
}

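// fakeRunner is an in-memory runner used by the tests in this file. It records
// the streams and tasks registered by a workflow and returns an error when it
// is used improperly, such as registering the same stream twice or binding a
// stream to more than one sender or receiver.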
type fakeRunner struct {
	streams  map[ulid.ULID]*runnerStream
	tasks    map[ulid.ULID]*runnerTask
	tasksMtx sync.RWMutex
}

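// newFakeRunner returns a fakeRunner with empty stream and task maps.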
func newFakeRunner() *fakeRunner {
	return &fakeRunner{
		streams: make(map[ulid.ULID]*runnerStream),
		tasks:   make(map[ulid.ULID]*runnerTask),
	}
}

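// RegisterManifest validates and records the streams and tasks in manifest. It
// returns an error if a stream or task was already registered, if a sink or
// source stream is missing from the manifest, or if a stream would be bound to
// more than one sender or receiver.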
func (f *fakeRunner) RegisterManifest(_ context.Context, manifest *Manifest) error {
	f.tasksMtx.Lock()
	defer f.tasksMtx.Unlock()

	var (
		manifestStreams = make(map[ulid.ULID]*runnerStream, len(manifest.Streams))
		manifestTasks   = make(map[ulid.ULID]*runnerTask, len(manifest.Tasks))
	)

	for _, stream := range manifest.Streams {
		if _, exist := f.streams[stream.ULID]; exist {
			return fmt.Errorf("stream %s already added", stream.ULID)
		} else if _, exist := manifestStreams[stream.ULID]; exist {
			return fmt.Errorf("stream %s already added in manifest", stream.ULID)
		}

		manifestStreams[stream.ULID] = &runnerStream{
			Stream:  stream,
			Handler: manifest.StreamEventHandler,
		}
	}

	for _, task := range manifest.Tasks {
		if _, exist := f.tasks[task.ULID]; exist {
			return fmt.Errorf("task %s already added", task.ULID)
		} else if _, exist := manifestTasks[task.ULID]; exist {
			return fmt.Errorf("task %s already added in manifest", task.ULID)
		}

		for _, streams := range task.Sinks {
			for _, stream := range streams {
				rs, exist := manifestStreams[stream.ULID]
				if !exist {
					return fmt.Errorf("sink stream %s not found in manifest", stream.ULID)
				} else if rs.Sender != ulid.Zero {
					return fmt.Errorf("stream %s already bound to sender %s", stream.ULID, rs.Sender)
				}

				rs.Sender = task.ULID
			}
		}

		for _, streams := range task.Sources {
			for _, stream := range streams {
				rs, exist := manifestStreams[stream.ULID]
				if !exist {
					return fmt.Errorf("source stream %s not found in manifest", stream.ULID)
				} else if rs.TaskReceiver != ulid.Zero {
					return fmt.Errorf("source stream %s already bound to %s", stream.ULID, rs.TaskReceiver)
				} else if rs.Listener != nil {
					return fmt.Errorf("source stream %s already bound to local receiver", stream.ULID)
				}

				rs.TaskReceiver = task.ULID
			}
		}

		manifestTasks[task.ULID] = &runnerTask{
			task:    task,
			handler: manifest.TaskEventHandler,
		}
	}

	// If we got to this point, the manifest is valid and we can copy everything
	// over into the map.
	maps.Copy(f.streams, manifestStreams)
	maps.Copy(f.tasks, manifestTasks)
	return nil
}

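// UnregisterManifest removes the streams and tasks in manifest. It returns an
// error if any of them was never registered.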
func (f *fakeRunner) UnregisterManifest(_ context.Context, manifest *Manifest) error {
	f.tasksMtx.Lock()
	defer f.tasksMtx.Unlock()

	// Validate that everything in the manifest was registered.
	for _, stream := range manifest.Streams {
		if _, exist := f.streams[stream.ULID]; !exist {
			return fmt.Errorf("stream %s not found", stream.ULID)
		}
	}
	for _, task := range manifest.Tasks {
		if _, exist := f.tasks[task.ULID]; !exist {
			return fmt.Errorf("task %s not found", task.ULID)
		}
	}

	for _, stream := range manifest.Streams {
		delete(f.streams, stream.ULID)
	}
	for _, task := range manifest.Tasks {
		delete(f.tasks, task.ULID)
	}
	return nil
}

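// Listen binds writer as the local listener for stream. It returns an error if
// the stream is unknown or already has a receiver.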
func (f *fakeRunner) Listen(_ context.Context, writer RecordWriter, stream *Stream) error {
	f.tasksMtx.Lock()
	defer f.tasksMtx.Unlock()

	rs, exist := f.streams[stream.ULID]
	if !exist {
		return fmt.Errorf("stream %s not found", stream.ULID)
	} else if rs.Listener != nil || rs.TaskReceiver != ulid.Zero {
		return fmt.Errorf("stream %s already bound", stream.ULID)
	}

	rs.Listener = writer
	return nil
}

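// Start moves each registered task to the pending state by invoking its event
// handler. It returns a joined error for any tasks that were never registered.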
func (f *fakeRunner) Start(ctx context.Context, tasks ...*Task) error {
	var errs []error

	for _, task := range tasks {
		f.tasksMtx.Lock()
		var (
			rt, exist = f.tasks[task.ULID]
		)
		f.tasksMtx.Unlock()

		if !exist {
			errs = append(errs, fmt.Errorf("task %s not registered", task.ULID))
			continue
		}

		// Inform handler of task state change.
		rt.handler(ctx, task, TaskStatus{State: TaskStatePending})
	}

	if len(errs) > 0 {
		return errors.Join(errs...)
	}
	return nil
}

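// Cancel moves each registered task to the cancelled state by invoking its
// event handler. It returns a joined error for any tasks that are unknown.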
func (f *fakeRunner) Cancel(ctx context.Context, tasks ...*Task) error {
	var errs []error

	for _, task := range tasks {
		f.tasksMtx.RLock()
		var (
			rt, exist = f.tasks[task.ULID]
		)
		f.tasksMtx.RUnlock()

		if !exist {
			errs = append(errs, fmt.Errorf("task %s not found", task.ULID))
			continue
		}

		// Inform handler of task state change.
		rt.handler(ctx, task, TaskStatus{State: TaskStateCancelled})
	}

	if len(errs) > 0 {
		return errors.Join(errs...)
	}
	return nil
}

// runnerStream wraps a stream with the edges of the connection. A stream sender
// is always another task, while the receiver is either another task or a
// pipeline owned by the runner, given to the workflow.
type runnerStream struct {
	Stream       *Stream
	Handler      StreamEventHandler
	TaskReceiver ulid.ULID    // Task listening for messages on stream.
	Listener     RecordWriter // Pipeline listening for messages on stream.
	Sender       ulid.ULID
}

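// runnerTask pairs a task with the event handler from its manifest.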
type runnerTask struct {
	task    *Task
	handler TaskEventHandler
}

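// noopPipeline is a pipeline whose Read blocks until the provided context is
// cancelled and whose Close does nothing.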
type noopPipeline struct{}

func (noopPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

func (noopPipeline) Close() {}