loki/pkg/engine/internal/executor/pipeline_test.go

package executor

import (
	"context"
	"errors"
	"strings"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/csv"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// CSVToArrow converts a CSV string to an Arrow record based on the provided schema.
// It uses the Arrow CSV reader for parsing.
func CSVToArrow(fields []arrow.Field, csvData string) (arrow.RecordBatch, error) {
	return CSVToArrowWithAllocator(memory.NewGoAllocator(), fields, csvData)
}

// CSVToArrowWithAllocator converts a CSV string to an Arrow record based on the provided schema
// using the specified memory allocator. It reads all rows from the CSV into a single record.
func CSVToArrowWithAllocator(allocator memory.Allocator, fields []arrow.Field, csvData string) (arrow.RecordBatch, error) {
	// First, trim the CSV data to remove any leading and trailing whitespace/line breaks.
	csvData = strings.TrimSpace(csvData)

	// Create the schema.
	schema := arrow.NewSchema(fields, nil)

	// Set up the CSV reader over the stringified data.
	input := strings.NewReader(csvData)
	reader := csv.NewReader(
		input,
		schema,
		csv.WithAllocator(allocator),
		csv.WithNullReader(true),
		csv.WithComma(','),
		csv.WithChunk(-1), // Read all rows into a single record.
	)

	if !reader.Next() {
		return nil, errors.New("failed to read CSV data")
	}
	return reader.RecordBatch(), nil
}
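
// TestCSVPipeline verifies that a buffered pipeline returns its records in
// order and then reports EOF.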
func TestCSVPipeline(t *testing.T) {
	// Define test schema.
	fields := []arrow.Field{
		{Name: "name", Type: types.Arrow.String},
		{Name: "age", Type: types.Arrow.Integer},
	}
	schema := arrow.NewSchema(fields, nil)

	// Create test data.
	csvData1 := "Alice,30\nBob,25"
	csvData2 := "Charlie,35\nDave,40"

	// Convert to Arrow records.
	record1, err := CSVToArrow(fields, csvData1)
	require.NoError(t, err)
	record2, err := CSVToArrow(fields, csvData2)
	require.NoError(t, err)

	// Create a buffered pipeline with the test records.
	pipeline := NewBufferedPipeline(record1, record2)
	defer pipeline.Close()

	t.Run("should return records in order", func(t *testing.T) {
		ctx := t.Context()

		// First read should return the first record.
		batch, err := pipeline.Read(ctx)
		require.NoError(t, err)
		require.Equal(t, record1.NumRows(), batch.NumRows())
		require.Equal(t, schema, batch.Schema())

		// Second read should return the second record.
		batch, err = pipeline.Read(ctx)
		require.NoError(t, err)
		require.Equal(t, record2.NumRows(), batch.NumRows())
		require.Equal(t, schema, batch.Schema())

		// Third read should return EOF.
		_, err = pipeline.Read(ctx)
		require.Equal(t, EOF, err)
	})
}
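
// newInstrumentedPipeline wraps the given Pipeline so that tests can count how
// many times its methods are called.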
func newInstrumentedPipeline(inner Pipeline) *instrumentedPipeline {
	return &instrumentedPipeline{
		inner:     inner,
		callCount: make(map[string]int),
	}
}
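
// instrumentedPipeline delegates all calls to an inner Pipeline and records
// the number of calls per method name.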
type instrumentedPipeline struct {
	inner     Pipeline
	callCount map[string]int
}

// Close implements Pipeline.
func (i *instrumentedPipeline) Close() {
	i.callCount["Close"]++
	i.inner.Close()
}

// Read implements Pipeline.
func (i *instrumentedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	i.callCount["Read"]++
	return i.inner.Read(ctx)
}

// Region implements RegionProvider.
func (i *instrumentedPipeline) Region() *xcap.Region {
	if provider, ok := i.inner.(RegionProvider); ok {
		return provider.Region()
	}
	return nil
}

var _ Pipeline = (*instrumentedPipeline)(nil)
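
// Test_prefetchWrapper_Read verifies that the prefetching pipeline stays
// exactly one Read ahead of its consumer on the inner pipeline.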
func Test_prefetchWrapper_Read(t *testing.T) {
	batch1 := arrowtest.Rows{
		{"message": "log line 1"},
		{"message": "log line 2"},
	}
	batch2 := arrowtest.Rows{
		{"message": "log line 3"},
		{"message": "log line 4"},
	}
	batch3 := arrowtest.Rows{
		{"message": "log line 5"},
	}

	records := []arrow.RecordBatch{
		batch1.Record(memory.DefaultAllocator, batch1.Schema()),
		batch2.Record(memory.DefaultAllocator, batch2.Schema()),
		batch3.Record(memory.DefaultAllocator, batch3.Schema()),
	}

	pipeline := NewBufferedPipeline(records...)
	instrumentedPipeline := newInstrumentedPipeline(pipeline)
	prefetchingPipeline := newPrefetchingPipeline(instrumentedPipeline)
	defer prefetchingPipeline.Close()

	require.Equal(t, 0, instrumentedPipeline.callCount["Read"])

	ctx := t.Context()

	// Read first batch.
	v, err := prefetchingPipeline.Read(ctx)
	require.NoError(t, err)
	require.Equal(t, int64(2), v.NumRows())
	time.Sleep(10 * time.Millisecond)                           // ensure that the next batch has been pre-fetched
	require.Equal(t, 2, instrumentedPipeline.callCount["Read"]) // 1 record consumed + 1 record pre-fetched

	// Read second batch.
	v, err = prefetchingPipeline.Read(ctx)
	require.NoError(t, err)
	require.Equal(t, int64(2), v.NumRows())
	time.Sleep(10 * time.Millisecond)                           // ensure that the next batch has been pre-fetched
	require.Equal(t, 3, instrumentedPipeline.callCount["Read"]) // 2 records consumed + 1 record pre-fetched

	// Read third/last batch.
	v, err = prefetchingPipeline.Read(ctx)
	require.NoError(t, err)
	require.Equal(t, int64(1), v.NumRows())
	time.Sleep(10 * time.Millisecond)                           // ensure that the trailing EOF has been pre-fetched
	require.Equal(t, 4, instrumentedPipeline.callCount["Read"]) // 3 records consumed + 1 EOF pre-fetched

	// Read EOF.
	_, err = prefetchingPipeline.Read(ctx)
	require.ErrorContains(t, err, EOF.Error())
	time.Sleep(10 * time.Millisecond)                           // ensure that no further reads happen after EOF
	require.Equal(t, 4, instrumentedPipeline.callCount["Read"]) // 3 records + 1 EOF consumed
}
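
// Test_prefetchWrapper_Close verifies that Close is safe to call whether or
// not the prefetcher has been started by a prior Read.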
func Test_prefetchWrapper_Close(t *testing.T) {
	t.Run("initialized prefetcher", func(t *testing.T) {
		w := newPrefetchingPipeline(emptyPipeline())
		_, err := w.Read(t.Context())
		require.ErrorIs(t, err, EOF)
		require.NotPanics(t, w.Close)
	})

	t.Run("uninitialized prefetcher", func(t *testing.T) {
		w := newPrefetchingPipeline(emptyPipeline())
		require.NotPanics(t, w.Close)
	})
}