package workflow

import (
	"context"
	"math"
	"strings"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/oklog/ulid/v2"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/engine/internal/util/dag"
	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
)

// TODO(rfratto): Once physical plans can be serialized, we should be able to
// write these tests using serialized physical plans rather than constructing
// them with the DAG.
//
// That would also allow for writing tests using txtar[^1], which may be easier
// to write and maintain.
//
// [^1]: https://research.swtch.com/testing
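//
// As a rough sketch (assuming a future plan (de)serializer; the file names
// and fields below are hypothetical, not an existing format), a txtar case
// could pair a serialized plan with the expected workflow printout:
//
//	-- plan.json --
//	{"nodes": [...], "edges": [...]}
//	-- expected --
//	┌ Task 00000000000000000000000001
//	│ DataObjScan location= streams=0 section_id=0 projections=()
//	└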

func Test_planWorkflow(t *testing.T) {
	t.Run("no pipeline breakers", func(t *testing.T) {
		ulidGen := ulidGenerator{}

		var physicalGraph dag.Graph[physical.Node]
		physicalGraph.Add(&physical.DataObjScan{
			MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
		})

		physicalPlan := physical.FromGraph(physicalGraph)

		graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
		require.NoError(t, err)
		require.Equal(t, 1, graph.Len())
		requireUniqueStreams(t, graph)
		generateConsistentULIDs(&ulidGen, graph)

		expectOutput := strings.TrimSpace(`
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
│
│ DataObjScan location= streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
└
`)

		actualOutput := Sprint(&Workflow{graph: graph})
		require.Equal(t, strings.TrimSpace(expectOutput), strings.TrimSpace(actualOutput))
	})

t.Run("ends with one pipeline breaker", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var physicalGraph dag.Graph[physical.Node]
|
|
|
|
var (
|
|
scan = physicalGraph.Add(&physical.DataObjScan{
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
})
|
|
rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
|
|
Start: time.Unix(30, 0).UTC(),
|
|
End: time.Unix(45, 0).UTC(),
|
|
})
|
|
)
|
|
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ RangeAggregation operation=invalid start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location= streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
})
|
|
|
|
t.Run("split on pipeline breaker", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var physicalGraph dag.Graph[physical.Node]
|
|
|
|
var (
|
|
scan = physicalGraph.Add(&physical.DataObjScan{
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
})
|
|
rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
|
|
Start: time.Unix(30, 0).UTC(),
|
|
End: time.Unix(45, 0).UTC(),
|
|
})
|
|
vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
|
|
)
|
|
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scan})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
|
|
require.NoError(t, err)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ └── @source stream=00000000000000000000000003
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000003
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location= streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
|
|
t.Run("with caching", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{
|
|
enabled: true,
|
|
taskCacheMaxSizeBytes: 1 * 1024 * 1024,
|
|
dataObjScanMaxSizeBytes: 0,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOutput := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ └── @source stream=00000000000000000000000003
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=0dbee591f4bc143d key= |>>| Batching |>>| RangeAggregation{operation=invalid,start=1970-01-01T00:00:30Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ │ └── @sink stream=00000000000000000000000003
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:30Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=309bb327739e663a key= |>>| DataObjScan{location=,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ └── DataObjScan location= streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOutput), strings.TrimSpace(actualOutput))
|
|
})
|
|
})
|
|
|
|
t.Run("split on parallelize", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var physicalGraph dag.Graph[physical.Node]
|
|
|
|
var (
|
|
vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
|
|
rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
|
|
Start: time.Unix(5, 0).UTC(),
|
|
End: time.Unix(45, 0).UTC(),
|
|
})
|
|
|
|
parallelize = physicalGraph.Add(&physical.Parallelize{})
|
|
|
|
filter = physicalGraph.Add(&physical.Filter{})
|
|
project = physicalGraph.Add(
|
|
&physical.Projection{
|
|
Expressions: []physical.Expression{
|
|
&physical.VariadicExpr{
|
|
Op: types.VariadicOpParseLogfmt,
|
|
Expressions: []physical.Expression{
|
|
&physical.ColumnExpr{
|
|
Ref: semconv.ColumnIdentMessage.ColumnRef(),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
All: true,
|
|
Expand: true,
|
|
},
|
|
)
|
|
scanSet = physicalGraph.Add(&physical.ScanSet{
|
|
Targets: []*physical.ScanTarget{
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "a",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
},
|
|
},
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "b",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()},
|
|
},
|
|
},
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "c",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(0, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
)
|
|
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: rangeAgg})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: parallelize})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: filter})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: filter, Child: project})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: project, Child: scanSet})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
|
|
require.NoError(t, err)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ └── @source stream=00000000000000000000000006
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000006
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ ├── @source stream=00000000000000000000000007
|
|
│ ├── @source stream=00000000000000000000000008
|
|
│ └── @source stream=00000000000000000000000009
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000007
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000004
|
|
│ @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000008
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
┌ Task 00000000000000000000000005
|
|
│ @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000009
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── DataObjScan location=c streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
|
|
t.Run("with caching", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{
|
|
enabled: true,
|
|
taskCacheMaxSizeBytes: 1 * 1024 * 1024,
|
|
dataObjScanMaxSizeBytes: 0,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOutput := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ └── @source stream=00000000000000000000000006
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000006
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ ├── @source stream=00000000000000000000000007
|
|
│ ├── @source stream=00000000000000000000000008
|
|
│ └── @source stream=00000000000000000000000009
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=c993ba951c1c73ef key= |>>| Batching |>>| Filter{predicates=[]} |>>| Projection{all=true,mode=expand,expressions=[PARSE_LOGFMT(builtin.message)]} |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ │ └── @sink stream=00000000000000000000000007
|
|
│ └── Batching batch_size=500
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=4a6082f4d1b54ead key= |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000004
|
|
│ @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=9332cb1201f497a1 key= |>>| Batching |>>| Filter{predicates=[]} |>>| Projection{all=true,mode=expand,expressions=[PARSE_LOGFMT(builtin.message)]} |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ │ └── @sink stream=00000000000000000000000008
|
|
│ └── Batching batch_size=500
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=57460db08f81adaf key= |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
┌ Task 00000000000000000000000005
|
|
│ @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=aea367473a8b42dc key= |>>| Batching |>>| Filter{predicates=[]} |>>| Projection{all=true,mode=expand,expressions=[PARSE_LOGFMT(builtin.message)]} |>>| DataObjScan{location=c,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:00Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ │ └── @sink stream=00000000000000000000000009
|
|
│ └── Batching batch_size=500
|
|
│ └── Filter
|
|
│ └── Projection all=true expand=(PARSE_LOGFMT(builtin.message))
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=61a13f0daaf5f8fe key= |>>| DataObjScan{location=c,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:00Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ └── DataObjScan location=c streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:50Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOutput), strings.TrimSpace(actualOutput))
|
|
})
|
|
})
|
|
|
|
t.Run("split vector aggregation parallelism", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var (
|
|
physicalGraph dag.Graph[physical.Node]
|
|
vecAgg = physicalGraph.Add(&physical.VectorAggregation{
|
|
Operation: types.VectorAggregationTypeSum,
|
|
})
|
|
parallelize = physicalGraph.Add(&physical.Parallelize{})
|
|
rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
|
|
Operation: types.RangeAggregationTypeCount,
|
|
Start: time.Unix(5, 0).UTC(),
|
|
End: time.Unix(45, 0).UTC(),
|
|
})
|
|
scanSet = physicalGraph.Add(&physical.ScanSet{
|
|
Targets: []*physical.ScanTarget{
|
|
{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "a",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()}},
|
|
},
|
|
{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "b",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()}},
|
|
},
|
|
},
|
|
})
|
|
)
|
|
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vecAgg, Child: parallelize})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: rangeAgg})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scanSet})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
|
|
require.NoError(t, err)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, graph.Len()) // 1 global + 2 local (one per scan target)
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
// The workflow should have:
|
|
// - Task 1: Global VectorAgg reading from 2 streams (one per partition)
|
|
// - Task 2: RangeAgg + Scan for partition "a"
|
|
// - Task 3: RangeAgg + Scan for partition "b"
|
|
// All task fragments are wrapped with a Batching node.
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=sum group_by=()
|
|
│ ├── @source stream=00000000000000000000000004
|
|
│ └── @source stream=00000000000000000000000005
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000004
|
|
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000005
|
|
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
|
|
t.Run("with caching", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{
|
|
enabled: true,
|
|
taskCacheMaxSizeBytes: 1 * 1024 * 1024,
|
|
dataObjScanMaxSizeBytes: 0,
|
|
}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOutput := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=sum group_by=()
|
|
│ ├── @source stream=00000000000000000000000004
|
|
│ └── @source stream=00000000000000000000000005
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=502e8c56ecf762f4 key= |>>| Batching |>>| RangeAggregation{operation=count,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ │ └── @sink stream=00000000000000000000000004
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=4a6082f4d1b54ead key= |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=9888d9ec9f19e11e key= |>>| Batching |>>| RangeAggregation{operation=count,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ │ └── @sink stream=00000000000000000000000005
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=0 B hashed_key=57460db08f81adaf key= |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOutput), strings.TrimSpace(actualOutput))
|
|
})
|
|
})
|
|
|
|
t.Run("Predicates on the ScanSet are combined with the predicate pushed down to the DataObjScan", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var physicalGraph dag.Graph[physical.Node]
|
|
|
|
parallelize := physicalGraph.Add(&physical.Parallelize{})
|
|
topk := physicalGraph.Add(&physical.TopK{
|
|
SortBy: &physical.ColumnExpr{Ref: semconv.ColumnIdentTimestamp.ColumnRef()},
|
|
Ascending: false,
|
|
K: 100,
|
|
})
|
|
scanSet := physicalGraph.Add(&physical.ScanSet{
|
|
Predicates: []physical.Expression{
|
|
&physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{Ref: semconv.ColumnIdentTimestamp.ColumnRef()},
|
|
Right: physical.NewLiteral(time.Unix(5, 0).UTC().UnixNano()),
|
|
Op: types.BinaryOpGt,
|
|
},
|
|
},
|
|
Targets: []*physical.ScanTarget{
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "a",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
Predicates: []physical.Expression{
|
|
&physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{Ref: semconv.ColumnIdentError.ColumnRef()},
|
|
Right: physical.NewLiteral(""),
|
|
Op: types.BinaryOpNeq,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: topk, Child: parallelize})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: scanSet})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=100
|
|
│ └── @source stream=00000000000000000000000003
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
│
|
|
│ DataObjScan location=a streams=0 section_id=0 projections=() predicate[0]=GT(builtin.timestamp, 5000000000) predicate[1]=NEQ(generated.__error__, "")
|
|
│ ├── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
│ └── @sink stream=00000000000000000000000003
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
})
|
|
|
|
t.Run("clamp scanset predicates and rangeaggregation", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
var physicalGraph dag.Graph[physical.Node]
|
|
|
|
var (
|
|
vectorAgg = physicalGraph.Add(&physical.VectorAggregation{})
|
|
rangeAgg = physicalGraph.Add(&physical.RangeAggregation{
|
|
Start: time.Unix(5, 0).UTC(),
|
|
End: time.Unix(45, 0).UTC(),
|
|
})
|
|
|
|
parallelize = physicalGraph.Add(&physical.Parallelize{})
|
|
|
|
predA = &physical.BinaryExpr{Left: &physical.ColumnExpr{Ref: types.ColumnRef{Column: types.ColumnNameBuiltinTimestamp, Type: types.ColumnTypeBuiltin}},
|
|
Right: physical.NewLiteral(types.Timestamp(60 * 1000 * 1000 * 1000)), Op: types.BinaryOpLt}
|
|
predB = &physical.BinaryExpr{Left: &physical.ColumnExpr{Ref: types.ColumnRef{Column: types.ColumnNameBuiltinTimestamp, Type: types.ColumnTypeBuiltin}},
|
|
Right: physical.NewLiteral(types.Timestamp(18 * 1000 * 1000 * 1000)), Op: types.BinaryOpGt}
|
|
|
|
scanSet = physicalGraph.Add(&physical.ScanSet{
|
|
Targets: []*physical.ScanTarget{
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "a",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
|
|
Predicates: []physical.Expression{predA},
|
|
},
|
|
},
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "b",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()},
|
|
Predicates: []physical.Expression{predB},
|
|
},
|
|
},
|
|
{
|
|
Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
|
|
Location: "c",
|
|
MaxTimeRange: physical.TimeRange{Start: time.Unix(0, 0).UTC(), End: time.Unix(40, 0).UTC()},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
)
|
|
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vectorAgg, Child: parallelize})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: rangeAgg})
|
|
_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scanSet})
|
|
|
|
physicalPlan := physical.FromGraph(physicalGraph)
|
|
physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
|
|
require.NoError(t, err)
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 4, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOuptut := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ ├── @source stream=00000000000000000000000005
|
|
│ ├── @source stream=00000000000000000000000006
|
|
│ └── @source stream=00000000000000000000000007
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000005
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=() predicate[0]=LTE(builtin.timestamp, 1970-01-01T00:00:50Z)
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000006
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=() predicate[0]=GTE(builtin.timestamp, 1970-01-01T00:00:20Z)
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
┌ Task 00000000000000000000000004
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ │ └── @sink stream=00000000000000000000000007
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── DataObjScan location=c streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:40Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOuptut), strings.TrimSpace(actualOutput))
|
|
|
|
t.Run("with caching", func(t *testing.T) {
|
|
ulidGen := ulidGenerator{}
|
|
|
|
graph, err := planWorkflow("", physicalPlan, cacheParams{enabled: true, taskCacheMaxSizeBytes: 1 * 1024 * 1024, dataObjScanMaxSizeBytes: 1 * 1024 * 1024}, log.NewNopLogger())
|
|
require.NoError(t, err)
|
|
require.Equal(t, 4, graph.Len())
|
|
requireUniqueStreams(t, graph)
|
|
generateConsistentULIDs(&ulidGen, graph)
|
|
|
|
expectOutput := strings.TrimSpace(`
|
|
┌ Task 00000000000000000000000001
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Batching batch_size=500
|
|
│ └── VectorAggregation operation=invalid group_by=()
|
|
│ ├── @source stream=00000000000000000000000005
|
|
│ ├── @source stream=00000000000000000000000006
|
|
│ └── @source stream=00000000000000000000000007
|
|
└
|
|
┌ Task 00000000000000000000000002
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=beb71f7dcfcd7095 key= |>>| Batching |>>| RangeAggregation{operation=invalid,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[LTE(builtin.timestamp, 1970-01-01T00:00:50Z)],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ │ └── @sink stream=00000000000000000000000005
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=1.0 MiB hashed_key=cfbb813ac32eac14 key= |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[LTE(builtin.timestamp, 1970-01-01T00:00:50Z)],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
|
|
│ └── DataObjScan location=a streams=0 section_id=0 projections=() predicate[0]=LTE(builtin.timestamp, 1970-01-01T00:00:50Z)
|
|
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
|
|
└
|
|
┌ Task 00000000000000000000000003
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=edb84a0ddd883413 key= |>>| Batching |>>| RangeAggregation{operation=invalid,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[GTE(builtin.timestamp, 1970-01-01T00:00:20Z)],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ │ └── @sink stream=00000000000000000000000006
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=1.0 MiB hashed_key=ec866af4d1638d82 key= |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[GTE(builtin.timestamp, 1970-01-01T00:00:20Z)],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
|
|
│ └── DataObjScan location=b streams=0 section_id=0 projections=() predicate[0]=GTE(builtin.timestamp, 1970-01-01T00:00:20Z)
|
|
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
|
|
└
|
|
┌ Task 00000000000000000000000004
|
|
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
|
|
│
|
|
│ Cache max_cacheable_size=1.0 MiB hashed_key=ea0acf82724b9cfa key= |>>| Batching |>>| RangeAggregation{operation=invalid,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=c,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:00Z,max_time_range_end=1970-01-01T00:00:40Z}
|
|
│ │ └── @sink stream=00000000000000000000000007
|
|
│ └── Batching batch_size=500
|
|
│ └── RangeAggregation operation=invalid start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
|
|
│ └── Cache max_cacheable_size=1.0 MiB hashed_key=acadc504b7eeee93 key= |>>| DataObjScan{location=c,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:00Z,max_time_range_end=1970-01-01T00:00:40Z}
|
|
│ └── DataObjScan location=c streams=0 section_id=0 projections=()
|
|
│ └── @max_time_range start=1970-01-01T00:00:00Z end=1970-01-01T00:00:40Z
|
|
└
|
|
`)
|
|
|
|
actualOutput := Sprint(&Workflow{graph: graph})
|
|
require.Equal(t, strings.TrimSpace(expectOutput), strings.TrimSpace(actualOutput))
|
|
})
|
|
})
|
|
|
|
}
|
|
|
|
// requireUniqueStreams asserts that for each stream found in g, that stream has
// exactly one reader (from Task.Sources) and exactly one writer (from
// Task.Sinks).
func requireUniqueStreams(t testing.TB, g dag.Graph[*Task]) {
	t.Helper()

	type streamInfo struct {
		Reader physical.Node
		Writer physical.Node
	}

	streamInfos := make(map[*Stream]*streamInfo)

	for _, root := range g.Roots() {
		_ = g.Walk(root, func(task *Task) error {
			for node, streams := range task.Sources {
				for _, stream := range streams {
					info, ok := streamInfos[stream]
					if !ok {
						info = new(streamInfo)
						streamInfos[stream] = info
					}

					require.Nil(t, info.Reader, "each stream must have exactly one reader")
					info.Reader = node
				}
			}

			for node, streams := range task.Sinks {
				for _, stream := range streams {
					info, ok := streamInfos[stream]
					if !ok {
						info = new(streamInfo)
						streamInfos[stream] = info
					}

					require.Nil(t, info.Writer, "each stream must have exactly one writer")
					info.Writer = node
				}
			}

			return nil
		}, dag.PreOrderWalk)
	}

	for _, info := range streamInfos {
		require.NotNil(t, info.Reader, "each stream must have exactly one reader")
		require.NotNil(t, info.Writer, "each stream must have exactly one writer")
	}
}

// generateConsistentULIDs reassigns ULIDs to all tasks and streams in the graph
// using the ULID generator for consistent output.
func generateConsistentULIDs(gen *ulidGenerator, g dag.Graph[*Task]) {
	for _, root := range g.Roots() {
		_ = g.Walk(root, func(task *Task) error {
			task.ULID = gen.Make()
			return nil
		}, dag.PreOrderWalk)
	}

	// For easier debugging, we do a second pass for all the streams. That way
	// we know stream ULIDs start after task ULIDs.
	for _, root := range g.Roots() {
		_ = g.Walk(root, func(task *Task) error {
			for _, streams := range task.Sources {
				for _, stream := range streams {
					stream.ULID = gen.Make()
				}
			}
			return nil
		}, dag.PreOrderWalk)
	}
}
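
// With a fresh ulidGenerator, tasks therefore print as
// "Task 00000000000000000000000001", "Task 00000000000000000000000002", and
// so on, with stream ULIDs continuing the same counter; these are exactly the
// IDs the expected plan strings in the tests above rely on.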
// Test_pruneCachedTasks verifies that tasks whose cached result is empty are
// removed from the workflow graph at plan time.
func Test_pruneCachedTasks(t *testing.T) {
	// Create a physical plan:
	//   VectorAggregation --> Parallelize --> RangeAggr --> ScanSet{DataObjScan(A), DataObjScan(B)}
	// That will translate into three tasks:
	//  - Task 1 (root): global VectorAggregation
	//  - Task 2 (leaf): RangeAggregation + DataObjScan for location "a"
	//  - Task 3 (leaf): RangeAggregation + DataObjScan for location "b"
	var physicalGraph dag.Graph[physical.Node]
	var (
		vecAgg      = physicalGraph.Add(&physical.VectorAggregation{Operation: types.VectorAggregationTypeSum})
		parallelize = physicalGraph.Add(&physical.Parallelize{})
		rangeAgg    = physicalGraph.Add(&physical.RangeAggregation{
			Operation: types.RangeAggregationTypeCount,
			Start:     time.Unix(5, 0).UTC(),
			End:       time.Unix(45, 0).UTC(),
		})
		scanSet = physicalGraph.Add(&physical.ScanSet{
			Targets: []*physical.ScanTarget{
				{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
					Location:     "a",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(10, 0).UTC(), End: time.Unix(50, 0).UTC()},
				}},
				{Type: physical.ScanTypeDataObject, DataObject: &physical.DataObjScan{
					Location:     "b",
					MaxTimeRange: physical.TimeRange{Start: time.Unix(20, 0).UTC(), End: time.Unix(60, 0).UTC()},
				}},
			},
		})
	)
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: vecAgg, Child: parallelize})
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: parallelize, Child: rangeAgg})
	_ = physicalGraph.AddEdge(dag.Edge[physical.Node]{Parent: rangeAgg, Child: scanSet})
	physicalPlan := physical.FromGraph(physicalGraph)
	physicalPlan, err := physical.WrapWithBatching(physicalPlan, 500)
	require.NoError(t, err)

	cacheP := cacheParams{
		enabled:               true,
		taskCacheMaxSizeBytes: 1 << 20,
		// dataObjScanMaxSizeBytes=0: DataObjScan Cache nodes are still injected
		// (with max_cacheable_size=0 B) so their keys can be used for elimination.
		dataObjScanMaxSizeBytes:     0,
		pruneEmptyCachedTasks:       true,
		nonEmptyCachedTasksMaxBytes: math.MaxUint64,
	}
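
	// With these settings, a leaf task whose task-level cache entry decodes as
	// empty can be dropped at plan time, and a non-empty hit can replace the
	// task's stream with an in-memory cached source, subject to the
	// nonEmptyCachedTasksMaxBytes budget exercised by the cases below.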

	// Extract raw cache keys for every task in graph traversal order.
	// tasks[0] = root (VecAgg, no graph parents), tasks[1] = leaf "a", tasks[2] = leaf "b".
	// Each entry maps cache-type name → raw key for that task.
	baseGraph, err := planWorkflow("", physicalPlan, cacheP, log.NewNopLogger())
	require.NoError(t, err)
	require.Equal(t, 3, baseGraph.Len())

	// Collect cache keys from all tasks in the workflow.
	var tasks []map[physical.TaskCacheName]string
	for _, root := range baseGraph.Roots() {
		_ = baseGraph.Walk(root, func(task *Task) error {
			keys := make(map[physical.TaskCacheName]string)
			if fragRoot, err := task.Fragment.Root(); err == nil {
				if cn, ok := fragRoot.(*physical.Cache); ok {
					keys[cn.CacheName] = cn.Key
				}
			}
			for n := range task.Fragment.Graph().Nodes() {
				if cn, ok := n.(*physical.Cache); ok && cn.CacheName == physical.TaskCacheDataObjScanResult {
					keys[cn.CacheName] = cn.Key
				}
			}
			tasks = append(tasks, keys)
			return nil
		}, dag.PreOrderWalk)
	}
	require.Len(t, tasks, 3, "expected three tasks in traversal order")
	require.Len(t, tasks[0], 0, "task[0] must not have any cache keys")
	require.Contains(t, tasks[1], physical.TaskCacheLogsScanRangeAggr, "task[1] must have a task-level cache key")
	require.Contains(t, tasks[2], physical.TaskCacheLogsScanRangeAggr, "task[2] must have a task-level cache key")
	require.Contains(t, tasks[1], physical.TaskCacheDataObjScanResult, "task[1] must have a DataObjScan cache key")
	require.Contains(t, tasks[2], physical.TaskCacheDataObjScanResult, "task[2] must have a DataObjScan cache key")

	baseUlidGen := ulidGenerator{}
	generateConsistentULIDs(&baseUlidGen, baseGraph)
	originalPlan := strings.TrimSpace(Sprint(&Workflow{graph: baseGraph}))

	for _, tc := range []struct {
		name              string
		payloads          map[string][]byte // raw cache key → payload; absent keys = cache miss
		maxNonEmptyBytes  *uint64           // nil = use cacheP default (math.MaxUint64)
		pruneFetchTimeout time.Duration     // 0 = no timeout
		cacheOverride     cache.Cache       // if non-nil, used instead of building from payloads
		expected          string            // expected Sprint output after elimination
	}{
		{
			// All cache keys miss → no elimination; plan is identical to baseGraph.
			name:     "cache miss",
			payloads: nil,
			expected: originalPlan,
		},
		{
			// Both task-level cache keys return non-empty with pruneCachedTasks=true →
			// both leaves eliminated; root gets @cachedSource buffers=2.
			name: "cache non-empty",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
			},
			expected: `
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Batching batch_size=500
│ └── VectorAggregation operation=sum group_by=()
│ └── @cachedSource buffers=2 size=18 B
└`,
		},
		{
			// Task "a" task-level cache returns empty → task "a" eliminated;
			// root and task "b" remain.
			name: "one task empty",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheLogsScanRangeAggr]: emptyResultPayload(),
			},
			expected: `
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Batching batch_size=500
│ └── VectorAggregation operation=sum group_by=()
│ └── @source stream=00000000000000000000000003
└
┌ Task 00000000000000000000000002
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Cache max_cacheable_size=1.0 MiB hashed_key=9888d9ec9f19e11e key= |>>| Batching |>>| RangeAggregation{operation=count,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
│ │ └── @sink stream=00000000000000000000000003
│ └── Batching batch_size=500
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ └── Cache max_cacheable_size=0 B hashed_key=57460db08f81adaf key= |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
└`,
		},
		{
			// Task "a" returns empty, task "b" returns non-empty with pruneCachedTasks=true →
			// both leaves eliminated; root gets @cachedSource buffers=1.
			name: "cache one empty one non-empty",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheLogsScanRangeAggr]: emptyResultPayload(),
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
			},
			expected: `
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Batching batch_size=500
│ └── VectorAggregation operation=sum group_by=()
│ └── @cachedSource buffers=1 size=9 B
└`,
		},
		{
			// Scan-level cache non-empty should NOT eliminate the task: the parent
			// expects task-level (aggregated) results, not raw scan data.
			// Plan is identical to the full base graph.
			name: "cache non-empty at scan",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheDataObjScanResult]: nonEmptyResultPayload(),
			},
			expected: originalPlan,
		},
		{
			// Scan-level cache empty should NOT eliminate the task: only root-level
			// cache hits are checked. Plan is identical to the full base graph.
			name: "cache empty at scan",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheDataObjScanResult]: emptyResultPayload(),
			},
			expected: originalPlan,
		},
		{
			// Task "a" is a cache miss → stays as network stream source.
			// Task "b" is non-empty with pruneCachedTasks=true → eliminated; root gets @cachedSource buffers=1.
			// Root shows both @source (task "a" stream) and @cachedSource (task "b" result).
			name: "cache one miss one non-empty",
			payloads: map[string][]byte{
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
			},
			expected: `
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Batching batch_size=500
│ └── VectorAggregation operation=sum group_by=()
│ ├── @source stream=00000000000000000000000003
│ └── @cachedSource buffers=1 size=9 B
└
┌ Task 00000000000000000000000002
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Cache max_cacheable_size=1.0 MiB hashed_key=502e8c56ecf762f4 key= |>>| Batching |>>| RangeAggregation{operation=count,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
│ │ └── @sink stream=00000000000000000000000003
│ └── Batching batch_size=500
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ └── Cache max_cacheable_size=0 B hashed_key=4a6082f4d1b54ead key= |>>| DataObjScan{location=a,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:10Z,max_time_range_end=1970-01-01T00:00:50Z}
│ └── DataObjScan location=a streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:10Z end=1970-01-01T00:00:50Z
└`,
		},
		{
			// Both task-level caches return empty → both leaves eliminated; the root
			// is cascade-eliminated because all its sources are gone.
			name: "both tasks empty",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheLogsScanRangeAggr]: emptyResultPayload(),
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: emptyResultPayload(),
			},
			expected: "Empty",
		},
		{
			// Fetch times out mid-flight: task "a" is returned as a partial result
			// (empty hit) before the deadline; task "b" is missed. Task "a" is still
			// eliminated from the partial result; task "b" stays scheduled.
			name: "fetch timeout with partial results",
			cacheOverride: func() cache.Cache {
				mc := newWorkflowMockCache()
				mc.storeHashed(cache.HashKey(tasks[1][physical.TaskCacheLogsScanRangeAggr]), emptyResultPayload())
				return &timeoutMockCache{workflowMockCache: *mc}
			}(),
			pruneFetchTimeout: time.Second,
			expected: `
┌ Task 00000000000000000000000001
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Batching batch_size=500
│ └── VectorAggregation operation=sum group_by=()
│ └── @source stream=00000000000000000000000003
└
┌ Task 00000000000000000000000002
│ @max_time_range start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z
│
│ Cache max_cacheable_size=1.0 MiB hashed_key=9888d9ec9f19e11e key= |>>| Batching |>>| RangeAggregation{operation=count,start=1970-01-01T00:00:05Z,end=1970-01-01T00:00:45Z,step=0s,range=0s,grouping=by=[],max_series=0} |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
│ │ └── @sink stream=00000000000000000000000003
│ └── Batching batch_size=500
│ └── RangeAggregation operation=count start=1970-01-01T00:00:05Z end=1970-01-01T00:00:45Z step=0s range=0s group_by=()
│ └── Cache max_cacheable_size=0 B hashed_key=57460db08f81adaf key= |>>| DataObjScan{location=b,section=0,stream_ids=[],projections=[],predicates=[],max_time_range_start=1970-01-01T00:00:20Z,max_time_range_end=1970-01-01T00:01:00Z}
│ └── DataObjScan location=b streams=0 section_id=0 projections=()
│ └── @max_time_range start=1970-01-01T00:00:20Z end=1970-01-01T00:01:00Z
└`,
		},
		{
			// maxNonEmptyBytes=0 disables non-empty pruning even when both tasks hit.
			// Neither leaf is eliminated; plan is identical to baseGraph.
			name: "non-empty pruning disabled when max size is 0",
			payloads: map[string][]byte{
				tasks[1][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
			},
			maxNonEmptyBytes: ptr[uint64](0),
			expected:         originalPlan,
		},
		{
			// Budget (5 B) is smaller than a single cached result (9 B) → the result
			// is skipped by the greedy bin-packing loop; the task stays scheduled.
			name: "non-empty result skipped when it exceeds size budget",
			payloads: map[string][]byte{
				tasks[2][physical.TaskCacheLogsScanRangeAggr]: nonEmptyResultPayload(),
			},
			maxNonEmptyBytes: ptr[uint64](5),
			expected:         originalPlan,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			ulidGen := ulidGenerator{}

			p := cacheP
			if tc.maxNonEmptyBytes != nil {
				p.nonEmptyCachedTasksMaxBytes = *tc.maxNonEmptyBytes
			}
			if tc.pruneFetchTimeout != 0 {
				p.pruneFetchTimeout = tc.pruneFetchTimeout
			}
			if tc.cacheOverride != nil {
				p.registry = executor.NewTestTaskCacheRegistry(map[physical.TaskCacheName]cache.Cache{
					physical.TaskCacheLogsScanRangeAggr: tc.cacheOverride,
					physical.TaskCacheDataObjScanResult: tc.cacheOverride,
				})
			} else if len(tc.payloads) > 0 {
				mc := newWorkflowMockCache()
				for rawKey, payload := range tc.payloads {
					mc.storeHashed(cache.HashKey(rawKey), payload)
				}
				p.registry = executor.NewTestTaskCacheRegistry(map[physical.TaskCacheName]cache.Cache{
					physical.TaskCacheLogsScanRangeAggr: mc,
					physical.TaskCacheDataObjScanResult: mc,
				})
			}

			g, err := planWorkflow("", physicalPlan, p, log.NewNopLogger())
			require.NoError(t, err)
			requireUniqueStreams(t, g)
			generateConsistentULIDs(&ulidGen, g)

			actualOutput := Sprint(&Workflow{graph: g})
			require.Equal(t, strings.TrimSpace(tc.expected), strings.TrimSpace(actualOutput))
		})
	}
}

// Test_pruneCachedTasks_cascadeProtection verifies that a task with a non-empty
// cache hit is not lost when all of its children have empty cache hits.
//
// Graph: root → intermediate (non-empty hit) → [leaf_a (empty), leaf_b (empty)]
//
// Without the fix, the empty-elimination pass cascade-eliminates intermediate
// (because its two children are both eliminated and CachedSources hasn't been
// wired yet), so root never gets intermediate's cached result.
// With the fix, wireCachedSources runs before the empty pass, so root's
// CachedSources is populated before the cascade check runs.
func Test_pruneCachedTasks_cascadeProtection(t *testing.T) {
	const cacheName = physical.TaskCacheLogsScanRangeAggr

	// Build minimal single-node fragment graphs so that Fragment.Root() returns
	// the expected physical node type.
	makeFragment := func(root physical.Node) *physical.Plan {
		var g dag.Graph[physical.Node]
		g.Add(root)
		return physical.FromGraph(g)
	}

	// Physical nodes that serve as map keys for Sources/Sinks/CachedSources.
	rootNode := &physical.VectorAggregation{}
	intermNode := &physical.Cache{CacheName: cacheName, Key: "interm-key"}
	leafANode := &physical.Cache{CacheName: cacheName, Key: "leaf-a-key"}
	leafBNode := &physical.Cache{CacheName: cacheName, Key: "leaf-b-key"}

	// Streams that connect the tasks.
	streamAB := &Stream{ULID: ulid.Make()} // intermediate → root
	streamA := &Stream{ULID: ulid.Make()}  // leaf_a → intermediate
	streamB := &Stream{ULID: ulid.Make()}  // leaf_b → intermediate

	taskRoot := &Task{
		ULID:     ulid.Make(),
		Fragment: makeFragment(rootNode),
		Sources:  map[physical.Node][]*Stream{rootNode: {streamAB}},
	}
	taskInterm := &Task{
		ULID:     ulid.Make(),
		Fragment: makeFragment(intermNode),
		Sources:  map[physical.Node][]*Stream{intermNode: {streamA, streamB}},
		Sinks:    map[physical.Node][]*Stream{intermNode: {streamAB}},
	}
	taskLeafA := &Task{
		ULID:     ulid.Make(),
		Fragment: makeFragment(leafANode),
		Sinks:    map[physical.Node][]*Stream{leafANode: {streamA}},
	}
	taskLeafB := &Task{
		ULID:     ulid.Make(),
		Fragment: makeFragment(leafBNode),
		Sinks:    map[physical.Node][]*Stream{leafBNode: {streamB}},
	}

	// Assemble DAG: root ← intermediate ← [leaf_a, leaf_b]
	var graphTasks dag.Graph[*Task]
	graphTasks.Add(taskRoot)
	graphTasks.Add(taskInterm)
	graphTasks.Add(taskLeafA)
	graphTasks.Add(taskLeafB)
	require.NoError(t, graphTasks.AddEdge(dag.Edge[*Task]{Parent: taskRoot, Child: taskInterm}))
	require.NoError(t, graphTasks.AddEdge(dag.Edge[*Task]{Parent: taskInterm, Child: taskLeafA}))
	require.NoError(t, graphTasks.AddEdge(dag.Edge[*Task]{Parent: taskInterm, Child: taskLeafB}))

	// Cache: intermediate → non-empty, leaf_a → empty, leaf_b → empty.
	mc := newWorkflowMockCache()
	mc.storeHashed(cache.HashKey(intermNode.Key), nonEmptyResultPayload())
	mc.storeHashed(cache.HashKey(leafANode.Key), emptyResultPayload())
	mc.storeHashed(cache.HashKey(leafBNode.Key), emptyResultPayload())

	p := &planner{
		graph: graphTasks,
		streamWriters: map[*Stream]*Task{
			streamAB: taskInterm,
			streamA:  taskLeafA,
			streamB:  taskLeafB,
		},
	}
	cacheOpts := cacheParams{
		enabled:                     true,
		pruneEmptyCachedTasks:       true,
		nonEmptyCachedTasksMaxBytes: math.MaxUint64,
		registry: executor.NewTestTaskCacheRegistry(map[physical.TaskCacheName]cache.Cache{
			cacheName: mc,
		}),
	}

	require.NoError(t, pruneCachedTasks(p, cacheOpts, log.NewNopLogger()))

	// Root is the only task that should remain: leaf_a and leaf_b are
	// eliminated (empty hits), and intermediate is also eliminated, with its
	// result wired into root's CachedSources.
	require.Equal(t, 1, p.graph.Len(), "only root should remain")
	roots := p.graph.Roots()
	require.Len(t, roots, 1)
	require.Equal(t, taskRoot, roots[0], "root must still be in graph")

	// Root must have received intermediate's cached result.
	require.NotEmpty(t, taskRoot.CachedSources, "root must have CachedSources wired")
	require.Empty(t, taskRoot.Sources, "stream from intermediate must be gone from Sources")
}

// workflowMockCache is a simple in-memory cache.Cache for workflow planning tests.
type workflowMockCache struct {
	data map[string][]byte
}

func newWorkflowMockCache() *workflowMockCache {
	return &workflowMockCache{data: make(map[string][]byte)}
}

func (m *workflowMockCache) storeHashed(hashedKey string, buf []byte) {
	m.data[hashedKey] = buf
}

func (m *workflowMockCache) Store(_ context.Context, keys []string, bufs [][]byte) error {
	for i, k := range keys {
		m.data[k] = bufs[i]
	}
	return nil
}

func (m *workflowMockCache) Fetch(_ context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	for _, k := range keys {
		if buf, ok := m.data[k]; ok {
			found = append(found, k)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, k)
		}
	}
	return
}

func (m *workflowMockCache) Stop() {}

func (m *workflowMockCache) GetCacheType() stats.CacheType { return stats.TaskResultCache }

// timeoutMockCache wraps workflowMockCache and always returns context.DeadlineExceeded
// alongside whatever results were found, simulating a cache backend that times out
// mid-flight after returning partial results.
type timeoutMockCache struct {
	workflowMockCache
}

func (m *timeoutMockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	found, bufs, missing, _ = m.workflowMockCache.Fetch(ctx, keys)
	err = context.DeadlineExceeded
	return
}

// ptr returns a pointer to v. Used to set optional test-case fields inline.
func ptr[T any](v T) *T { return &v }

// emptyResultPayload returns a zero-length buffer which is treated as an empty cached result.
func emptyResultPayload() []byte { return []byte{} }

// nonEmptyResultPayload returns a non-empty buffer so that the cache entry is not treated as empty.
func nonEmptyResultPayload() []byte {
	// Wire format: [8 bytes count big-endian] [1 byte codec]
	return []byte{0, 0, 0, 0, 0, 0, 0, 1, 0}
}
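
// The payload above is nine bytes (count=1 followed by codec byte 0), which is
// why the expected plans in Test_pruneCachedTasks report
// "@cachedSource buffers=1 size=9 B" for one cached hit and
// "buffers=2 size=18 B" for two.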
type ulidGenerator struct {
	lastCounter uint64
}

func (g *ulidGenerator) Make() ulid.ULID {
	g.lastCounter++
	value := g.lastCounter

	// Manually set ULID bytes: counter in the last 8 bytes, zeros in first 8.
	var ulidBytes [16]byte
	// Put the counter value in the last 8 bytes (big-endian).
	for i := range 8 {
		ulidBytes[15-i] = byte(value >> (8 * i))
	}
	return ulidBytes
}