diff --git a/pkg/engine/planner/physical/catalog.go b/pkg/engine/planner/physical/catalog.go index eaaeabd4bb..bec74d7518 100644 --- a/pkg/engine/planner/physical/catalog.go +++ b/pkg/engine/planner/physical/catalog.go @@ -116,37 +116,6 @@ func (c *MetastoreCatalog) ResolveShardDescriptorsWithShard(selector Expression, return c.resolveShardDescriptorsWithIndex(selector, predicates, shard, from, through) } -func filterForShard(shard ShardInfo, paths []string, streamIDs [][]int64, numSections []int, from, through time.Time) ([]FilteredShardDescriptor, error) { - filteredDescriptors := make([]FilteredShardDescriptor, 0, len(paths)) - - var count int - for i := range paths { - sec := make([]int, 0, numSections[i]) - - for j := range numSections[i] { - if count%int(shard.Of) == int(shard.Shard) { - sec = append(sec, j) - } - count++ - } - - if len(sec) > 0 { - filteredDescriptor := FilteredShardDescriptor{} - filteredDescriptor.Location = DataObjLocation(paths[i]) - filteredDescriptor.Sections = sec - filteredDescriptor.Streams = streamIDs[i] - tr, err := newTimeRange(from, through) - if err != nil { - return nil, err - } - filteredDescriptor.TimeRange = tr - filteredDescriptors = append(filteredDescriptors, filteredDescriptor) - } - } - - return filteredDescriptors, nil -} - // resolveShardDescriptorsWithIndex expects the metastore to initially point to index objects, not the log objects directly. func (c *MetastoreCatalog) resolveShardDescriptorsWithIndex(selector Expression, predicates []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { if c.metastore == nil { diff --git a/pkg/engine/planner/physical/optimizer_test.go b/pkg/engine/planner/physical/optimizer_test.go index 376d064cb2..817d07ea51 100644 --- a/pkg/engine/planner/physical/optimizer_test.go +++ b/pkg/engine/planner/physical/optimizer_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/logical" @@ -869,10 +870,14 @@ func TestProjectionPushdown_PushesRequestedKeysToParseNodes(t *testing.T) { require.NoError(t, err) // Create physical planner with test catalog - catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "/test/object": {streamIDs: []int64{1, 2}, sections: 10}, - }, + catalog := &catalog{} + for i := 0; i < 10; i++ { + catalog.sectionDescriptors = append(catalog.sectionDescriptors, &metastore.DataobjSectionDescriptor{ + SectionKey: metastore.SectionKey{ObjectPath: "/test/object", SectionIdx: int64(i)}, + StreamIDs: []int64{1, 2}, + Start: time.Unix(0, 0), + End: time.Unix(3600, 0), + }) } ctx := NewContext(time.Unix(0, 0), time.Unix(3600, 0)) planner := NewPlanner(ctx, catalog) diff --git a/pkg/engine/planner/physical/planner.go b/pkg/engine/planner/physical/planner.go index 8676e7db5f..d8572aa3fc 100644 --- a/pkg/engine/planner/physical/planner.go +++ b/pkg/engine/planner/physical/planner.go @@ -3,6 +3,7 @@ package physical import ( "errors" "fmt" + "slices" "sort" "time" @@ -240,6 +241,10 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) ([]Node, p.plan.addNode(merge) groups := overlappingShardDescriptors(filteredShardDescriptors) + if ctx.direction == DESC { + slices.Reverse(groups) + } + for _, gr := range groups { if err := p.buildNodeGroup(gr, merge, ctx); err != nil { return nil, err diff --git a/pkg/engine/planner/physical/planner_test.go b/pkg/engine/planner/physical/planner_test.go index 384b35b353..5a3f733698 100644 --- a/pkg/engine/planner/physical/planner_test.go +++ b/pkg/engine/planner/physical/planner_test.go @@ -1,24 +1,20 @@ package physical import ( - "sort" + "regexp" "testing" "time" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/dataobj/metastore" "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/logical" ) -type objectMeta struct { - streamIDs []int64 - sections int -} - type catalog struct { - streamsByObject map[string]objectMeta + sectionDescriptors []*metastore.DataobjSectionDescriptor } // ResolveShardDescriptors implements Catalog. @@ -27,61 +23,51 @@ func (c *catalog) ResolveShardDescriptors(e Expression, from, through time.Time) } // ResolveDataObjForShard implements Catalog. -func (c *catalog) ResolveShardDescriptorsWithShard(_ Expression, _ []Expression, shard ShardInfo, from, through time.Time) ([]FilteredShardDescriptor, error) { - paths := make([]string, 0, len(c.streamsByObject)) - streams := make([][]int64, 0, len(c.streamsByObject)) - sections := make([]int, 0, len(c.streamsByObject)) - - for o, s := range c.streamsByObject { - paths = append(paths, o) - streams = append(streams, s.streamIDs) - sections = append(sections, s.sections) - } - - // The function needs to return objects and their streamIDs and sections in predictable order - sort.Slice(streams, func(i, j int) bool { - return paths[i] < paths[j] - }) - sort.Slice(sections, func(i, j int) bool { - return paths[i] < paths[j] - }) - sort.Slice(paths, func(i, j int) bool { - return paths[i] < paths[j] - }) - - return filterForShard(shard, paths, streams, sections, from, through) +func (c *catalog) ResolveShardDescriptorsWithShard(_ Expression, _ []Expression, shard ShardInfo, _, _ time.Time) ([]FilteredShardDescriptor, error) { + return filterDescriptorsForShard(shard, c.sectionDescriptors) } var _ Catalog = (*catalog)(nil) func TestMockCatalog(t *testing.T) { + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 3}, - "obj2": {streamIDs: []int64{3, 4}, sections: 2}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 0}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 1}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 2}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 0}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, }, } - timeStart := time.Now() - timeEnd := timeStart.Add(time.Second * 10) for _, tt := range []struct { shard ShardInfo expDescriptors []FilteredShardDescriptor }{ { shard: ShardInfo{0, 1}, - expDescriptors: []FilteredShardDescriptor{{Location: "obj1", Streams: []int64{1, 2}, Sections: []int{0, 1, 2}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, - {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{0, 1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + expDescriptors: []FilteredShardDescriptor{ + {Location: "obj1", Streams: []int64{1, 2}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj1", Streams: []int64{1, 2}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj1", Streams: []int64{1, 2}, Sections: []int{2}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, }, }, { shard: ShardInfo{0, 4}, - expDescriptors: []FilteredShardDescriptor{{Location: "obj1", Streams: []int64{1, 2}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, - {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + expDescriptors: []FilteredShardDescriptor{ + {Location: "obj1", Streams: []int64{1, 2}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, }, }, { - shard: ShardInfo{1, 4}, - expDescriptors: []FilteredShardDescriptor{{Location: "obj1", Streams: []int64{1, 2}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}}, + shard: ShardInfo{1, 4}, + expDescriptors: []FilteredShardDescriptor{ + {Location: "obj1", Streams: []int64{1, 2}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + {Location: "obj2", Streams: []int64{3, 4}, Sections: []int{1}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}, + }, }, { shard: ShardInfo{2, 4}, @@ -89,7 +75,7 @@ func TestMockCatalog(t *testing.T) { }, { shard: ShardInfo{3, 4}, - expDescriptors: []FilteredShardDescriptor{{Location: "obj2", Streams: []int64{3, 4}, Sections: []int{0}, TimeRange: TimeRange{Start: timeStart, End: timeEnd}}}, + expDescriptors: []FilteredShardDescriptor{}, }, } { t.Run("shard "+tt.shard.String(), func(t *testing.T) { @@ -133,13 +119,20 @@ func sections(t *testing.T, plan *Plan, nodes []Node) [][]int { } func TestPlanner_ConvertMaketable(t *testing.T) { + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 2}, - "obj2": {streamIDs: []int64{3, 4}, sections: 2}, - "obj3": {streamIDs: []int64{5, 1}, sections: 2}, - "obj4": {streamIDs: []int64{2, 3}, sections: 2}, - "obj5": {streamIDs: []int64{4, 5}, sections: 2}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 0}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 1}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 0}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj3", SectionIdx: 0}, StreamIDs: []int64{5, 1}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj3", SectionIdx: 1}, StreamIDs: []int64{5, 1}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj4", SectionIdx: 0}, StreamIDs: []int64{2, 3}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj4", SectionIdx: 1}, StreamIDs: []int64{2, 3}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj5", SectionIdx: 0}, StreamIDs: []int64{4, 5}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj5", SectionIdx: 1}, StreamIDs: []int64{4, 5}, Start: timeStart, End: timeEnd}, }, } planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog) @@ -176,23 +169,23 @@ func TestPlanner_ConvertMaketable(t *testing.T) { }, { shard: logical.NewShard(0, 4), // shard 1 of 4 - expPaths: []string{"obj1", "obj3", "obj5"}, - expSections: [][]int{{0}, {0}, {0}}, + expPaths: []string{"obj1", "obj2", "obj3", "obj4", "obj5"}, + expSections: [][]int{{0}, {0}, {0}, {0}, {0}}, }, { shard: logical.NewShard(1, 4), // shard 2 of 4 - expPaths: []string{"obj1", "obj3", "obj5"}, - expSections: [][]int{{1}, {1}, {1}}, + expPaths: []string{"obj1", "obj2", "obj3", "obj4", "obj5"}, + expSections: [][]int{{1}, {1}, {1}, {1}, {1}}, }, { shard: logical.NewShard(2, 4), // shard 3 of 4 - expPaths: []string{"obj2", "obj4"}, - expSections: [][]int{{0}, {0}}, + expPaths: []string{}, + expSections: [][]int{}, }, { shard: logical.NewShard(3, 4), // shard 4 of 4 - expPaths: []string{"obj2", "obj4"}, - expSections: [][]int{{1}, {1}}, + expPaths: []string{}, + expSections: [][]int{}, }, } { t.Run("shard "+tt.shard.String(), func(t *testing.T) { @@ -201,8 +194,6 @@ func TestPlanner_ConvertMaketable(t *testing.T) { Shard: tt.shard, } planner.reset() - timeStart := time.Now() - timeEnd := timeStart.Add(time.Second * 10) nodes, err := planner.processMakeTable(relation, NewContext(timeStart, timeEnd)) require.NoError(t, err) require.ElementsMatch(t, tt.expPaths, locations(t, planner.plan, nodes)) @@ -244,10 +235,12 @@ func TestPlanner_Convert(t *testing.T) { logicalPlan, err := b.ToPlan() require.NoError(t, err) + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 3}, - "obj2": {streamIDs: []int64{3, 4}, sections: 1}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 0}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 0}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, }, } planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog) @@ -288,8 +281,8 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.NoError(t, err) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 1}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 0}, StreamIDs: []int64{1, 2}, Start: time.Now(), End: time.Now().Add(time.Second * 10)}, }, } planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog) @@ -358,8 +351,8 @@ func TestPlanner_Convert_WithParse(t *testing.T) { require.NoError(t, err) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 1}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 0}, StreamIDs: []int64{1, 2}, Start: start, End: end}, }, } planner := NewPlanner(NewContext(start, end), catalog) @@ -429,13 +422,15 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) { logicalPlan, err := b.ToPlan() require.NoError(t, err) + timeStart := time.Now() + timeEnd := timeStart.Add(time.Second * 10) catalog := &catalog{ - streamsByObject: map[string]objectMeta{ - "obj1": {streamIDs: []int64{1, 2}, sections: 3}, - "obj2": {streamIDs: []int64{3, 4}, sections: 1}, + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 3}, StreamIDs: []int64{1, 2}, Start: timeStart, End: timeEnd}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: timeStart, End: timeEnd}, }, } - planner := NewPlanner(NewContext(time.Now(), time.Now()), catalog) + planner := NewPlanner(NewContext(timeStart, timeEnd), catalog) physicalPlan, err := planner.Build(logicalPlan) require.NoError(t, err) @@ -445,3 +440,96 @@ func TestPlanner_Convert_RangeAggregations(t *testing.T) { require.NoError(t, err) t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan)) } + +func TestPlanner_MakeTable_Ordering(t *testing.T) { + // Two separate groups with different timestamps in each group + catalog := &catalog{ + sectionDescriptors: []*metastore.DataobjSectionDescriptor{ + {SectionKey: metastore.SectionKey{ObjectPath: "obj1", SectionIdx: 3}, StreamIDs: []int64{1, 2}, Start: time.Now(), End: time.Now().Add(time.Second * 10)}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj2", SectionIdx: 1}, StreamIDs: []int64{3, 4}, Start: time.Now(), End: time.Now().Add(time.Second * 10)}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj3", SectionIdx: 2}, StreamIDs: []int64{5, 1}, Start: time.Now().Add(-time.Minute), End: time.Now().Add(-time.Minute).Add(time.Second * 10)}, + {SectionKey: metastore.SectionKey{ObjectPath: "obj3", SectionIdx: 3}, StreamIDs: []int64{5, 1}, Start: time.Now().Add(-2 * time.Minute), End: time.Now().Add(-time.Minute).Add(time.Second * 7)}, + }, + } + + // simple logical plan for { app="users" } + b := logical.NewBuilder( + &logical.MakeTable{ + Selector: &logical.BinOp{ + Left: logical.NewColumnRef("app", types.ColumnTypeLabel), + Right: logical.NewLiteral("users"), + Op: types.BinaryOpEq, + }, + Shard: logical.NewShard(0, 1), // no sharding + }, + ) + + logicalPlan, err := b.ToPlan() + require.NoError(t, err) + + t.Run("ascending", func(t *testing.T) { + planner := NewPlanner(NewContext(time.Now(), time.Now()).WithDirection(ASC), catalog) + plan, err := planner.Build(logicalPlan) + require.NoError(t, err) + + expectedPlan := &Plan{} + merge := expectedPlan.addNode(&Merge{id: "merge"}) + sortMerge1 := expectedPlan.addNode(&SortMerge{id: "sortmerge1", Order: ASC, Column: &ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}}}) + sortMerge2 := expectedPlan.addNode(&SortMerge{id: "sortmerge2", Order: ASC, Column: &ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}}}) + scan1 := expectedPlan.addNode(&DataObjScan{id: "scan1", Location: "obj1", Section: 3, StreamIDs: []int64{1, 2}, Direction: ASC}) + scan2 := expectedPlan.addNode(&DataObjScan{id: "scan2", Location: "obj2", Section: 1, StreamIDs: []int64{3, 4}, Direction: ASC}) + scan3 := expectedPlan.addNode(&DataObjScan{id: "scan3", Location: "obj3", Section: 2, StreamIDs: []int64{5, 1}, Direction: ASC}) + scan4 := expectedPlan.addNode(&DataObjScan{id: "scan4", Location: "obj3", Section: 3, StreamIDs: []int64{5, 1}, Direction: ASC}) + + _ = expectedPlan.addEdge(Edge{Parent: merge, Child: sortMerge1}) + _ = expectedPlan.addEdge(Edge{Parent: merge, Child: sortMerge2}) + + // Sort merges should be added in the order of the scan timestamps + _ = expectedPlan.addEdge(Edge{Parent: sortMerge1, Child: scan4}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge1, Child: scan3}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge2, Child: scan1}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge2, Child: scan2}) + + actual := PrintAsTree(plan) + expected := PrintAsTree(expectedPlan) + + pat := regexp.MustCompile("<.+?>") + actual = pat.ReplaceAllString(actual, "") + expected = pat.ReplaceAllString(expected, "") + + require.Equal(t, actual, expected) + }) + + t.Run("descending", func(t *testing.T) { + planner := NewPlanner(NewContext(time.Now(), time.Now()).WithDirection(DESC), catalog) + plan, err := planner.Build(logicalPlan) + require.NoError(t, err) + + expectedPlan := &Plan{} + merge := expectedPlan.addNode(&Merge{id: "merge"}) + sortMerge1 := expectedPlan.addNode(&SortMerge{id: "sortmerge1", Order: DESC, Column: &ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}}}) + sortMerge2 := expectedPlan.addNode(&SortMerge{id: "sortmerge2", Order: DESC, Column: &ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}}}) + scan1 := expectedPlan.addNode(&DataObjScan{id: "scan1", Location: "obj1", Section: 3, StreamIDs: []int64{1, 2}, Direction: DESC}) + scan2 := expectedPlan.addNode(&DataObjScan{id: "scan2", Location: "obj2", Section: 1, StreamIDs: []int64{3, 4}, Direction: DESC}) + scan3 := expectedPlan.addNode(&DataObjScan{id: "scan3", Location: "obj3", Section: 2, StreamIDs: []int64{5, 1}, Direction: DESC}) + scan4 := expectedPlan.addNode(&DataObjScan{id: "scan4", Location: "obj3", Section: 3, StreamIDs: []int64{5, 1}, Direction: DESC}) + + _ = expectedPlan.addEdge(Edge{Parent: merge, Child: sortMerge1}) + _ = expectedPlan.addEdge(Edge{Parent: merge, Child: sortMerge2}) + + // Sort merges should be added in the order of the scan timestamps + _ = expectedPlan.addEdge(Edge{Parent: sortMerge1, Child: scan1}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge1, Child: scan2}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge2, Child: scan4}) + _ = expectedPlan.addEdge(Edge{Parent: sortMerge2, Child: scan3}) + + actual := PrintAsTree(plan) + expected := PrintAsTree(expectedPlan) + + pat := regexp.MustCompile("<.+?>") + actual = pat.ReplaceAllString(actual, "") + expected = pat.ReplaceAllString(expected, "") + + require.Equal(t, actual, expected) + }) +} diff --git a/pkg/logql/bench/Makefile b/pkg/logql/bench/Makefile index 458ca17f77..6e6995d670 100644 --- a/pkg/logql/bench/Makefile +++ b/pkg/logql/bench/Makefile @@ -67,9 +67,9 @@ loki: .PHONY: server server: grafana loki -# server-debug will start a local Grafana and Loki with an exposed delve debug port, useful for -# interacting with bench data. The loki proceess will block, but can be killed using the DAP api, -# so this task defines a teardown command to kill grafana once loki is stopped. +# server-debug will start a local Grafana and Loki with an exposed delve debug port, useful for +# interacting with bench data. The loki proceess will block, but can be killed using the DAP api, +# so this task defines a teardown command to kill grafana once loki is stopped. .PHONY: server-debug server-debug: grafana loki-debug # loki will block, so this won't run until we kill the process