Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/workflow/workflow_planner_test.go

346 lines
11 KiB

package workflow
import (
	"encoding/binary"
	"strings"
	"testing"
	"time"

	"github.com/oklog/ulid/v2"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
)
// TODO(rfratto): Once physical plans can be serializable, we should be able to
// write these tests using serialized physical plans rather than constructing
// them with the DAG.
//
// That would also allow for writing tests using txtar[^1], which may be easier
// to write and maintain.
//
// [^1]: https://research.swtch.com/testing
// Test_planWorkflow verifies how physical plans are partitioned into workflow
// tasks: a plan with no pipeline breakers forms a single task, a trailing
// pipeline breaker stays fused with its input, and a pipeline breaker with a
// downstream consumer (or a Parallelize node) splits the plan into multiple
// tasks connected by streams.
func Test_planWorkflow(t *testing.T) {
	t.Run("no pipeline breakers", func(t *testing.T) {
		ulidGen := ulidGenerator{}

		var physicalGraph dag.Graph[physical.Node]
		physicalGraph.Add(&physical.DataObjScan{
			MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
		})
		physicalPlan := physical.FromGraph(physicalGraph)

		graph, err := planWorkflow("", physicalPlan)
		require.NoError(t, err)
		require.Equal(t, 1, graph.Len())
		requireUniqueStreams(t, graph)

		// Reassign ULIDs deterministically so the printed output is stable.
		generateConsistentULIDs(&ulidGen, graph)

		expectOutput := strings.TrimSpace(`
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
│ DataObjScan location= streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
`)
		actualOutput := Sprint(&Workflow{graph: graph})
		require.Equal(t, expectOutput, strings.TrimSpace(actualOutput))
	})

	t.Run("ends with one pipeline breaker", func(t *testing.T) {
		ulidGen := ulidGenerator{}

		var physicalGraph dag.Graph[physical.Node]
		var (
			scan = physicalGraph.Add(&physical.DataObjScan{
				MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
			})
			rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
				Start: time.Unix(30, 0).UTC(),
				End:   time.Unix(45, 0).UTC(),
			})
		)
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan})
		physicalPlan := physical.FromGraph(physicalGraph)

		graph, err := planWorkflow("", physicalPlan)
		require.NoError(t, err)
		// The trailing pipeline breaker is fused with its input into one task.
		require.Equal(t, 1, graph.Len())
		requireUniqueStreams(t, graph)

		generateConsistentULIDs(&ulidGen, graph)

		expectOutput := strings.TrimSpace(`
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
│ RangeAggregation operation=invalid start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ └── DataObjScan location= streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
`)
		actualOutput := Sprint(&Workflow{graph: graph})
		require.Equal(t, expectOutput, strings.TrimSpace(actualOutput))
	})

	t.Run("split on pipeline breaker", func(t *testing.T) {
		ulidGen := ulidGenerator{}

		var physicalGraph dag.Graph[physical.Node]
		var (
			scan = physicalGraph.Add(&physical.DataObjScan{
				MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
			})
			rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
				Start: time.Unix(30, 0).UTC(),
				End:   time.Unix(45, 0).UTC(),
			})
			vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
		)
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})
		physicalPlan := physical.FromGraph(physicalGraph)

		graph, err := planWorkflow("", physicalPlan)
		require.NoError(t, err)
		// The pipeline breaker has a consumer, so the plan splits into two
		// tasks connected by a stream.
		require.Equal(t, 2, graph.Len())
		requireUniqueStreams(t, graph)

		generateConsistentULIDs(&ulidGen, graph)

		expectOutput := strings.TrimSpace(`
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
│ VectorAggregation operation=invalid group_by=()
│ └── @source stream=00000000000000000000000003
┌ Task 00000000000000000000000002
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
│ RangeAggregation operation=invalid start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ │ └── @sink stream=00000000000000000000000003
│ └── DataObjScan location= streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
`)
		actualOutput := Sprint(&Workflow{graph: graph})
		require.Equal(t, expectOutput, strings.TrimSpace(actualOutput))
	})

	t.Run("split on parallelize", func(t *testing.T) {
		ulidGen := ulidGenerator{}

		var physicalGraph dag.Graph[physical.Node]
		var (
			vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
			rangeAgg  = physicalGraph.Add(&physical.RangeAggregation{
				Start: time.Unix(5, 0).UTC(),
				End:   time.Unix(45, 0).UTC(),
			})
			parallelize = physicalGraph.Add(&physical.Parallelize{})
			filter      = physicalGraph.Add(&physical.Filter{})
			project     = physicalGraph.Add(
				&physical.Projection{
					Expressions: []physical.Expression{
						&physical.VariadicExpr{
							Op: types.VariadicOpParseLogfmt,
							Expressions: []physical.Expression{
								&physical.ColumnExpr{
									Ref: semconv.ColumnIdentMessage.ColumnRef(),
								},
							},
						},
					},
					All:    true,
					Expand: true,
				},
			)
			scanSet = physicalGraph.Add(&physical.ScanSet{
				Targets: []*physical.ScanTarget{
					{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
						Location:     "a",
						MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()}},
					},
					{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
						Location:     "b",
						MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()}},
					},
					{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
						Location:     "c",
						MaxTimeRange: physical.TimeRange{Start: time.Unix(0, 0).UTC(), End: time.Unix(50, 0).UTC()}},
					},
				},
			})
		)
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: parallelize})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: filter})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: filter, Child: project})
		_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: project, Child: scanSet})
		physicalPlan := physical.FromGraph(physicalGraph)

		graph, err := planWorkflow("", physicalPlan)
		require.NoError(t, err)
		// Parallelize expands the scan set: one task per scan target, plus
		// one task for the range aggregation and one for the vector
		// aggregation.
		require.Equal(t, 5, graph.Len())
		requireUniqueStreams(t, graph)

		generateConsistentULIDs(&ulidGen, graph)

		expectOutput := strings.TrimSpace(`
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│ VectorAggregation operation=invalid group_by=()
│ └── @source stream=00000000000000000000000006
┌ Task 00000000000000000000000002
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│ RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ ├── @source stream=00000000000000000000000007
│ ├── @source stream=00000000000000000000000008
│ ├── @source stream=00000000000000000000000009
│ └── @sink stream=00000000000000000000000006
┌ Task 00000000000000000000000003
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
│ Filter
│ │ └── @sink stream=00000000000000000000000007
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
┌ Task 00000000000000000000000004
│ @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
│ Filter
│ │ └── @sink stream=00000000000000000000000008
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
┌ Task 00000000000000000000000005
│ @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
│ Filter
│ │ └── @sink stream=00000000000000000000000009
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
│ └── DataObjScan location=c streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
`)
		actualOutput := Sprint(&Workflow{graph: graph})
		require.Equal(t, expectOutput, strings.TrimSpace(actualOutput))
	})
}
// requireUniqueStreams asserts that for each stream found in g, that stream has
// exactly one reader (from Task.Sources) and exactly one writer (from
// Task.Sinks).
// requireUniqueStreams asserts that every stream referenced by the tasks in g
// has exactly one reader (recorded in Task.Sources) and exactly one writer
// (recorded in Task.Sinks).
func requireUniqueStreams(t testing.TB, g dag.Graph[*Task]) {
	t.Helper()

	type endpoints struct {
		Reader physical.Node
		Writer physical.Node
	}
	seen := make(map[*Stream]*endpoints)

	// lookup returns the endpoints record for s, creating it on first use.
	lookup := func(s *Stream) *endpoints {
		ep, ok := seen[s]
		if !ok {
			ep = new(endpoints)
			seen[s] = ep
		}
		return ep
	}

	for _, root := range g.Roots() {
		_ = g.Walk(root, func(task *Task) error {
			for node, streams := range task.Sources {
				for _, stream := range streams {
					ep := lookup(stream)
					require.Nil(t, ep.Reader, "each stream must have exactly one reader")
					ep.Reader = node
				}
			}

			for node, streams := range task.Sinks {
				for _, stream := range streams {
					ep := lookup(stream)
					require.Nil(t, ep.Writer, "each stream must have exactly one writer")
					ep.Writer = node
				}
			}

			return nil
		}, dag.PreOrderWalk)
	}

	// Every stream must have been seen from both sides.
	for _, ep := range seen {
		require.NotNil(t, ep.Reader, "each stream must have exactly one reader")
		require.NotNil(t, ep.Writer, "each stream must have exactly one writer")
	}
}
// generateConsistentULIDs reassigns ULIDs to all tasks and streams in the graph
// using the ULID generator for consistent output.
// generateConsistentULIDs reassigns ULIDs to all tasks and streams in g using
// gen, producing deterministic identifiers for stable test output.
func generateConsistentULIDs(gen *ulidGenerator, g dag.Graph[*Task]) {
	assignTaskULID := func(task *Task) error {
		task.ULID = gen.Make()
		return nil
	}

	assignStreamULIDs := func(task *Task) error {
		for _, streams := range task.Sources {
			for _, stream := range streams {
				stream.ULID = gen.Make()
			}
		}
		return nil
	}

	for _, root := range g.Roots() {
		_ = g.Walk(root, assignTaskULID, dag.PreOrderWalk)
	}

	// For easier debugging, streams get their ULIDs in a second pass so that
	// all stream ULIDs sort after all task ULIDs.
	for _, root := range g.Roots() {
		_ = g.Walk(root, assignStreamULIDs, dag.PreOrderWalk)
	}
}
// ulidGenerator produces deterministic, monotonically increasing ULIDs for
// tests by encoding an incrementing counter into the ULID bytes.
type ulidGenerator struct {
	lastCounter uint64 // counter encoded into the most recently generated ULID
}
// Make returns the next ULID in the sequence. The incremented counter is
// written big-endian into the last 8 bytes of the ULID while the first 8
// bytes stay zero, so successive ULIDs are lexicographically ordered and
// render as predictable strings (e.g. 00000000000000000000000001).
func (g *ulidGenerator) Make() ulid.ULID {
	g.lastCounter++

	var id [16]byte
	binary.BigEndian.PutUint64(id[8:], g.lastCounter)
	return ulid.ULID(id)
}