loki/pkg/dataobj/sections/streams/reader_test.go

package streams_test

import (
	"context"
	"errors"
	"io"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
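
// TestReader reads back every column of a small streams section and asserts
// that all three stream rows are returned with their labels, timestamp
// bounds, row counts, and uncompressed sizes intact. buildStreamsSection is
// a test helper defined in another file of this package.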
func TestReader(t *testing.T) {
	expect := arrowtest.Rows{
		{
			"stream_id.int64":         int64(1),
			"app.label.utf8":          "foo",
			"cluster.label.utf8":      "test",
			"min_timestamp.timestamp": time.Unix(10, 0).UTC(),
			"max_timestamp.timestamp": time.Unix(15, 0).UTC(),
			"rows.int64":              int64(2),
			"uncompressed_size.int64": int64(25),
		},
		{
			"stream_id.int64":         int64(2),
			"app.label.utf8":          "bar",
			"cluster.label.utf8":      "test",
			"min_timestamp.timestamp": time.Unix(5, 0).UTC(),
			"max_timestamp.timestamp": time.Unix(20, 0).UTC(),
			"rows.int64":              int64(2),
			"uncompressed_size.int64": int64(45),
		},
		{
			"stream_id.int64":         int64(3),
			"app.label.utf8":          "baz",
			"cluster.label.utf8":      "test",
			"min_timestamp.timestamp": time.Unix(25, 0).UTC(),
			"max_timestamp.timestamp": time.Unix(30, 0).UTC(),
			"rows.int64":              int64(2),
			"uncompressed_size.int64": int64(35),
		},
	}

	sec := buildStreamsSection(t, 1, 0)

	r := streams.NewReader(streams.ReaderOptions{
		Columns:    sec.Columns(),
		Predicates: nil,
		Allocator:  memory.DefaultAllocator,
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expect, actual)
}
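
// TestReader_Predicate filters the section with an EqualPredicate on the
// "app" label column and expects only the matching stream to be returned.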
func TestReader_Predicate(t *testing.T) {
	expect := arrowtest.Rows{
		{
			"stream_id.int64":         int64(2),
			"app.label.utf8":          "bar",
			"cluster.label.utf8":      "test",
			"min_timestamp.timestamp": time.Unix(5, 0).UTC(),
			"max_timestamp.timestamp": time.Unix(20, 0).UTC(),
			"rows.int64":              int64(2),
			"uncompressed_size.int64": int64(45),
		},
	}

	sec := buildStreamsSection(t, 1, 0)

	appLabel := sec.Columns()[5]
	require.Equal(t, "app", appLabel.Name)
	require.Equal(t, streams.ColumnTypeLabel, appLabel.Type)

	r := streams.NewReader(streams.ReaderOptions{
		Columns: sec.Columns(),
		Predicates: []streams.Predicate{
			streams.EqualPredicate{
				Column: appLabel,
				Value:  scalar.NewBinaryScalar(memory.NewBufferBytes([]byte("bar")), arrow.BinaryTypes.Binary),
			},
		},
		Allocator: memory.DefaultAllocator,
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expect, actual)
}
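
// TestReader_InPredicate filters the section with an InPredicate on the
// stream ID column and expects only stream 2 to be returned.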
func TestReader_InPredicate(t *testing.T) {
	expect := arrowtest.Rows{
		{
			"stream_id.int64":         int64(2),
			"app.label.utf8":          "bar",
			"cluster.label.utf8":      "test",
			"min_timestamp.timestamp": time.Unix(5, 0).UTC(),
			"max_timestamp.timestamp": time.Unix(20, 0).UTC(),
			"rows.int64":              int64(2),
			"uncompressed_size.int64": int64(45),
		},
	}

	sec := buildStreamsSection(t, 1, 0)

	streamID := sec.Columns()[0]
	require.Equal(t, "", streamID.Name)
	require.Equal(t, streams.ColumnTypeStreamID, streamID.Type)

	r := streams.NewReader(streams.ReaderOptions{
		Columns: sec.Columns(),
		Predicates: []streams.Predicate{
			streams.InPredicate{
				Column: streamID,
				Values: []scalar.Scalar{
					scalar.NewInt64Scalar(2),
				},
			},
		},
		Allocator: memory.DefaultAllocator,
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expect, actual)
}
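
// TestReader_ColumnSubset projects only the stream ID and "app" label
// columns and expects result rows to contain just those two fields.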
func TestReader_ColumnSubset(t *testing.T) {
	expect := arrowtest.Rows{
		{
			"stream_id.int64": int64(1),
			"app.label.utf8":  "foo",
		},
		{
			"stream_id.int64": int64(2),
			"app.label.utf8":  "bar",
		},
		{
			"stream_id.int64": int64(3),
			"app.label.utf8":  "baz",
		},
	}

	sec := buildStreamsSection(t, 1, 0)

	var (
		streamID = sec.Columns()[0]
		appLabel = sec.Columns()[5]
	)
	require.Equal(t, "", streamID.Name)
	require.Equal(t, streams.ColumnTypeStreamID, streamID.Type)
	require.Equal(t, "app", appLabel.Name)
	require.Equal(t, streams.ColumnTypeLabel, appLabel.Type)

	r := streams.NewReader(streams.ReaderOptions{
		Columns:    []*streams.Column{streamID, appLabel},
		Predicates: nil,
		Allocator:  memory.DefaultAllocator,
	})

	actualTable, err := readTable(context.Background(), r)
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(memory.DefaultAllocator, actualTable)
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expect, actual)
}
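
// readTable drains r in batches of up to 128 rows and combines all non-empty
// record batches into a single table. It returns io.EOF if the reader
// produced no rows at all.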
func readTable(ctx context.Context, r *streams.Reader) (arrow.Table, error) {
	var recs []arrow.RecordBatch

	for {
		// Read can return a partial batch alongside an error, so collect
		// any rows before inspecting err.
		rec, err := r.Read(ctx, 128)
		if rec != nil && rec.NumRows() > 0 {
			recs = append(recs, rec)
		}

		if errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return nil, err
		}
	}

	if len(recs) == 0 {
		return nil, io.EOF
	}
	return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}