chore: compat should also handles metadata collisions (#20005)

pull/19996/head
Ashwanth 1 month ago committed by GitHub
parent e37d83fab1
commit 5bb6b54052
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 115
      pkg/engine/internal/executor/compat.go
  2. 322
      pkg/engine/internal/executor/compat_test.go
  3. 7
      pkg/engine/internal/executor/executor.go
  4. 10
      pkg/engine/internal/planner/physical/compat.go
  5. 5
      pkg/engine/internal/planner/physical/planner.go
  6. 4
      pkg/engine/internal/planner/physical/planner_test.go
  7. 2
      pkg/engine/internal/planner/physical/printer.go
  8. 32
      pkg/engine/internal/planner/planner_test.go
  9. 13
      pkg/engine/internal/proto/physicalpb/marshal_node.go
  10. 318
      pkg/engine/internal/proto/physicalpb/physicalpb.pb.go
  11. 2
      pkg/engine/internal/proto/physicalpb/physicalpb.proto
  12. 2
      pkg/engine/internal/proto/physicalpb/physicalpb_test.go
  13. 12
      pkg/engine/internal/proto/physicalpb/unmarshal_node.go
  14. 2
      pkg/engine/internal/scheduler/wire/wire_http2_test.go

@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/xcap"
)
@ -30,48 +31,63 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
}
// First, find all fields in the schema that have colliding names,
// based on the collision column type and the source column type.
// based on the collision column types and the source column type.
var (
collisionFieldIndices []int
collisionFieldNames []string
sourceFieldIndices []int
sourceFieldNames []string
sourceFieldIndices []int
sourceFieldNames []string
collisionIdxsByName = make(map[string][]int)
)
schema := batch.Schema()
collisionTypes := make(map[types.ColumnType]bool, len(compat.Collisions))
for _, ct := range compat.Collisions {
collisionTypes[ct] = true
}
for idx := range schema.NumFields() {
ident, err := semconv.ParseFQN(schema.Field(idx).Name)
if err != nil {
return nil, err
}
switch ident.ColumnType() {
case compat.Collision:
collisionFieldIndices = append(collisionFieldIndices, idx)
collisionFieldNames = append(collisionFieldNames, ident.ShortName())
case compat.Source:
colType := ident.ColumnType()
if collisionTypes[colType] {
shortName := ident.ShortName()
collisionIdxsByName[shortName] = append(collisionIdxsByName[shortName], idx)
} else if colType == compat.Source {
sourceFieldIndices = append(sourceFieldIndices, idx)
sourceFieldNames = append(sourceFieldNames, ident.ShortName())
}
}
duplicates := findDuplicates(collisionFieldNames, sourceFieldNames)
// Find source columns that have collisions
var duplicates []duplicateColumn
for i, sourceName := range sourceFieldNames {
if collisionIdxs, exists := collisionIdxsByName[sourceName]; exists {
duplicates = append(duplicates, duplicateColumn{
name: sourceName,
collisionIdxs: collisionIdxs,
sourceIdx: sourceFieldIndices[i],
})
}
}
// Return early if there are no colliding column names.
if len(duplicates) == 0 {
return batch, nil
}
// Sort by name for deterministic ordering of _extracted columns
slices.SortStableFunc(duplicates, func(a, b duplicateColumn) int {
return cmp.Compare(a.name, b.name)
})
region.Record(statCompatCollisionFound.Observe(true))
// Next, update the schema with the new columns that have the _extracted suffix.
newSchema := batch.Schema()
duplicateCols := make([]duplicateColumn, 0, len(duplicates))
r := int(batch.NumCols())
for i, duplicate := range duplicates {
collisionFieldIdx := collisionFieldIndices[duplicate.s1Idx]
sourceFieldIdx := sourceFieldIndices[duplicate.s2Idx]
for i := range duplicates {
sourceFieldIdx := duplicates[i].sourceIdx
sourceField := newSchema.Field(sourceFieldIdx)
sourceIdent, err := semconv.ParseFQN(sourceField.Name)
if err != nil {
@ -83,13 +99,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
if err != nil {
return nil, err
}
duplicateCols = append(duplicateCols, duplicateColumn{
name: duplicate.value,
collisionIdx: collisionFieldIdx,
sourceIdx: sourceFieldIdx,
destinationIdx: r + i,
})
duplicates[i].destinationIdx = r + i
}
// Create a new builder with the updated schema.
@ -104,7 +114,7 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
for idx := range schema.NumFields() {
col := batch.Column(idx)
duplicateIdx := slices.IndexFunc(duplicateCols, func(d duplicateColumn) bool { return d.sourceIdx == idx })
duplicateIdx := slices.IndexFunc(duplicates, func(d duplicateColumn) bool { return d.sourceIdx == idx })
// If not a colliding column, just copy over the column data of the original record.
if duplicateIdx < 0 {
@ -115,8 +125,11 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
// If the currently processed column is the source field for a colliding column,
// then write non-null values from source column into destination column.
// Also, "clear" the original column value by writing a NULL instead of the original value.
duplicate := duplicateCols[duplicateIdx]
collisionCol := batch.Column(duplicate.collisionIdx)
duplicate := duplicates[duplicateIdx]
collisionCols := make([]arrow.Array, len(duplicate.collisionIdxs))
for i, collIdx := range duplicate.collisionIdxs {
collisionCols[i] = batch.Column(collIdx)
}
switch sourceFieldBuilder := builder.Field(idx).(type) {
case *array.StringBuilder:
@ -125,7 +138,8 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
if col.IsNull(i) || !col.IsValid(i) {
sourceFieldBuilder.AppendNull() // append NULL to original column
destinationFieldBuilder.AppendNull() // append NULL to _extracted column
} else if collisionCol.IsNull(i) || !collisionCol.IsValid(i) {
} else if allColumnsNull(collisionCols, i) {
// All collision columns are null for this row, keep value in source column
v := col.(*array.String).Value(i)
sourceFieldBuilder.Append(v) // append value to original column
destinationFieldBuilder.AppendNull() // append NULL to _extracted column
@ -150,52 +164,23 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
}, region, input)
}
// duplicate holds indexes to a duplicate values in two slices
type duplicate struct {
value string
s1Idx, s2Idx int
}
// findDuplicates finds strings that appear in both slices and returns
// their indexes in each slice.
// The function assumes that elements in a slices are unique.
func findDuplicates(s1, s2 []string) []duplicate {
if len(s1) == 0 || len(s2) == 0 {
return nil
}
set1 := make(map[string]int)
for i, v := range s1 {
set1[v] = i
}
set2 := make(map[string]int)
for i, v := range s2 {
set2[v] = i
}
// Find duplicates that exist in both slices
var duplicates []duplicate
for value, s1Idx := range set1 {
if s2Idx, exists := set2[value]; exists {
duplicates = append(duplicates, duplicate{
value: value,
s1Idx: s1Idx,
s2Idx: s2Idx,
})
// allColumnsNull returns true if all columns are null or invalid at the given row index.
func allColumnsNull(collisionCols []arrow.Array, rowIdx int) bool {
for _, col := range collisionCols {
if !col.IsNull(rowIdx) && col.IsValid(rowIdx) {
return false
}
}
slices.SortStableFunc(duplicates, func(a, b duplicate) int { return cmp.Compare(a.value, b.value) })
return duplicates
return true
}
// duplicateColumn holds indexes to fields/columns in an [*arrow.Schema].
type duplicateColumn struct {
// name is the duplicate column name
name string
// collisionIdx is the index of the collision column
collisionIdx int
// collisionIdxs holds the indices of ALL collision columns with this name.
// Multiple collision types (e.g., label and metadata) can have columns with the same short name.
collisionIdxs []int
// sourceIdx is the index of the source column
sourceIdx int
// destinationIdx is the index of the destination column

@ -2,8 +2,6 @@ package executor
import (
"errors"
"slices"
"strings"
"testing"
"time"
@ -17,158 +15,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
func TestFindDuplicates(t *testing.T) {
tests := []struct {
name string
slice1 []string
slice2 []string
expected []duplicate
}{
{
name: "empty slices",
slice1: []string{},
slice2: []string{},
expected: nil,
},
{
name: "first slice empty",
slice1: []string{},
slice2: []string{"a", "b", "c"},
expected: nil,
},
{
name: "second slice empty",
slice1: []string{"a", "b", "c"},
slice2: []string{},
expected: nil,
},
{
name: "no duplicates",
slice1: []string{"a", "b", "c"},
slice2: []string{"d", "e", "f"},
expected: nil,
},
{
name: "single duplicate",
slice1: []string{"a", "b", "c"},
slice2: []string{"c", "d", "e"},
expected: []duplicate{
{
value: "c",
s1Idx: 2,
s2Idx: 0,
},
},
},
{
name: "multiple duplicates",
slice1: []string{"a", "b", "c", "d"},
slice2: []string{"c", "d", "e", "f"},
expected: []duplicate{
{
value: "c",
s1Idx: 2,
s2Idx: 0,
},
{
value: "d",
s1Idx: 3,
s2Idx: 1,
},
},
},
{
name: "duplicate with different positions",
slice1: []string{"x", "y", "z"},
slice2: []string{"z", "y", "x"},
expected: []duplicate{
{
value: "x",
s1Idx: 0,
s2Idx: 2,
},
{
value: "y",
s1Idx: 1,
s2Idx: 1,
},
{
value: "z",
s1Idx: 2,
s2Idx: 0,
},
},
},
{
name: "identical slices",
slice1: []string{"a", "b", "c"},
slice2: []string{"a", "b", "c"},
expected: []duplicate{
{
value: "a",
s1Idx: 0,
s2Idx: 0,
},
{
value: "b",
s1Idx: 1,
s2Idx: 1,
},
{
value: "c",
s1Idx: 2,
s2Idx: 2,
},
},
},
{
name: "duplicate values in first slice - function assumes unique elements",
slice1: []string{"a", "b", "a"}, // Note: function assumes unique elements
slice2: []string{"a", "c"},
expected: []duplicate{
{
value: "a",
s1Idx: 2,
s2Idx: 0, // Will use the last occurrence index due to map overwrite
},
},
},
{
name: "duplicate values in second slice - function assumes unique elements",
slice1: []string{"a", "b"},
slice2: []string{"a", "c", "a"}, // Note: function assumes unique elements
expected: []duplicate{
{
value: "a",
s1Idx: 0, // Will use the last occurrence index due to map overwrite
s2Idx: 2,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := findDuplicates(tt.slice1, tt.slice2)
// Sort both expected and result slices by value for consistent comparison
// since the order of duplicates in the result is not guaranteed
sortDuplicatesByValue(result)
sortDuplicatesByValue(tt.expected)
require.Equal(t, tt.expected, result)
})
}
}
// sortDuplicatesByValue sorts a slice of duplicate structs by their value field
func sortDuplicatesByValue(duplicates []duplicate) {
if len(duplicates) <= 1 {
return
}
slices.SortStableFunc(duplicates, func(a, b duplicate) int { return strings.Compare(a.value, b.value) })
}
func TestNewColumnCompatibilityPipeline(t *testing.T) {
tests := []struct {
name string
@ -183,7 +29,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "no column collisions - returns early",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -215,7 +61,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "single column collision - string type",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -248,7 +94,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "multiple column collisions",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -300,7 +146,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "collision with null values in collision column",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -333,7 +179,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "collision with null values in source column",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -360,7 +206,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "multiple batches with collisions",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -395,7 +241,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "empty batch does not add _extracted column",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -417,7 +263,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
{
name: "non-string column types - should copy through unchanged",
compat: &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
},
@ -464,6 +310,152 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
},
},
},
{
name: "multiple collision types - label and metadata",
compat: &physical.ColumnCompat{
Collisions: []types.ColumnType{types.ColumnTypeLabel, types.ColumnTypeMetadata},
Source: types.ColumnTypeParsed,
Destination: types.ColumnTypeParsed,
},
schema: arrow.NewSchema([]arrow.Field{
semconv.FieldFromFQN("utf8.builtin.message", true),
// Collision columns from Label type
semconv.FieldFromFQN("utf8.label.status", true),
semconv.FieldFromFQN("utf8.label.level", true),
// Collision columns from Metadata type
semconv.FieldFromFQN("utf8.metadata.status", true),
semconv.FieldFromFQN("utf8.metadata.env", true),
// Source columns (Parsed) that collide with both types
semconv.FieldFromFQN("utf8.parsed.status", true), // collides with both label.status and metadata.status
semconv.FieldFromFQN("utf8.parsed.level", true), // collides with label.level
semconv.FieldFromFQN("utf8.parsed.env", true), // collides with metadata.env
semconv.FieldFromFQN("utf8.parsed.unique", true), // no collision
}, nil),
inputRows: []arrowtest.Rows{
{
{
"utf8.builtin.message": "test message",
"utf8.label.status": "active",
"utf8.label.level": "debug",
"utf8.metadata.status": "200",
"utf8.metadata.env": "production",
"utf8.parsed.status": "ok",
"utf8.parsed.level": "info",
"utf8.parsed.env": "staging",
"utf8.parsed.unique": "value",
},
{
"utf8.builtin.message": "test message 2",
"utf8.label.status": "inactive",
"utf8.label.level": "debug",
"utf8.metadata.status": "404",
"utf8.metadata.env": "development",
"utf8.parsed.status": "error",
"utf8.parsed.level": "debug",
"utf8.parsed.env": "local",
"utf8.parsed.unique": "another",
},
// no duplicates as collision columns are null
{
"utf8.builtin.message": "test message 3",
"utf8.label.status": nil,
"utf8.label.level": nil,
"utf8.metadata.status": nil,
"utf8.metadata.env": nil,
"utf8.parsed.status": "error",
"utf8.parsed.level": "debug",
"utf8.parsed.env": "local",
"utf8.parsed.unique": "another",
},
{
"utf8.builtin.message": "test message 4",
"utf8.label.status": "inactive",
"utf8.label.level": "debug",
"utf8.metadata.status": nil,
"utf8.metadata.env": nil,
"utf8.parsed.status": "error",
"utf8.parsed.level": "info",
"utf8.parsed.env": "local",
"utf8.parsed.unique": "another",
},
},
},
expectedSchema: arrow.NewSchema([]arrow.Field{
semconv.FieldFromFQN("utf8.builtin.message", true),
semconv.FieldFromFQN("utf8.label.status", true),
semconv.FieldFromFQN("utf8.label.level", true),
semconv.FieldFromFQN("utf8.metadata.status", true),
semconv.FieldFromFQN("utf8.metadata.env", true),
semconv.FieldFromFQN("utf8.parsed.status", true),
semconv.FieldFromFQN("utf8.parsed.level", true),
semconv.FieldFromFQN("utf8.parsed.env", true),
semconv.FieldFromFQN("utf8.parsed.unique", true),
// Extracted columns (sorted by name)
semconv.FieldFromFQN("utf8.parsed.env_extracted", true),
semconv.FieldFromFQN("utf8.parsed.level_extracted", true),
semconv.FieldFromFQN("utf8.parsed.status_extracted", true),
}, nil),
expectedRows: []arrowtest.Rows{
{
{
"utf8.builtin.message": "test message",
"utf8.label.status": "active",
"utf8.label.level": "debug",
"utf8.metadata.status": "200",
"utf8.metadata.env": "production",
"utf8.parsed.status": nil,
"utf8.parsed.level": nil,
"utf8.parsed.env": nil,
"utf8.parsed.unique": "value",
"utf8.parsed.env_extracted": "staging",
"utf8.parsed.level_extracted": "info",
"utf8.parsed.status_extracted": "ok",
},
{
"utf8.builtin.message": "test message 2",
"utf8.label.status": "inactive",
"utf8.label.level": "debug",
"utf8.metadata.status": "404",
"utf8.metadata.env": "development",
"utf8.parsed.status": nil,
"utf8.parsed.level": nil,
"utf8.parsed.env": nil,
"utf8.parsed.unique": "another",
"utf8.parsed.env_extracted": "local",
"utf8.parsed.level_extracted": "debug",
"utf8.parsed.status_extracted": "error",
},
{
"utf8.builtin.message": "test message 3",
"utf8.label.status": nil,
"utf8.label.level": nil,
"utf8.metadata.status": nil,
"utf8.metadata.env": nil,
"utf8.parsed.status": "error",
"utf8.parsed.level": "debug",
"utf8.parsed.env": "local",
"utf8.parsed.unique": "another",
"utf8.parsed.env_extracted": nil,
"utf8.parsed.level_extracted": nil,
"utf8.parsed.status_extracted": nil,
},
{
"utf8.builtin.message": "test message 4",
"utf8.label.status": "inactive",
"utf8.label.level": "debug",
"utf8.metadata.status": nil,
"utf8.metadata.env": nil,
"utf8.parsed.status": nil,
"utf8.parsed.level": nil,
"utf8.parsed.env": "local",
"utf8.parsed.unique": "another",
"utf8.parsed.env_extracted": nil,
"utf8.parsed.level_extracted": "info",
"utf8.parsed.status_extracted": "error",
},
},
},
},
}
for _, tt := range tests {
@ -537,7 +529,7 @@ func TestNewColumnCompatibilityPipeline(t *testing.T) {
func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
t.Run("invalid field name in schema", func(t *testing.T) {
compat := &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
}
@ -561,7 +553,7 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
t.Run("input pipeline error", func(t *testing.T) {
compat := &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
}
@ -579,7 +571,7 @@ func TestNewColumnCompatibilityPipeline_ErrorCases(t *testing.T) {
t.Run("non-string collision column should panic", func(t *testing.T) {
compat := &physical.ColumnCompat{
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
}

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
@ -500,10 +501,14 @@ func startRegionForNode(ctx context.Context, n physical.Node) (context.Context,
)
case *physical.ColumnCompat:
collisionStrs := make([]string, len(n.Collisions))
for i, ct := range n.Collisions {
collisionStrs[i] = ct.String()
}
attributes = append(attributes,
attribute.String("src", n.Source.String()),
attribute.String("dst", n.Destination.String()),
attribute.String("collision", n.Collision.String()),
attribute.String("collisions", fmt.Sprintf("[%s]", strings.Join(collisionStrs, ", "))),
)
case *physical.ScanSet:

@ -1,6 +1,8 @@
package physical
import (
"slices"
"github.com/oklog/ulid/v2"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
@ -12,9 +14,9 @@ type ColumnCompat struct {
NodeID ulid.ULID
// TODO(chaudum): These fields are poorly named. Come up with more descriptive names.
Source types.ColumnType // column type of the column that may colide with columns of the same name but with collision type
Destination types.ColumnType // column type of the generated _extracted column (should be same as source)
Collision types.ColumnType // column type of the column that a source type column may collide with
Source types.ColumnType // column type of the column that may colide with columns of the same name but with collision type
Destination types.ColumnType // column type of the generated _extracted column (should be same as source)
Collisions []types.ColumnType // column types of the columns that a source type column may collide with
}
// ID implements the [Node] interface.
@ -28,7 +30,7 @@ func (m *ColumnCompat) Clone() Node {
Source: m.Source,
Destination: m.Destination,
Collision: m.Collision,
Collisions: slices.Clone(m.Collisions),
}
}

@ -237,7 +237,7 @@ func (p *Planner) processMakeTable(lp *logical.MakeTable, ctx *Context) (Node, e
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
}
base, err = p.wrapNodeWith(base, compat)
if err != nil {
@ -370,7 +370,8 @@ func (p *Planner) processProjection(lp *logical.Projection, ctx *Context) (Node,
Source: types.ColumnTypeParsed,
Destination: types.ColumnTypeParsed,
Collision: types.ColumnTypeLabel,
// Check for collisions against both label and metadata columns.
Collisions: []types.ColumnType{types.ColumnTypeLabel, types.ColumnTypeMetadata},
}
var err error
node, err = p.wrapNodeWith(node, compat)

@ -776,7 +776,7 @@ func TestPlanner_MakeTable_Ordering(t *testing.T) {
expectedPlan := &Plan{}
parallelize := expectedPlan.graph.Add(&Parallelize{})
compat := expectedPlan.graph.Add(&ColumnCompat{Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collision: types.ColumnTypeLabel})
compat := expectedPlan.graph.Add(&ColumnCompat{Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collisions: []types.ColumnType{types.ColumnTypeLabel}})
scanSet := expectedPlan.graph.Add(&ScanSet{
// Targets should be added in the order of the scan timestamps
// ASC => oldest to newest
@ -816,7 +816,7 @@ func TestPlanner_MakeTable_Ordering(t *testing.T) {
expectedPlan := &Plan{}
parallelize := expectedPlan.graph.Add(&Parallelize{})
compat := expectedPlan.graph.Add(&ColumnCompat{Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collision: types.ColumnTypeLabel})
compat := expectedPlan.graph.Add(&ColumnCompat{Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collisions: []types.ColumnType{types.ColumnTypeLabel}})
scanSet := expectedPlan.graph.Add(&ScanSet{
// Targets should be added in the order of the scan timestamps
Targets: []*ScanTarget{

@ -88,7 +88,7 @@ func toTreeNode(n Node) *tree.Node {
treeNode.Properties = []tree.Property{
tree.NewProperty("src", false, node.Source),
tree.NewProperty("dst", false, node.Destination),
tree.NewProperty("collision", false, node.Collision),
tree.NewProperty("collisions", true, toAnySlice(node.Collisions)...),
}
case *TopK:
treeNode.Properties = []tree.Property{

@ -140,7 +140,7 @@ func TestFullQueryPlanning(t *testing.T) {
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Parallelize
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -154,7 +154,7 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Parallelize
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Filter predicate[0]=EQ(ambiguous.label_foo, "bar")
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) predicate[2]=MATCH_STR(builtin.message, "baz")
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -168,9 +168,9 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Parallelize
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Filter predicate[0]=EQ(ambiguous.level, "error")
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false))
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) predicate[2]=MATCH_STR(builtin.message, "bar")
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -184,9 +184,9 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Parallelize
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Projection all=true drop=(ambiguous.service_name, ambiguous.__error__)
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false))
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -205,9 +205,9 @@ VectorAggregation operation=sum group_by=(ambiguous.bar)
Parallelize
Projection all=true expand=(CAST_DURATION(ambiguous.request_duration))
Filter predicate[0]=NEQ(ambiguous.request_duration, "")
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration], false, false))
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, ambiguous.request_duration, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -221,12 +221,12 @@ VectorAggregation operation=sum
RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s
Parallelize
Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__)
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false))
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false))
Filter predicate[0]=EQ(ambiguous.detected_level, "error")
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 projections=(ambiguous.detected_level, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -241,7 +241,7 @@ VectorAggregation operation=sum group_by=(ambiguous.bar)
Projection all=true expand=(DIV(generated.value, 300))
RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar)
Parallelize
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 projections=(ambiguous.bar, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -254,9 +254,9 @@ VectorAggregation operation=sum group_by=(ambiguous.bar)
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Parallelize
TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false))
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
@ -269,9 +269,9 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000
VectorAggregation operation=sum group_by=(ambiguous.bar)
RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar)
Parallelize
Compat src=parsed dst=parsed collision=label
Compat src=parsed dst=parsed collisions=(label, metadata)
Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar], false, false))
Compat src=metadata dst=metadata collision=label
Compat src=metadata dst=metadata collisions=(label)
ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z)
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
@target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()

@ -7,6 +7,7 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/proto/expressionpb"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
type marshaler interface {
@ -231,9 +232,13 @@ func (n *ColumnCompat) MarshalPhysical(nodeID ulid.ULID) (physical.Node, error)
return nil, err
}
collision, err := n.Collision.MarshalType()
if err != nil {
return nil, err
collisions := make([]types.ColumnType, len(n.Collisions))
for i, collision := range n.Collisions {
ct, err := collision.MarshalType()
if err != nil {
return nil, err
}
collisions[i] = ct
}
return &physical.ColumnCompat{
@ -241,7 +246,7 @@ func (n *ColumnCompat) MarshalPhysical(nodeID ulid.ULID) (physical.Node, error)
Source: source,
Destination: destination,
Collision: collision,
Collisions: collisions,
}, nil
}

@ -957,9 +957,9 @@ func (m *Projection) GetDrop() bool {
// ColumnCompat represents a compactibilty operation in the physical plan that
// moves a values from a conflicting metadata column with a label column into a new column suffixed with `_extracted`.
type ColumnCompat struct {
Source expressionpb.ColumnType `protobuf:"varint,1,opt,name=source,proto3,enum=loki.expression.ColumnType" json:"source,omitempty"`
Destination expressionpb.ColumnType `protobuf:"varint,2,opt,name=destination,proto3,enum=loki.expression.ColumnType" json:"destination,omitempty"`
Collision expressionpb.ColumnType `protobuf:"varint,3,opt,name=collision,proto3,enum=loki.expression.ColumnType" json:"collision,omitempty"`
Source expressionpb.ColumnType `protobuf:"varint,1,opt,name=source,proto3,enum=loki.expression.ColumnType" json:"source,omitempty"`
Destination expressionpb.ColumnType `protobuf:"varint,2,opt,name=destination,proto3,enum=loki.expression.ColumnType" json:"destination,omitempty"`
Collisions []expressionpb.ColumnType `protobuf:"varint,3,rep,packed,name=collisions,proto3,enum=loki.expression.ColumnType" json:"collisions,omitempty"`
}
func (m *ColumnCompat) Reset() { *m = ColumnCompat{} }
@ -1008,11 +1008,11 @@ func (m *ColumnCompat) GetDestination() expressionpb.ColumnType {
return expressionpb.COLUMN_TYPE_INVALID
}
func (m *ColumnCompat) GetCollision() expressionpb.ColumnType {
func (m *ColumnCompat) GetCollisions() []expressionpb.ColumnType {
if m != nil {
return m.Collision
return m.Collisions
}
return expressionpb.COLUMN_TYPE_INVALID
return nil
}
// TopK represents a physical plan node that performs topK operation.
@ -1329,102 +1329,102 @@ func init() {
}
var fileDescriptor_d1fdbb44b95b211f = []byte{
// 1516 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x57, 0x4f, 0x6f, 0xdb, 0xc8,
0x15, 0x27, 0xf5, 0x5f, 0x4f, 0xb6, 0x97, 0x9d, 0x66, 0x13, 0xda, 0xd9, 0xd0, 0x59, 0x9d, 0x12,
0xb7, 0x95, 0x9a, 0x18, 0x6d, 0x91, 0x6d, 0xd7, 0x85, 0xfe, 0xc5, 0x52, 0x9d, 0x48, 0xc6, 0x48,
0x31, 0xb6, 0xbd, 0xa8, 0x23, 0x72, 0x4c, 0x33, 0xa6, 0x48, 0x82, 0xa4, 0x0c, 0xbb, 0x97, 0xed,
0xad, 0xe8, 0xa9, 0x7b, 0xec, 0x07, 0xe8, 0xa1, 0xc7, 0x7e, 0x81, 0x02, 0x45, 0x4f, 0x7b, 0xcc,
0x71, 0xd1, 0xc3, 0xb6, 0x71, 0x2e, 0x3d, 0xee, 0x17, 0x28, 0x50, 0xcc, 0x0c, 0x25, 0x52, 0xb2,
0xe4, 0xb5, 0x83, 0xbd, 0x18, 0x33, 0xef, 0xfd, 0x7e, 0x6f, 0xde, 0x7b, 0x7c, 0x7f, 0x2c, 0xf8,
0x89, 0x77, 0x6a, 0x56, 0xa9, 0x63, 0x5a, 0x0e, 0xad, 0x5a, 0x4e, 0x48, 0x7d, 0x87, 0xd8, 0x55,
0xcf, 0x77, 0x43, 0xb7, 0xea, 0x9d, 0x5c, 0x04, 0x96, 0x4e, 0x6c, 0x6f, 0x94, 0x38, 0x56, 0xb8,
0x0e, 0xad, 0xdb, 0xee, 0xa9, 0x55, 0x99, 0x8a, 0xb7, 0xee, 0x98, 0xae, 0xe9, 0x0a, 0x16, 0x3b,
0x09, 0xd0, 0x96, 0x66, 0xba, 0xae, 0x69, 0x53, 0x61, 0x6e, 0x34, 0x39, 0xae, 0x1a, 0x13, 0x9f,
0x84, 0x96, 0xeb, 0x44, 0xfa, 0xed, 0x45, 0x7d, 0x68, 0x8d, 0x69, 0x10, 0x92, 0xb1, 0x17, 0x01,
0x9e, 0xad, 0x76, 0x8e, 0x9e, 0x7b, 0x3e, 0x0d, 0x02, 0xcb, 0x75, 0xbc, 0xd1, 0xdc, 0x25, 0xa2,
0x3e, 0x5e, 0x4d, 0x9d, 0xd8, 0x96, 0xc1, 0xff, 0x08, 0x68, 0xf9, 0xb7, 0x90, 0x39, 0xb4, 0x89,
0x83, 0x1e, 0x43, 0xd6, 0x71, 0x0d, 0x1a, 0xa8, 0xf2, 0xc3, 0xf4, 0xa3, 0xd2, 0xd3, 0xef, 0x57,
0xe6, 0x62, 0xac, 0x74, 0x5d, 0x83, 0x62, 0x81, 0x40, 0x3f, 0x82, 0x2c, 0x35, 0x4c, 0x1a, 0xa8,
0x29, 0x0e, 0xbd, 0xb7, 0x00, 0x65, 0xe6, 0x5a, 0x86, 0x49, 0xb1, 0x40, 0x95, 0x7d, 0x28, 0x4c,
0x45, 0x68, 0x17, 0x72, 0x1e, 0xf1, 0xa9, 0x13, 0xaa, 0xf2, 0x43, 0xf9, 0x51, 0xe9, 0xe9, 0x87,
0x4b, 0x9e, 0xe9, 0x34, 0xeb, 0x99, 0x2f, 0xbf, 0xde, 0x96, 0x70, 0x04, 0x45, 0x4f, 0x20, 0xab,
0x9f, 0x58, 0xb6, 0xa1, 0xa6, 0xbe, 0x9d, 0x23, 0x90, 0xe5, 0xcf, 0x21, 0x27, 0xc4, 0x68, 0x02,
0xd9, 0x33, 0x62, 0x4f, 0x68, 0xf4, 0xe0, 0xbd, 0x0a, 0x8f, 0x9d, 0x5b, 0x38, 0x7b, 0x52, 0x39,
0x64, 0x39, 0x78, 0xf5, 0xa2, 0xd3, 0xac, 0x3f, 0x67, 0xf4, 0x7f, 0x7d, 0xbd, 0xbd, 0x67, 0x5a,
0xe1, 0xc9, 0x64, 0x54, 0xd1, 0xdd, 0x71, 0xd5, 0xf4, 0xc9, 0x31, 0x71, 0x48, 0x95, 0xa1, 0xab,
0x67, 0xbb, 0xd5, 0xeb, 0xb3, 0x5a, 0x61, 0x76, 0xb0, 0x78, 0xad, 0xfc, 0xb7, 0x2c, 0x64, 0x98,
0x07, 0xe8, 0x07, 0x90, 0xb2, 0x8c, 0x9b, 0x44, 0x9b, 0xb2, 0x0c, 0xd4, 0x86, 0x0f, 0x88, 0x69,
0xfa, 0xd4, 0x24, 0x21, 0x1d, 0xfa, 0xc4, 0x31, 0x69, 0x14, 0xf3, 0x83, 0x05, 0x66, 0x6d, 0x8a,
0xc2, 0x0c, 0xd4, 0x96, 0xf0, 0x06, 0x99, 0x93, 0xa0, 0x03, 0x50, 0x62, 0x4b, 0x67, 0x54, 0x0f,
0x5d, 0x5f, 0x4d, 0x73, 0x53, 0xda, 0x2a, 0x53, 0x47, 0x1c, 0xd5, 0x96, 0x70, 0xec, 0x83, 0x10,
0xa1, 0x1f, 0x43, 0x26, 0xd0, 0x89, 0xa3, 0x66, 0xb8, 0x81, 0xad, 0x05, 0x03, 0x4d, 0x12, 0x92,
0xde, 0xe8, 0x75, 0x5f, 0x27, 0x4e, 0x5b, 0xc2, 0x1c, 0x89, 0xaa, 0x90, 0x3b, 0xb6, 0xec, 0x90,
0xfa, 0x6a, 0x76, 0x69, 0xe4, 0xcf, 0xb9, 0xb2, 0x2d, 0xe1, 0x08, 0x86, 0x7e, 0x08, 0x59, 0xdb,
0x1a, 0x5b, 0xa1, 0x9a, 0xe3, 0xf8, 0x3b, 0x0b, 0xf8, 0x17, 0x4c, 0xd7, 0x96, 0xb0, 0x00, 0xa1,
0x9f, 0x03, 0x78, 0xbe, 0xfb, 0x9a, 0xea, 0xac, 0x9f, 0xd4, 0x3c, 0xa7, 0x6c, 0x2e, 0x96, 0xe1,
0x0c, 0xd0, 0x96, 0x70, 0x02, 0x8e, 0xea, 0xb0, 0xae, 0xbb, 0xf6, 0x64, 0xec, 0x0c, 0x75, 0x77,
0xec, 0x91, 0x50, 0x2d, 0x70, 0xfe, 0xfd, 0x05, 0x7e, 0x83, 0x63, 0x1a, 0x1c, 0xd2, 0x96, 0xf0,
0x9a, 0x9e, 0xb8, 0xa3, 0x5d, 0x28, 0xb0, 0x38, 0x87, 0x01, 0x0d, 0xd5, 0x22, 0xa7, 0xdf, 0x5d,
0xa0, 0xb3, 0x74, 0xf4, 0x29, 0x63, 0xe6, 0x03, 0x71, 0x44, 0x3b, 0x90, 0x0d, 0x5d, 0x6f, 0x78,
0xaa, 0x02, 0x67, 0x2c, 0xb6, 0xd8, 0xc0, 0xf5, 0x0e, 0x58, 0x02, 0x43, 0xd7, 0x3b, 0x40, 0x7b,
0x50, 0xf2, 0x88, 0x4f, 0x6c, 0x9b, 0xda, 0xd6, 0xef, 0xa8, 0x5a, 0x5a, 0x9a, 0xf9, 0xc3, 0x18,
0xd1, 0x96, 0x70, 0x92, 0x80, 0x1e, 0x43, 0xe6, 0xb5, 0x6b, 0x39, 0xea, 0xda, 0xd2, 0xa7, 0x7e,
0xe5, 0x5a, 0xfc, 0x5b, 0x31, 0x48, 0x3d, 0x07, 0x99, 0x53, 0xcb, 0x31, 0xca, 0xff, 0x4b, 0xc1,
0xc6, 0x7c, 0x5d, 0xa1, 0x26, 0xac, 0x79, 0xc4, 0x0f, 0x2d, 0x96, 0xb7, 0xe1, 0xe8, 0x22, 0x9a,
0x0d, 0x1f, 0x0b, 0x6b, 0xf1, 0xdc, 0x89, 0x72, 0xd5, 0x9a, 0x09, 0xb8, 0x2f, 0x82, 0x56, 0xbf,
0x40, 0x9f, 0x42, 0xd1, 0xf5, 0xa8, 0x18, 0x7e, 0xbc, 0x9e, 0x37, 0x9e, 0x6e, 0x5f, 0x5b, 0xcf,
0x3d, 0x0f, 0xc7, 0x0c, 0xf4, 0x09, 0x64, 0x83, 0x90, 0xf8, 0x61, 0x54, 0xbf, 0x5b, 0x15, 0x31,
0x38, 0x2b, 0xd3, 0xc1, 0x59, 0x19, 0x4c, 0x07, 0x67, 0xbd, 0xc0, 0x3a, 0xe9, 0x8b, 0x7f, 0x6f,
0xcb, 0x58, 0x50, 0xd0, 0x4f, 0x21, 0x4d, 0x1d, 0x63, 0x56, 0xb8, 0x37, 0x61, 0x32, 0x02, 0xfa,
0x19, 0x64, 0x82, 0x90, 0x7a, 0x51, 0xf5, 0x6e, 0x5e, 0x21, 0x36, 0xa3, 0x59, 0x2e, 0x78, 0x7f,
0x66, 0x3c, 0x4e, 0x40, 0xcf, 0x20, 0x2b, 0xfa, 0x36, 0x77, 0x73, 0xa6, 0x60, 0x94, 0xff, 0x24,
0xc3, 0x07, 0x0b, 0xcd, 0x88, 0x7e, 0x01, 0x05, 0xd3, 0x77, 0x27, 0xde, 0xad, 0x92, 0x9f, 0xe7,
0x94, 0xfa, 0x05, 0xda, 0xbb, 0x9a, 0xf8, 0x87, 0xd7, 0x77, 0xff, 0x5c, 0xe6, 0xcb, 0x7f, 0x49,
0x41, 0x29, 0xd1, 0xdd, 0x68, 0x0b, 0x0a, 0xb6, 0xab, 0x0b, 0x73, 0x6c, 0xa2, 0x15, 0xf1, 0xec,
0x8e, 0x54, 0xc8, 0x07, 0x51, 0x3f, 0xb2, 0x97, 0xd2, 0x78, 0x7a, 0x45, 0x0f, 0x00, 0x82, 0xd0,
0xa7, 0x64, 0x3c, 0xb4, 0x8c, 0x40, 0x4d, 0x3f, 0x4c, 0x3f, 0x4a, 0xe3, 0xa2, 0x90, 0x74, 0x8c,
0x00, 0x35, 0xa0, 0x14, 0x37, 0x67, 0xa0, 0x66, 0x6e, 0x5e, 0x62, 0x31, 0x4b, 0x0c, 0x04, 0x6a,
0x58, 0x3a, 0x09, 0x69, 0xa0, 0x66, 0xb9, 0x8d, 0xfb, 0x57, 0x6c, 0x24, 0xd8, 0x09, 0x38, 0xda,
0x83, 0x8d, 0x31, 0x39, 0x1f, 0xb2, 0xfd, 0x3b, 0x4c, 0x7e, 0x3c, 0x75, 0xb1, 0x41, 0xad, 0xb1,
0xa8, 0x4f, 0xbc, 0x36, 0x26, 0xe7, 0xb3, 0x5b, 0xf9, 0x73, 0x28, 0xce, 0x2e, 0x71, 0xb5, 0xca,
0xef, 0x5d, 0xad, 0xa9, 0x5b, 0x56, 0x6b, 0xb9, 0x05, 0x39, 0x31, 0x50, 0x17, 0xf2, 0x20, 0xdf,
0x2a, 0x0f, 0xe5, 0x27, 0x90, 0xe5, 0x73, 0x16, 0x21, 0xc8, 0x04, 0xa7, 0x96, 0xc7, 0x43, 0x58,
0xc7, 0xfc, 0x8c, 0xee, 0x40, 0xf6, 0x98, 0x86, 0xfa, 0x09, 0xf7, 0x6e, 0x1d, 0x8b, 0x4b, 0xf9,
0x8f, 0x32, 0x40, 0x3c, 0x68, 0xd1, 0xa7, 0x50, 0x8a, 0x9f, 0xb9, 0xd1, 0xfb, 0x49, 0x3c, 0x52,
0x20, 0x4d, 0x6c, 0x9b, 0xf7, 0x79, 0x01, 0xb3, 0x23, 0xba, 0x0b, 0x39, 0x7a, 0xee, 0x91, 0xa8,
0x85, 0x0b, 0x38, 0xba, 0x31, 0x0f, 0x0d, 0xdf, 0x15, 0xfd, 0x59, 0xc0, 0xfc, 0x5c, 0xfe, 0xbb,
0x0c, 0x6b, 0x8d, 0xf9, 0x21, 0x9d, 0x0b, 0xdc, 0x89, 0xaf, 0x8b, 0xdd, 0xbf, 0xb1, 0xc4, 0x11,
0x01, 0x1f, 0x5c, 0x78, 0x14, 0x47, 0x50, 0x16, 0x82, 0x41, 0x83, 0xd0, 0x72, 0x92, 0x5d, 0x73,
0x2d, 0x33, 0x89, 0x47, 0xcf, 0xa0, 0xa8, 0xbb, 0xb6, 0x6d, 0x31, 0x10, 0x0f, 0xe4, 0x5b, 0xc8,
0x31, 0x9a, 0xf5, 0x7f, 0x86, 0xed, 0x00, 0xf4, 0x09, 0xe4, 0x03, 0xd7, 0x0f, 0x45, 0xcf, 0xcb,
0x37, 0xeb, 0x86, 0x1c, 0x63, 0xd4, 0x2f, 0xd0, 0x47, 0x50, 0x24, 0x81, 0x4e, 0x1d, 0xc3, 0x72,
0x4c, 0xee, 0x7c, 0x01, 0xc7, 0x02, 0xb4, 0x0d, 0x25, 0x67, 0x62, 0xdb, 0xc1, 0xf0, 0xd8, 0xf2,
0x83, 0x30, 0x4a, 0x34, 0x70, 0xd1, 0x73, 0x26, 0x41, 0x6b, 0x20, 0x9f, 0xf2, 0x54, 0xa7, 0xb1,
0x7c, 0x5a, 0xfe, 0x87, 0x0c, 0xf9, 0x68, 0x8f, 0xa1, 0x5d, 0xc8, 0x87, 0xc4, 0x37, 0x69, 0x38,
0xfd, 0xac, 0x9b, 0x4b, 0x16, 0xde, 0x80, 0x23, 0xf0, 0x14, 0xb9, 0xd8, 0xdb, 0xa9, 0xef, 0xa0,
0xb7, 0xd3, 0xb7, 0xab, 0xe9, 0x3e, 0x40, 0xec, 0x18, 0xff, 0xb8, 0x24, 0x24, 0x43, 0x77, 0xc4,
0xac, 0xcf, 0x5a, 0xf4, 0xba, 0xff, 0x67, 0xc0, 0x10, 0x57, 0xaa, 0x87, 0xb3, 0x4d, 0xb9, 0x0e,
0xa5, 0xc4, 0xea, 0x2d, 0xe7, 0x20, 0xc3, 0x16, 0xea, 0xce, 0x3f, 0x65, 0x50, 0x16, 0x17, 0x19,
0xd2, 0x60, 0xab, 0xb6, 0xbf, 0x8f, 0x5b, 0xfb, 0xb5, 0x41, 0x6b, 0x88, 0x6b, 0xdd, 0xfd, 0xd6,
0xb0, 0x77, 0x38, 0xec, 0x74, 0x8f, 0x6a, 0x2f, 0x3a, 0x4d, 0x45, 0x42, 0x1f, 0x81, 0xba, 0x44,
0xdf, 0xe8, 0xbd, 0xea, 0x0e, 0x14, 0x19, 0x6d, 0xc1, 0xdd, 0x25, 0xda, 0xfe, 0xab, 0x97, 0x4a,
0x6a, 0x85, 0xee, 0x65, 0xed, 0x33, 0x25, 0xbd, 0x4a, 0xd7, 0xe9, 0x2a, 0x99, 0x15, 0x2f, 0xd6,
0x7f, 0x3d, 0x68, 0xf5, 0x95, 0xec, 0xce, 0x1f, 0xd2, 0xf0, 0xbd, 0x2b, 0x4b, 0x01, 0x6d, 0xc3,
0xfd, 0x98, 0x73, 0xd4, 0x6a, 0x0c, 0x7a, 0x78, 0x3e, 0x8c, 0xfb, 0x70, 0x6f, 0x19, 0x80, 0x79,
0x2a, 0xaf, 0x52, 0x32, 0x57, 0x53, 0x2b, 0x95, 0x9d, 0xae, 0x92, 0x46, 0x0f, 0x60, 0x73, 0x99,
0x52, 0xa4, 0x27, 0xb3, 0x8a, 0x5b, 0x3b, 0xda, 0x57, 0xb2, 0xf3, 0x99, 0x4f, 0xb8, 0x34, 0x68,
0x36, 0x5b, 0x47, 0x4a, 0xee, 0x1a, 0xfd, 0x51, 0x0d, 0x2b, 0xf9, 0x55, 0x31, 0xd7, 0x7b, 0x83,
0x41, 0xef, 0xe5, 0x81, 0x52, 0x98, 0x4f, 0x64, 0x0c, 0x18, 0xf4, 0x0e, 0x0f, 0x94, 0xe2, 0x2a,
0x6d, 0xbf, 0x87, 0x07, 0x0a, 0xa0, 0x8f, 0xe1, 0xc1, 0x2a, 0xed, 0xb0, 0xd9, 0xea, 0x37, 0x94,
0xd2, 0xce, 0x67, 0x50, 0xec, 0xbb, 0x7e, 0xd8, 0xf3, 0x0d, 0xea, 0xa3, 0xbb, 0x80, 0xb8, 0xae,
0x87, 0x9b, 0x2d, 0x9c, 0xc8, 0xbb, 0x0a, 0x77, 0x12, 0xf2, 0x5a, 0xbf, 0xd1, 0xea, 0x36, 0x3b,
0xdd, 0x7d, 0x45, 0x46, 0x9b, 0xf0, 0x61, 0x42, 0xc3, 0x6c, 0x46, 0xaa, 0x54, 0x7d, 0xf2, 0xe6,
0xad, 0x26, 0x7d, 0xf5, 0x56, 0x93, 0xbe, 0x79, 0xab, 0xc9, 0xbf, 0xbf, 0xd4, 0xe4, 0xbf, 0x5e,
0x6a, 0xf2, 0x97, 0x97, 0x9a, 0xfc, 0xe6, 0x52, 0x93, 0xff, 0x73, 0xa9, 0xc9, 0xff, 0xbd, 0xd4,
0xa4, 0x6f, 0x2e, 0x35, 0xf9, 0x8b, 0x77, 0x9a, 0xf4, 0xe6, 0x9d, 0x26, 0x7d, 0xf5, 0x4e, 0x93,
0x7e, 0xf3, 0xcb, 0xf7, 0xfa, 0x65, 0x14, 0xff, 0x78, 0x1e, 0xe5, 0xb8, 0x64, 0xf7, 0xff, 0x01,
0x00, 0x00, 0xff, 0xff, 0xd1, 0xf5, 0x32, 0xc4, 0x76, 0x0f, 0x00, 0x00,
// 1515 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x57, 0x4f, 0x73, 0xe3, 0x48,
0x15, 0x97, 0xfc, 0xdf, 0xcf, 0x49, 0x56, 0x34, 0xd9, 0x8c, 0x92, 0x6c, 0x94, 0xac, 0x4f, 0x33,
0x01, 0x6c, 0x66, 0x52, 0x40, 0xed, 0xc2, 0x86, 0xf2, 0xbf, 0x89, 0x4d, 0x66, 0xec, 0x54, 0xdb,
0x93, 0x5a, 0xb8, 0x98, 0xb6, 0xd4, 0x51, 0x34, 0x91, 0x25, 0x95, 0x24, 0xa7, 0x12, 0x2e, 0xcb,
0x8d, 0xe2, 0xc4, 0x1e, 0xf9, 0x00, 0x1c, 0x38, 0xf2, 0x0d, 0xa0, 0x38, 0xed, 0x71, 0x8e, 0x5b,
0x1c, 0x16, 0x26, 0x73, 0xe1, 0xb8, 0x5f, 0x80, 0x2a, 0xaa, 0xbb, 0x65, 0x5b, 0x76, 0xec, 0x6c,
0xb2, 0xb5, 0x97, 0x54, 0xf7, 0x7b, 0xbf, 0xdf, 0xeb, 0xf7, 0x9e, 0xde, 0x9f, 0x18, 0x7e, 0xe2,
0x5d, 0x98, 0x65, 0xea, 0x98, 0x96, 0x43, 0xcb, 0x96, 0x13, 0x52, 0xdf, 0x21, 0x76, 0xd9, 0xf3,
0xdd, 0xd0, 0x2d, 0x7b, 0xe7, 0xd7, 0x81, 0xa5, 0x13, 0xdb, 0x1b, 0xc4, 0x8e, 0x25, 0xae, 0x43,
0xab, 0xb6, 0x7b, 0x61, 0x95, 0xc6, 0xe2, 0xad, 0x75, 0xd3, 0x35, 0x5d, 0xc1, 0x62, 0x27, 0x01,
0xda, 0xd2, 0x4c, 0xd7, 0x35, 0x6d, 0x2a, 0xcc, 0x0d, 0x46, 0x67, 0x65, 0x63, 0xe4, 0x93, 0xd0,
0x72, 0x9d, 0x48, 0xbf, 0x3b, 0xaf, 0x0f, 0xad, 0x21, 0x0d, 0x42, 0x32, 0xf4, 0x22, 0xc0, 0x47,
0xcb, 0x9d, 0xa3, 0x57, 0x9e, 0x4f, 0x83, 0xc0, 0x72, 0x1d, 0x6f, 0x30, 0x73, 0x89, 0xa8, 0x4f,
0x96, 0x53, 0x47, 0xb6, 0x65, 0xf0, 0x3f, 0x02, 0x5a, 0xfc, 0x2d, 0xa4, 0x4e, 0x6c, 0xe2, 0xa0,
0x27, 0x90, 0x76, 0x5c, 0x83, 0x06, 0xaa, 0xbc, 0x97, 0x7c, 0x5c, 0x78, 0xf6, 0xfd, 0xd2, 0x4c,
0x8c, 0xa5, 0xb6, 0x6b, 0x50, 0x2c, 0x10, 0xe8, 0x47, 0x90, 0xa6, 0x86, 0x49, 0x03, 0x35, 0xc1,
0xa1, 0x8f, 0xe6, 0xa0, 0xcc, 0x5c, 0xc3, 0x30, 0x29, 0x16, 0xa8, 0xa2, 0x0f, 0xb9, 0xb1, 0x08,
0x1d, 0x40, 0xc6, 0x23, 0x3e, 0x75, 0x42, 0x55, 0xde, 0x93, 0x1f, 0x17, 0x9e, 0xbd, 0xbf, 0xe0,
0x99, 0x56, 0xbd, 0x9a, 0xfa, 0xe2, 0xab, 0x5d, 0x09, 0x47, 0x50, 0xf4, 0x14, 0xd2, 0xfa, 0xb9,
0x65, 0x1b, 0x6a, 0xe2, 0x9b, 0x39, 0x02, 0x59, 0xfc, 0x0c, 0x32, 0x42, 0x8c, 0x46, 0x90, 0xbe,
0x24, 0xf6, 0x88, 0x46, 0x0f, 0x3e, 0x2a, 0xf1, 0xd8, 0xb9, 0x85, 0xcb, 0xa7, 0xa5, 0x13, 0x96,
0x83, 0x57, 0x2f, 0x5a, 0xf5, 0xea, 0x73, 0x46, 0xff, 0xd7, 0x57, 0xbb, 0x87, 0xa6, 0x15, 0x9e,
0x8f, 0x06, 0x25, 0xdd, 0x1d, 0x96, 0x4d, 0x9f, 0x9c, 0x11, 0x87, 0x94, 0x19, 0xba, 0x7c, 0x79,
0x50, 0xbe, 0x3b, 0xab, 0x25, 0x66, 0x07, 0x8b, 0xd7, 0x8a, 0x7f, 0x4b, 0x43, 0x8a, 0x79, 0x80,
0x7e, 0x00, 0x09, 0xcb, 0xb8, 0x4f, 0xb4, 0x09, 0xcb, 0x40, 0x4d, 0x78, 0x8f, 0x98, 0xa6, 0x4f,
0x4d, 0x12, 0xd2, 0xbe, 0x4f, 0x1c, 0x93, 0x46, 0x31, 0xef, 0xcc, 0x31, 0x2b, 0x63, 0x14, 0x66,
0xa0, 0xa6, 0x84, 0xd7, 0xc8, 0x8c, 0x04, 0x1d, 0x83, 0x32, 0xb5, 0x74, 0x49, 0xf5, 0xd0, 0xf5,
0xd5, 0x24, 0x37, 0xa5, 0x2d, 0x33, 0x75, 0xca, 0x51, 0x4d, 0x09, 0x4f, 0x7d, 0x10, 0x22, 0xf4,
0x63, 0x48, 0x05, 0x3a, 0x71, 0xd4, 0x14, 0x37, 0xb0, 0x35, 0x67, 0xa0, 0x4e, 0x42, 0xd2, 0x19,
0xbc, 0xee, 0xea, 0xc4, 0x69, 0x4a, 0x98, 0x23, 0x51, 0x19, 0x32, 0x67, 0x96, 0x1d, 0x52, 0x5f,
0x4d, 0x2f, 0x8c, 0xfc, 0x39, 0x57, 0x36, 0x25, 0x1c, 0xc1, 0xd0, 0x0f, 0x21, 0x6d, 0x5b, 0x43,
0x2b, 0x54, 0x33, 0x1c, 0xbf, 0x3e, 0x87, 0x7f, 0xc1, 0x74, 0x4d, 0x09, 0x0b, 0x10, 0xfa, 0x39,
0x80, 0xe7, 0xbb, 0xaf, 0xa9, 0xce, 0xfa, 0x49, 0xcd, 0x72, 0xca, 0xe6, 0x7c, 0x19, 0x4e, 0x00,
0x4d, 0x09, 0xc7, 0xe0, 0xa8, 0x0a, 0xab, 0xba, 0x6b, 0x8f, 0x86, 0x4e, 0x5f, 0x77, 0x87, 0x1e,
0x09, 0xd5, 0x1c, 0xe7, 0x6f, 0xcf, 0xf1, 0x6b, 0x1c, 0x53, 0xe3, 0x90, 0xa6, 0x84, 0x57, 0xf4,
0xd8, 0x1d, 0x1d, 0x40, 0x8e, 0xc5, 0xd9, 0x0f, 0x68, 0xa8, 0xe6, 0x39, 0x7d, 0x63, 0x8e, 0xce,
0xd2, 0xd1, 0xa5, 0x8c, 0x99, 0x0d, 0xc4, 0x11, 0xed, 0x43, 0x3a, 0x74, 0xbd, 0xfe, 0x85, 0x0a,
0x9c, 0x31, 0xdf, 0x62, 0x3d, 0xd7, 0x3b, 0x66, 0x09, 0x0c, 0x5d, 0xef, 0x18, 0x1d, 0x42, 0xc1,
0x23, 0x3e, 0xb1, 0x6d, 0x6a, 0x5b, 0xbf, 0xa3, 0x6a, 0x61, 0x61, 0xe6, 0x4f, 0xa6, 0x88, 0xa6,
0x84, 0xe3, 0x04, 0xf4, 0x04, 0x52, 0xaf, 0x5d, 0xcb, 0x51, 0x57, 0x16, 0x3e, 0xf5, 0x2b, 0xd7,
0xe2, 0xdf, 0x8a, 0x41, 0xaa, 0x19, 0x48, 0x5d, 0x58, 0x8e, 0x51, 0xfc, 0x5f, 0x02, 0xd6, 0x66,
0xeb, 0x0a, 0xd5, 0x61, 0xc5, 0x23, 0x7e, 0x68, 0xb1, 0xbc, 0xf5, 0x07, 0xd7, 0xd1, 0x6c, 0xf8,
0x50, 0x58, 0x9b, 0xce, 0x9d, 0x28, 0x57, 0x8d, 0x89, 0x80, 0xfb, 0x22, 0x68, 0xd5, 0x6b, 0xf4,
0x09, 0xe4, 0x5d, 0x8f, 0x8a, 0xe1, 0xc7, 0xeb, 0x79, 0xed, 0xd9, 0xee, 0x9d, 0xf5, 0xdc, 0xf1,
0xf0, 0x94, 0x81, 0x3e, 0x86, 0x74, 0x10, 0x12, 0x3f, 0x8c, 0xea, 0x77, 0xab, 0x24, 0x06, 0x67,
0x69, 0x3c, 0x38, 0x4b, 0xbd, 0xf1, 0xe0, 0xac, 0xe6, 0x58, 0x27, 0x7d, 0xfe, 0xef, 0x5d, 0x19,
0x0b, 0x0a, 0xfa, 0x29, 0x24, 0xa9, 0x63, 0x4c, 0x0a, 0xf7, 0x3e, 0x4c, 0x46, 0x40, 0x3f, 0x83,
0x54, 0x10, 0x52, 0x2f, 0xaa, 0xde, 0xcd, 0x5b, 0xc4, 0x7a, 0x34, 0xcb, 0x05, 0xef, 0xcf, 0x8c,
0xc7, 0x09, 0xe8, 0x23, 0x48, 0x8b, 0xbe, 0xcd, 0xdc, 0x9f, 0x29, 0x18, 0xc5, 0x3f, 0xc9, 0xf0,
0xde, 0x5c, 0x33, 0xa2, 0x5f, 0x40, 0xce, 0xf4, 0xdd, 0x91, 0xf7, 0xa0, 0xe4, 0x67, 0x39, 0xa5,
0x7a, 0x8d, 0x0e, 0x6f, 0x27, 0x7e, 0xef, 0xee, 0xee, 0x9f, 0xc9, 0x7c, 0xf1, 0x2f, 0x09, 0x28,
0xc4, 0xba, 0x1b, 0x6d, 0x41, 0xce, 0x76, 0x75, 0x61, 0x8e, 0x4d, 0xb4, 0x3c, 0x9e, 0xdc, 0x91,
0x0a, 0xd9, 0x20, 0xea, 0x47, 0xf6, 0x52, 0x12, 0x8f, 0xaf, 0x68, 0x07, 0x20, 0x08, 0x7d, 0x4a,
0x86, 0x7d, 0xcb, 0x08, 0xd4, 0xe4, 0x5e, 0xf2, 0x71, 0x12, 0xe7, 0x85, 0xa4, 0x65, 0x04, 0xa8,
0x06, 0x85, 0x69, 0x73, 0x06, 0x6a, 0xea, 0xfe, 0x25, 0x36, 0x65, 0x89, 0x81, 0x40, 0x0d, 0x4b,
0x27, 0x21, 0x0d, 0xd4, 0x34, 0xb7, 0xb1, 0x7d, 0xcb, 0x46, 0x8c, 0x1d, 0x83, 0xa3, 0x43, 0x58,
0x1b, 0x92, 0xab, 0x3e, 0xdb, 0xbf, 0xfd, 0xf8, 0xc7, 0x53, 0xe7, 0x1b, 0xd4, 0x1a, 0x8a, 0xfa,
0xc4, 0x2b, 0x43, 0x72, 0x35, 0xb9, 0x15, 0x3f, 0x83, 0xfc, 0xe4, 0x32, 0xad, 0x56, 0xf9, 0x5b,
0x57, 0x6b, 0xe2, 0x81, 0xd5, 0x5a, 0x6c, 0x40, 0x46, 0x0c, 0xd4, 0xb9, 0x3c, 0xc8, 0x0f, 0xca,
0x43, 0xf1, 0x29, 0xa4, 0xf9, 0x9c, 0x45, 0x08, 0x52, 0xc1, 0x85, 0xe5, 0xf1, 0x10, 0x56, 0x31,
0x3f, 0xa3, 0x75, 0x48, 0x9f, 0xd1, 0x50, 0x3f, 0xe7, 0xde, 0xad, 0x62, 0x71, 0x29, 0xfe, 0x51,
0x06, 0x98, 0x0e, 0x5a, 0xf4, 0x09, 0x14, 0xa6, 0xcf, 0xdc, 0xeb, 0xfd, 0x38, 0x1e, 0x29, 0x90,
0x24, 0xb6, 0xcd, 0xfb, 0x3c, 0x87, 0xd9, 0x11, 0x6d, 0x40, 0x86, 0x5e, 0x79, 0x24, 0x6a, 0xe1,
0x1c, 0x8e, 0x6e, 0xcc, 0x43, 0xc3, 0x77, 0x45, 0x7f, 0xe6, 0x30, 0x3f, 0x17, 0xff, 0x2e, 0xc3,
0x4a, 0x6d, 0x76, 0x48, 0x67, 0x02, 0x77, 0xe4, 0xeb, 0x62, 0xf7, 0xaf, 0x2d, 0x70, 0x44, 0xc0,
0x7b, 0xd7, 0x1e, 0xc5, 0x11, 0x94, 0x85, 0x60, 0xd0, 0x20, 0xb4, 0x9c, 0x78, 0xd7, 0xdc, 0xc9,
0x8c, 0xe3, 0xd9, 0x07, 0xd0, 0x5d, 0xdb, 0xb6, 0x44, 0x02, 0x58, 0xb1, 0x7f, 0x03, 0x3b, 0x06,
0x67, 0x13, 0x20, 0xc5, 0xb6, 0x00, 0xfa, 0x18, 0xb2, 0x81, 0xeb, 0x87, 0xa2, 0xeb, 0xe5, 0xfb,
0xf5, 0x43, 0x86, 0x31, 0xaa, 0xd7, 0xe8, 0x03, 0xc8, 0x93, 0x40, 0xa7, 0x8e, 0x61, 0x39, 0x26,
0x77, 0x3f, 0x87, 0xa7, 0x02, 0xb4, 0x0b, 0x05, 0x67, 0x64, 0xdb, 0x41, 0xff, 0xcc, 0xf2, 0x83,
0x30, 0x4a, 0x35, 0x70, 0xd1, 0x73, 0x26, 0x41, 0x2b, 0x20, 0x5f, 0xf0, 0x64, 0x27, 0xb1, 0x7c,
0x51, 0xfc, 0x87, 0x0c, 0xd9, 0x68, 0x93, 0xa1, 0x03, 0xc8, 0x86, 0xc4, 0x37, 0x69, 0x38, 0xfe,
0xb0, 0x9b, 0x0b, 0x56, 0x5e, 0x8f, 0x23, 0xf0, 0x18, 0x39, 0xdf, 0xdd, 0x89, 0xef, 0xa0, 0xbb,
0x93, 0x0f, 0xab, 0xea, 0x2e, 0xc0, 0xd4, 0x31, 0xfe, 0x79, 0x49, 0x48, 0xfa, 0xee, 0x80, 0x59,
0x9f, 0x34, 0xe9, 0x5d, 0xff, 0xd1, 0x80, 0x21, 0xae, 0x54, 0x0f, 0x27, 0xbb, 0x72, 0x15, 0x0a,
0xb1, 0xe5, 0x5b, 0xcc, 0x40, 0x8a, 0xad, 0xd4, 0xfd, 0x7f, 0xca, 0xa0, 0xcc, 0xaf, 0x32, 0xa4,
0xc1, 0x56, 0xe5, 0xe8, 0x08, 0x37, 0x8e, 0x2a, 0xbd, 0x46, 0x1f, 0x57, 0xda, 0x47, 0x8d, 0x7e,
0xe7, 0xa4, 0xdf, 0x6a, 0x9f, 0x56, 0x5e, 0xb4, 0xea, 0x8a, 0x84, 0x3e, 0x00, 0x75, 0x81, 0xbe,
0xd6, 0x79, 0xd5, 0xee, 0x29, 0x32, 0xda, 0x82, 0x8d, 0x05, 0xda, 0xee, 0xab, 0x97, 0x4a, 0x62,
0x89, 0xee, 0x65, 0xe5, 0x53, 0x25, 0xb9, 0x4c, 0xd7, 0x6a, 0x2b, 0xa9, 0x25, 0x2f, 0x56, 0x7f,
0xdd, 0x6b, 0x74, 0x95, 0xf4, 0xfe, 0x1f, 0x92, 0xf0, 0xbd, 0x5b, 0x6b, 0x01, 0xed, 0xc2, 0xf6,
0x94, 0x73, 0xda, 0xa8, 0xf5, 0x3a, 0x78, 0x36, 0x8c, 0x6d, 0x78, 0xb4, 0x08, 0xc0, 0x3c, 0x95,
0x97, 0x29, 0x99, 0xab, 0x89, 0xa5, 0xca, 0x56, 0x5b, 0x49, 0xa2, 0x1d, 0xd8, 0x5c, 0xa4, 0x14,
0xe9, 0x49, 0x2d, 0xe3, 0x56, 0x4e, 0x8f, 0x94, 0xf4, 0x6c, 0xe6, 0x63, 0x2e, 0xf5, 0xea, 0xf5,
0xc6, 0xa9, 0x92, 0xb9, 0x43, 0x7f, 0x5a, 0xc1, 0x4a, 0x76, 0x59, 0xcc, 0xd5, 0x4e, 0xaf, 0xd7,
0x79, 0x79, 0xac, 0xe4, 0x66, 0x13, 0x39, 0x05, 0xf4, 0x3a, 0x27, 0xc7, 0x4a, 0x7e, 0x99, 0xb6,
0xdb, 0xc1, 0x3d, 0x05, 0xd0, 0x87, 0xb0, 0xb3, 0x4c, 0xdb, 0xaf, 0x37, 0xba, 0x35, 0xa5, 0xb0,
0xff, 0x29, 0xe4, 0xbb, 0xae, 0x1f, 0x76, 0x7c, 0x83, 0xfa, 0x68, 0x03, 0x10, 0xd7, 0x75, 0x70,
0xbd, 0x81, 0x63, 0x79, 0x57, 0x61, 0x3d, 0x26, 0xaf, 0x74, 0x6b, 0x8d, 0x76, 0xbd, 0xd5, 0x3e,
0x52, 0x64, 0xb4, 0x09, 0xef, 0xc7, 0x34, 0xcc, 0x66, 0xa4, 0x4a, 0x54, 0x47, 0x6f, 0xde, 0x6a,
0xd2, 0x97, 0x6f, 0x35, 0xe9, 0xeb, 0xb7, 0x9a, 0xfc, 0xfb, 0x1b, 0x4d, 0xfe, 0xeb, 0x8d, 0x26,
0x7f, 0x71, 0xa3, 0xc9, 0x6f, 0x6e, 0x34, 0xf9, 0x3f, 0x37, 0x9a, 0xfc, 0xdf, 0x1b, 0x4d, 0xfa,
0xfa, 0x46, 0x93, 0x3f, 0x7f, 0xa7, 0x49, 0x6f, 0xde, 0x69, 0xd2, 0x97, 0xef, 0x34, 0xe9, 0x37,
0xbf, 0xfc, 0x56, 0xbf, 0x8d, 0xa6, 0x3f, 0x9f, 0x07, 0x19, 0x2e, 0x39, 0xf8, 0x7f, 0x00, 0x00,
0x00, 0xff, 0xff, 0x62, 0x3e, 0x36, 0x6d, 0x78, 0x0f, 0x00, 0x00,
}
func (x AggregateRangeOp) String() string {
@ -2109,9 +2109,14 @@ func (this *ColumnCompat) Equal(that interface{}) bool {
if this.Destination != that1.Destination {
return false
}
if this.Collision != that1.Collision {
if len(this.Collisions) != len(that1.Collisions) {
return false
}
for i := range this.Collisions {
if this.Collisions[i] != that1.Collisions[i] {
return false
}
}
return true
}
func (this *TopK) Equal(that interface{}) bool {
@ -2533,7 +2538,7 @@ func (this *ColumnCompat) GoString() string {
s = append(s, "&physicalpb.ColumnCompat{")
s = append(s, "Source: "+fmt.Sprintf("%#v", this.Source)+",\n")
s = append(s, "Destination: "+fmt.Sprintf("%#v", this.Destination)+",\n")
s = append(s, "Collision: "+fmt.Sprintf("%#v", this.Collision)+",\n")
s = append(s, "Collisions: "+fmt.Sprintf("%#v", this.Collisions)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -3411,10 +3416,23 @@ func (m *ColumnCompat) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Collision != 0 {
i = encodeVarintPhysicalpb(dAtA, i, uint64(m.Collision))
if len(m.Collisions) > 0 {
dAtA26 := make([]byte, len(m.Collisions)*10)
var j25 int
for _, num := range m.Collisions {
for num >= 1<<7 {
dAtA26[j25] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j25++
}
dAtA26[j25] = uint8(num)
j25++
}
i -= j25
copy(dAtA[i:], dAtA26[:j25])
i = encodeVarintPhysicalpb(dAtA, i, uint64(j25))
i--
dAtA[i] = 0x18
dAtA[i] = 0x1a
}
if m.Destination != 0 {
i = encodeVarintPhysicalpb(dAtA, i, uint64(m.Destination))
@ -4016,8 +4034,12 @@ func (m *ColumnCompat) Size() (n int) {
if m.Destination != 0 {
n += 1 + sovPhysicalpb(uint64(m.Destination))
}
if m.Collision != 0 {
n += 1 + sovPhysicalpb(uint64(m.Collision))
if len(m.Collisions) > 0 {
l = 0
for _, e := range m.Collisions {
l += sovPhysicalpb(uint64(e))
}
n += 1 + sovPhysicalpb(uint64(l)) + l
}
return n
}
@ -4405,7 +4427,7 @@ func (this *ColumnCompat) String() string {
s := strings.Join([]string{`&ColumnCompat{`,
`Source:` + fmt.Sprintf("%v", this.Source) + `,`,
`Destination:` + fmt.Sprintf("%v", this.Destination) + `,`,
`Collision:` + fmt.Sprintf("%v", this.Collision) + `,`,
`Collisions:` + fmt.Sprintf("%v", this.Collisions) + `,`,
`}`,
}, "")
return s
@ -6433,23 +6455,73 @@ func (m *ColumnCompat) Unmarshal(dAtA []byte) error {
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Collision", wireType)
}
m.Collision = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPhysicalpb
if wireType == 0 {
var v expressionpb.ColumnType
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPhysicalpb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= expressionpb.ColumnType(b&0x7F) << shift
if b < 0x80 {
break
}
}
if iNdEx >= l {
m.Collisions = append(m.Collisions, v)
} else if wireType == 2 {
var packedLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPhysicalpb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
packedLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if packedLen < 0 {
return ErrInvalidLengthPhysicalpb
}
postIndex := iNdEx + packedLen
if postIndex < 0 {
return ErrInvalidLengthPhysicalpb
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Collision |= expressionpb.ColumnType(b&0x7F) << shift
if b < 0x80 {
break
var elementCount int
if elementCount != 0 && len(m.Collisions) == 0 {
m.Collisions = make([]expressionpb.ColumnType, 0, elementCount)
}
for iNdEx < postIndex {
var v expressionpb.ColumnType
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPhysicalpb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= expressionpb.ColumnType(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Collisions = append(m.Collisions, v)
}
} else {
return fmt.Errorf("proto: wrong wireType = %d for field Collisions", wireType)
}
default:
iNdEx = preIndex

@ -192,7 +192,7 @@ message ColumnCompat {
loki.expression.ColumnType source = 1; // column type of the column that may colide with columns of the same name but with collision type
loki.expression.ColumnType destination = 2; // column type of the generated _extracted column (should be same as source)
loki.expression.ColumnType collision = 3; // column type of the column that a source type column may collide with}
repeated loki.expression.ColumnType collisions = 3; // column types of the columns that a source type column may collide with}
}
// TopK represents a physical plan node that performs topK operation.

@ -140,7 +140,7 @@ func Test_Node(t *testing.T) {
Source: types.ColumnTypeMetadata,
Destination: types.ColumnTypeMetadata,
Collision: types.ColumnTypeLabel,
Collisions: []types.ColumnType{types.ColumnTypeLabel},
},
},
{

@ -317,21 +317,25 @@ func (n *ColumnCompat) UnmarshalPhysical(from physical.Node) error {
var (
source expressionpb.ColumnType
destination expressionpb.ColumnType
collision expressionpb.ColumnType
)
if err := source.UnmarshalType(compat.Source); err != nil {
return err
} else if err := destination.UnmarshalType(compat.Destination); err != nil {
return err
} else if err := collision.UnmarshalType(compat.Collision); err != nil {
return err
}
collisions := make([]expressionpb.ColumnType, len(compat.Collisions))
for i, ct := range compat.Collisions {
if err := collisions[i].UnmarshalType(ct); err != nil {
return err
}
}
*n = ColumnCompat{
Source: source,
Destination: destination,
Collision: collision,
Collisions: collisions,
}
return nil
}

@ -491,7 +491,7 @@ func TestHTTP2MessageFrameSerialization(t *testing.T) {
expectedPlan := &physical.Plan{}
parallelize := expectedPlan.Graph().Add(&physical.Parallelize{NodeID: ulid.Make()})
compat := expectedPlan.Graph().Add(&physical.ColumnCompat{NodeID: ulid.Make(), Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collision: types.ColumnTypeLabel})
compat := expectedPlan.Graph().Add(&physical.ColumnCompat{NodeID: ulid.Make(), Source: types.ColumnTypeMetadata, Destination: types.ColumnTypeMetadata, Collisions: []types.ColumnType{types.ColumnTypeLabel}})
scanSet := expectedPlan.Graph().Add(&physical.ScanSet{
NodeID: ulid.Make(),
Targets: []*physical.ScanTarget{

Loading…
Cancel
Save