loki/pkg/dataobj/sections/logs/reader_test.go

package logs_test

import (
	"bytes"
	"context"
	"errors"
	"io"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

// TestReader does a basic end-to-end test over a reader with predicates applied.
func TestReader(t *testing.T) {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer alloc.AssertSize(t, 0)
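
	// Append records across two streams in descending timestamp order, then
	// encode and reopen them as a logs section.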
	sec := buildSection(t, []logs.Record{
		{StreamID: 2, Timestamp: unixTime(40), Metadata: labels.FromStrings("trace_id", "789012"), Line: []byte("baz qux")},
		{StreamID: 2, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123456"), Line: []byte("foo bar")},
		{StreamID: 1, Timestamp: unixTime(20), Metadata: labels.FromStrings("trace_id", "abcdef"), Line: []byte("goodbye, world!")},
		{StreamID: 1, Timestamp: unixTime(10), Metadata: labels.EmptyLabels(), Line: []byte("hello, world!")},
	})
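
	// Pick out the stream ID, trace_id metadata, and message columns by
	// position; the assertions below pin down the expected name and type of
	// each.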
	var (
		streamID = sec.Columns()[0]
		traceID  = sec.Columns()[2]
		message  = sec.Columns()[3]
	)
	require.Equal(t, "", streamID.Name)
	require.Equal(t, logs.ColumnTypeStreamID, streamID.Type)
	require.Equal(t, "trace_id", traceID.Name)
	require.Equal(t, logs.ColumnTypeMetadata, traceID.Type)
	require.Equal(t, "", message.Name)
	require.Equal(t, logs.ColumnTypeMessage, message.Type)

	r := logs.NewReader(logs.ReaderOptions{
		Columns:    []*logs.Column{streamID, traceID, message},
		Allocator:  alloc,
		Predicates: []logs.Predicate{
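			// Keep only rows whose trace_id metadata matches one of two known
			// values; null (invalid) metadata scalars are dropped.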
			logs.FuncPredicate{
				Column: traceID,
				Keep: func(_ *logs.Column, value scalar.Scalar) bool {
					if !value.IsValid() {
						return false
					}
					bb := value.(*scalar.String).Value.Bytes()
					return bytes.Equal(bb, []byte("abcdef")) || bytes.Equal(bb, []byte("123456"))
				},
			},
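			// Restrict results to stream IDs 1 and 2; every record in this
			// test matches, so the trace_id predicate does the filtering.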
			logs.InPredicate{
				Column: streamID,
				Values: []scalar.Scalar{
					scalar.NewInt64Scalar(1),
					scalar.NewInt64Scalar(2),
				},
			},
		},
	})
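
	// Only the two records with matching trace IDs survive the predicates.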
	expect := arrowtest.Rows{
		{"stream_id.int64": int64(2), "trace_id.metadata.utf8": "123456", "message.utf8": "foo bar"},
		{"stream_id.int64": int64(1), "trace_id.metadata.utf8": "abcdef", "message.utf8": "goodbye, world!"},
	}

	actualTable, err := readTable(context.Background(), r)
	if actualTable != nil {
		defer actualTable.Release()
	}
	require.NoError(t, err)

	actual, err := arrowtest.TableRows(alloc, actualTable)
	require.NoError(t, err, "failed to get rows from table")
	require.Equal(t, expect, actual)
}
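
// buildSection appends recs to a logs section builder, flushes the encoded
// data object to a buffer, and reopens the logs section from those bytes.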
func buildSection(t *testing.T, recs []logs.Record) *logs.Section {
	t.Helper()

	sectionBuilder := logs.NewBuilder(nil, logs.BuilderOptions{
		PageSizeHint:     8192,
		BufferSize:       4192,
		StripeMergeLimit: 2,
	})
	for _, rec := range recs {
		sectionBuilder.Append(rec)
	}

	objectBuilder := dataobj.NewBuilder()
	require.NoError(t, objectBuilder.Append(sectionBuilder))

	var buf bytes.Buffer
	_, err := objectBuilder.Flush(&buf)
	require.NoError(t, err)

	obj, err := dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
	require.NoError(t, err)

	sec, err := logs.Open(t.Context(), obj.Sections()[0])
	require.NoError(t, err)
	return sec
}

func unixTime(sec int64) time.Time { return time.Unix(sec, 0) }
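
// readTable repeatedly calls r.Read in batches of up to 128 rows and combines
// the non-empty results into a single table, returning io.EOF if the reader
// produced no rows.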
func readTable(ctx context.Context, r *logs.Reader) (arrow.Table, error) {
	var recs []arrow.Record
	for {
		rec, err := r.Read(ctx, 128)
		if rec != nil {
			if rec.NumRows() > 0 {
				recs = append(recs, rec)
			}
			// Release is deferred until return, after NewTableFromRecords
			// below has retained the record's data.
			defer rec.Release()
		}
		if errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			return nil, err
		}
	}

	if len(recs) == 0 {
		return nil, io.EOF
	}
	return array.NewTableFromRecords(recs[0].Schema(), recs), nil
}