Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/pipeline_utils_test.go

166 lines
4.6 KiB

package executor
import (
"errors"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// AssertPipelinesEqual iterates through two pipelines and ensures they contain
// the same data, regardless of batch sizes. It compares row by row and column by column.
func AssertPipelinesEqual(t testing.TB, left, right Pipeline) {
ctx := t.Context()
defer left.Close()
defer right.Close()
var (
leftBatch, rightBatch arrow.RecordBatch
leftPos, rightPos int64
leftRemain, rightRemain int64
rowNum int64
leftErr, rightErr error
)
for {
// Load batches if needed
if leftRemain == 0 {
if leftBatch != nil {
leftBatch = nil
}
leftBatch, leftErr = left.Read(ctx)
if leftErr == nil {
leftRemain = leftBatch.NumRows()
leftPos = 0
}
}
if rightRemain == 0 {
if rightBatch != nil {
rightBatch = nil
}
rightBatch, rightErr = right.Read(ctx)
if rightErr == nil {
rightRemain = rightBatch.NumRows()
rightPos = 0
}
}
// Check conditions on our batches:
switch {
case errors.Is(leftErr, EOF) && errors.Is(rightErr, EOF):
// Both pipelines are finished; we can finish now.
return
case (errors.Is(leftErr, EOF) && rightErr == nil) || (errors.Is(rightErr, EOF) && leftErr == nil):
// One pipeline finished before the other (and the other didn't fail), then
// there's an inequal number of rows.
require.Fail(t, "Pipelines have a different number of rows",
"left error: %v, right error: %v", leftErr, rightErr)
case leftErr != nil && rightErr != nil && !errors.Is(leftErr, EOF) && !errors.Is(rightErr, EOF):
// Both pipelines failed with non-EOF errors.
require.Equal(t, leftErr, rightErr, "Pipelines failed with different errors")
case leftErr != nil && !errors.Is(leftErr, EOF):
// Left pipeline failed with a non-EOF error.
require.Fail(t, "Left pipeline failed", "error: %v", leftErr)
case rightErr != nil && !errors.Is(rightErr, EOF):
// Right pipeline failed with a non-EOF error.
require.Fail(t, "Right pipeline failed", "error: %v", rightErr)
}
// Check schema compatibility
require.True(t, leftBatch.Schema().Equal(rightBatch.Schema()),
"Pipelines have incompatible schemas:\nleft %v\nright %v",
leftBatch.Schema(), rightBatch.Schema())
// Compare rows until one of the batches is exhausted
compareRows := min(leftRemain, rightRemain)
for i := int64(0); i < compareRows; i++ {
// Compare all columns for this row
for j := 0; j < int(leftBatch.NumCols()); j++ {
colName := leftBatch.ColumnName(j)
leftCol := leftBatch.Column(j)
rightCol := rightBatch.Column(j)
// Check if both columns are null
leftIsNull := leftCol.IsNull(int(leftPos))
rightIsNull := rightCol.IsNull(int(rightPos))
require.Equal(t, leftIsNull, rightIsNull,
"row %d, column %s: null mismatch", rowNum, colName)
if leftIsNull {
continue
}
// Check validity
leftIsValid := leftCol.IsValid(int(leftPos))
rightIsValid := rightCol.IsValid(int(rightPos))
require.Equal(t, leftIsValid, rightIsValid,
"row %d, column %s: validity mismatch", rowNum, colName)
if !leftIsValid {
continue
}
// Compare the values
require.Equal(t, leftCol.ValueStr(int(leftPos)), rightCol.ValueStr(int(rightPos)),
"row %d, column %s: value differs", rowNum, colName)
}
leftPos++
rightPos++
leftRemain--
rightRemain--
rowNum++
}
}
}
func TestAssertPipelinesEqual(t *testing.T) {
// Define test schema
fields := []arrow.Field{
{Name: "name", Type: types.Arrow.String},
{Name: "age", Type: types.Arrow.Integer},
}
// Create test data
csvData := "Alice,30\nBob,25\nCharlie,35\nDave,40"
record, err := CSVToArrow(fields, csvData)
require.NoError(t, err)
// Split the record into two smaller records
csvData1 := "Alice,30\nBob,25"
csvData2 := "Charlie,35\nDave,40"
record1, err := CSVToArrow(fields, csvData1)
require.NoError(t, err)
record2, err := CSVToArrow(fields, csvData2)
require.NoError(t, err)
t.Run("identical pipelines with same batch size", func(t *testing.T) {
p1 := NewBufferedPipeline(record)
p2 := NewBufferedPipeline(record)
// This should not fail
AssertPipelinesEqual(t, p1, p2)
})
t.Run("identical pipelines with different batch sizes", func(t *testing.T) {
// First pipeline has one batch with all data
p1 := NewBufferedPipeline(record)
// Second pipeline has two smaller batches
p2 := NewBufferedPipeline(record1, record2)
// This should not fail despite different batch sizes
AssertPipelinesEqual(t, p1, p2)
})
}