loki/pkg/engine/internal/executor/dataobjscan_test.go

package executor

import (
	"math"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/pkg/push"
)
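// Test_dataobjScan checks that a dataobjScan pipeline reads rows back out of a
// data object built from sample streams, covering a full scan, column
// projections, stream filtering, and an ambiguous column reference.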
func Test_dataobjScan(t *testing.T) {
	obj := buildDataobj(t, []logproto.Stream{
		{
			Labels: `{service="loki", env="prod"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(5, 0),
					Line:               "hello world",
					StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "aaaa-bbbb-cccc-dddd"}},
				},
				{
					Timestamp:          time.Unix(10, 0),
					Line:               "goodbye world",
					StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "eeee-ffff-aaaa-bbbb"}},
				},
			},
		},
		{
			Labels: `{service="notloki", env="prod"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(2, 0),
					Line:               "hello world",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
				},
				{
					Timestamp:          time.Unix(3, 0),
					Line:               "goodbye world",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
				},
			},
		},
	})

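	// Locate the streams and logs sections written into the object; the scan
	// pipeline needs a handle on both.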
	var (
		streamsSection *streams.Section
		logsSection    *logs.Section
	)

	for _, sec := range obj.Sections() {
		var err error
		switch {
		case streams.CheckSection(sec):
			streamsSection, err = streams.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open streams section")
		case logs.CheckSection(sec):
			logsSection, err = logs.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open logs section")
		}
	}

t.Run("All columns", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2}, // All streams
Projections: nil, // All columns
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.service", true),
semconv.FieldFromFQN("utf8.metadata.guid", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
semconv.FieldFromFQN("utf8.builtin.message", true), // should be nullable=false?
}
expectCSV := `prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world
prod,loki,aaaa-bbbb-cccc-dddd,NULL,1970-01-01 00:00:05,hello world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Column subset", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}},
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}},
},
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be not nullable?
}
expectCSV := `prod,1970-01-01 00:00:10
prod,1970-01-01 00:00:05
prod,1970-01-01 00:00:03
prod,1970-01-01 00:00:02`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Streams subset", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{2}, // Only stream 2
Projections: nil, // All columns
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.service", true),
semconv.FieldFromFQN("utf8.metadata.guid", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true), // should be nullable=false?
semconv.FieldFromFQN("utf8.builtin.message", true), // should be nullable=false?
}
expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Ambiguous column", func(t *testing.T) {
// Here, we'll check for a column which only exists once in the dataobj but is
// ambiguous from the perspective of the caller.
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
},
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.env", true),
}
expectCSV := `prod
prod
prod
prod`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
}
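
// Test_dataobjScan_DuplicateColumns checks how dataobjScan handles a label and
// a structured metadata field sharing the same name: they are surfaced as
// separate label and metadata columns, and ambiguous projections return both.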
func Test_dataobjScan_DuplicateColumns(t *testing.T) {
	obj := buildDataobj(t, []logproto.Stream{
		// Case 1: A single row has a value for a label and metadata column with
		// the same name.
		{
			Labels: `{service="loki", env="prod", pod="pod-1"}`,
			Entries: []logproto.Entry{
				{
					Timestamp:          time.Unix(1, 0),
					Line:               "message 1",
					StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "override"}},
				},
			},
		},

		// Case 2: A label and metadata column share a name but have values in
		// different rows.
		{
			Labels: `{service="loki", env="prod"}`,
			Entries: []logproto.Entry{{
				Timestamp:          time.Unix(2, 0),
				Line:               "message 2",
				StructuredMetadata: []push.LabelAdapter{{Name: "namespace", Value: "namespace-1"}},
			}},
		},
		{
			Labels: `{service="loki", env="prod", namespace="namespace-2"}`,
			Entries: []logproto.Entry{{
				Timestamp:          time.Unix(3, 0),
				Line:               "message 3",
				StructuredMetadata: nil,
			}},
		},
	})

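	// As above, locate the streams and logs sections for the subtests to scan.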
	var (
		streamsSection *streams.Section
		logsSection    *logs.Section
	)

	for _, sec := range obj.Sections() {
		var err error
		switch {
		case streams.CheckSection(sec):
			streamsSection, err = streams.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open streams section")
		case logs.CheckSection(sec):
			logsSection, err = logs.Open(t.Context(), sec)
			require.NoError(t, err, "failed to open logs section")
		}
	}

t.Run("All columns", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: nil, // All columns
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.env", true),
semconv.FieldFromFQN("utf8.label.namespace", true),
semconv.FieldFromFQN("utf8.label.pod", true),
semconv.FieldFromFQN("utf8.label.service", true),
semconv.FieldFromFQN("utf8.metadata.namespace", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", true),
semconv.FieldFromFQN("utf8.builtin.message", true),
}
expectCSV := `prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3
prod,NULL,NULL,loki,namespace-1,NULL,1970-01-01 00:00:02,message 2
prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Ambiguous pod", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
},
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.pod", true),
semconv.FieldFromFQN("utf8.metadata.pod", true),
}
expectCSV := `NULL,NULL
NULL,NULL
pod-1,override`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Ambiguous namespace", func(t *testing.T) {
pipeline := newDataobjScanPipeline(dataobjScanOptions{
StreamsSection: streamsSection,
LogsSection: logsSection,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
},
BatchSize: 512,
}, log.NewNopLogger(), nil)
expectFields := []arrow.Field{
semconv.FieldFromFQN("utf8.label.namespace", true),
semconv.FieldFromFQN("utf8.metadata.namespace", true),
}
expectCSV := `namespace-2,NULL
NULL,namespace-1
NULL,NULL`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
}
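
// buildDataobj builds an in-memory data object from the given streams using
// the logsobj builder and returns the flushed object for the tests above to
// scan. The object is released via t.Cleanup.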
func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object {
	t.Helper()

	builder, err := logsobj.NewBuilder(logsobj.BuilderConfig{
		BuilderBaseConfig: logsobj.BuilderBaseConfig{
			TargetPageSize:          8_000,
			TargetObjectSize:        math.MaxInt,
			TargetSectionSize:       32_000,
			BufferSize:              8_000,
			SectionStripeMergeLimit: 2,
		},
		DataobjSortOrder: "timestamp-desc",
	}, nil)
	require.NoError(t, err)

	for _, stream := range streams {
		require.NoError(t, builder.Append("tenant", stream))
	}

	obj, closer, err := builder.Flush()
	require.NoError(t, err)
	t.Cleanup(func() { closer.Close() })

	return obj
}