package executor

import (
	"math"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/logproto"

	"github.com/grafana/loki/pkg/push"
)

// Test_dataobjScan exercises scanning a data object with varying column
// projections and stream selections.
func Test_dataobjScan(t *testing.T) {
	obj := buildDataobj(t, []logproto.Stream{
		{
			Labels: `{service="loki", env="prod"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(5, 0),
					Line:               "hello world",
					StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "aaaa-bbbb-cccc-dddd"}},
				},
				{
					Timestamp:          time.Unix(10, 0),
					Line:               "goodbye world",
					StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "eeee-ffff-aaaa-bbbb"}},
				},
			},
		},
		{
			Labels: `{service="notloki", env="prod"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(2, 0),
					Line:               "hello world",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
				},
				{
					Timestamp:          time.Unix(3, 0),
					Line:               "goodbye world",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
				},
			},
		},
	})

	var (
		streamsSection *streams.Section
		logsSection    *logs.Section
	)

	for _, sec := range obj.Sections() {
		var err error

		switch {
		case streams.CheckSection(sec):
			streamsSection, err = streams.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open streams section")

		case logs.CheckSection(sec):
			logsSection, err = logs.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open logs section")
		}
	}

	t.Run("All columns", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2}, // All streams
			Projections:    nil,           // All columns
			BatchSize:      512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.env", true),
			semconv.FieldFromFQN("utf8.label.service", true),
			semconv.FieldFromFQN("utf8.metadata.guid", true),
			semconv.FieldFromFQN("utf8.metadata.pod", true),
			semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
			semconv.FieldFromFQN("utf8.builtin.message", true),           // should be nullable=false?
		}
		expectCSV := `prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world
prod,loki,aaaa-bbbb-cccc-dddd,NULL,1970-01-01 00:00:05,hello world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})

	t.Run("Column subset", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2}, // All streams
			Projections: []physical.ColumnExpression{
				&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}},
				&physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}},
			},
			BatchSize: 512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.env", true),
			semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
		}
		expectCSV := `prod,1970-01-01 00:00:10
prod,1970-01-01 00:00:05
prod,1970-01-01 00:00:03
prod,1970-01-01 00:00:02`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})

	t.Run("Streams subset", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{2}, // Only stream 2
			Projections:    nil,        // All columns
			BatchSize:      512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.env", true),
			semconv.FieldFromFQN("utf8.label.service", true),
			semconv.FieldFromFQN("utf8.metadata.guid", true),
			semconv.FieldFromFQN("utf8.metadata.pod", true),
			semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
			semconv.FieldFromFQN("utf8.builtin.message", true),           // should be nullable=false?
		}
		expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})

	t.Run("Ambiguous column", func(t *testing.T) {
		// Here, we'll check for a column which only exists once in the dataobj
		// but is ambiguous from the perspective of the caller.
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2}, // All streams
			Projections: []physical.ColumnExpression{
				&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
			},
			BatchSize: 512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.env", true),
		}
		expectCSV := `prod
prod
prod
prod`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})
}

// Test_dataobjScan_DuplicateColumns exercises scanning a data object where a
// label column and a structured metadata column share the same name.
func Test_dataobjScan_DuplicateColumns(t *testing.T) {
	obj := buildDataobj(t, []logproto.Stream{
		// Case 1: A single row has a value for a label and metadata column with
		// the same name.
		{
			Labels: `{service="loki", env="prod", pod="pod-1"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(1, 0),
					Line:               "message 1",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "override"}},
				},
			},
		},

		// Case 2: A label and metadata column share a name but have values in
		// different rows.
		{
			Labels: `{service="loki", env="prod"}`,
			Entries: []logproto.Entry{{
				Timestamp:          time.Unix(2, 0),
				Line:               "message 2",
				StructuredMetadata: []push.LabelAdapter{{Name: "namespace", Value: "namespace-1"}},
			}},
		},
		{
			Labels: `{service="loki", env="prod", namespace="namespace-2"}`,
			Entries: []logproto.Entry{{
				Timestamp:          time.Unix(3, 0),
				Line:               "message 3",
				StructuredMetadata: nil,
			}},
		},
	})

	var (
		streamsSection *streams.Section
		logsSection    *logs.Section
	)

	for _, sec := range obj.Sections() {
		var err error

		switch {
		case streams.CheckSection(sec):
			streamsSection, err = streams.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open streams section")

		case logs.CheckSection(sec):
			logsSection, err = logs.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open logs section")
		}
	}

	t.Run("All columns", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2, 3}, // All streams
			Projections:    nil,              // All columns
			BatchSize:      512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.env", true),
			semconv.FieldFromFQN("utf8.label.namespace", true),
			semconv.FieldFromFQN("utf8.label.pod", true),
			semconv.FieldFromFQN("utf8.label.service", true),
			semconv.FieldFromFQN("utf8.metadata.namespace", true),
			semconv.FieldFromFQN("utf8.metadata.pod", true),
			semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true),
			semconv.FieldFromFQN("utf8.builtin.message", true),
		}
		expectCSV := `prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3
prod,NULL,NULL,loki,namespace-1,NULL,1970-01-01 00:00:02,message 2
prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})

	t.Run("Ambiguous pod", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2, 3}, // All streams
			Projections: []physical.ColumnExpression{
				&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
			},
			BatchSize: 512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.pod", true),
			semconv.FieldFromFQN("utf8.metadata.pod", true),
		}
		expectCSV := `NULL,NULL
NULL,NULL
pod-1,override`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})

	t.Run("Ambiguous namespace", func(t *testing.T) {
		pipeline := newDataobjScanPipeline(dataobjScanOptions{
			StreamsSection: streamsSection,
			LogsSection:    logsSection,
			StreamIDs:      []int64{1, 2, 3}, // All streams
			Projections: []physical.ColumnExpression{
				&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
			},
			BatchSize: 512,
		}, log.NewNopLogger(), nil)

		expectFields := []arrow.Field{
			semconv.FieldFromFQN("utf8.label.namespace", true),
			semconv.FieldFromFQN("utf8.metadata.namespace", true),
		}
		expectCSV := `namespace-2,NULL
NULL,namespace-1
NULL,NULL`

		expectRecord, err := CSVToArrow(expectFields, expectCSV)
		require.NoError(t, err)

		AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
	})
}

// buildDataobj builds an in-memory data object from the given streams,
// flushed through a logsobj.Builder configured to sort by descending
// timestamp.
func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object {
	t.Helper()

	builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
		BuilderBaseConfig: logsobj.BuilderBaseConfig{
			TargetPageSize:          8_000,
			TargetObjectSize:        math.MaxInt,
			TargetSectionSize:       32_000,
			BufferSize:              8_000,
			SectionStripeMergeLimit: 2,
		},
		DataobjSortOrder: "timestamp-desc",
	}, nil)
	require.NoError(t, err)

	for _, stream := range streams {
		require.NoError(t, builder.Append("tenant", stream))
	}

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { closer.Close() })

	return obj
}