// loki/pkg/engine/internal/executor/dataobjscan.go

package executor

import (
"cmp"
"context"
"errors"
"fmt"
"io"
"slices"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/go-kit/log"
refactor(dataobj): invert dependency between dataobj and sections (#17762) Originally, the dataobj package was a higher-level API around sections. This design caused it to become a bottleneck: * Implementing any new public behaviour for a section required bubbling it up to the dataobj API for it to be exposed, making it tedious to add new sections or update existing ones. * The `dataobj.Builder` pattern was focused on constructing dataobjs for storing log data, which will cause friction as we build objects around other use cases. This PR builds on top of the foundation laid out by #17704 and #17708, fully inverting the dependency between dataobj and sections: * The `dataobj` package has no knowledge of what sections exist, and can now be used for writing and reading generic sections. Section packages now create higher-level APIs around the abstractions provided by `dataobj`. * Section packages are now public, and callers interact directly with these packages for writing and reading section-specific data. * All logic for a section (encoding, decoding, buffering, reading) is now fully self-contained inside the section package. Previously, the implementation of each section was spread across three packages (`pkg/dataobj/internal/encoding`, `pkg/dataobj/internal/sections/SECTION`, `pkg/dataobj`). * Cutting a section is now a decision made by the caller rather than the section implementation. Previously, the logs section builder would create multiple sections. For the most part, this change is a no-op, with two exceptions: 1. Section cutting is now performed by the caller; however, this shouldn't result in any issues. 2. Removing the high-level `dataobj.Stream` and `dataobj.Record` types will temporarily reduce the allocation gains from #16988. I will address this after this PR is merged.
7 months ago
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/xcap"
)

type dataobjScanOptions struct {
StreamsSection *streams.Section
LogsSection *logs.Section
StreamIDs []int64 // Stream IDs to match from logs sections.
Predicates []logs.Predicate // Predicates to apply to the logs.
Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns.
BatchSize int64 // The buffer size for reading rows, derived from the engine batch size.
}

type dataobjScan struct {
opts dataobjScanOptions
logger log.Logger
region *xcap.Region
initialized bool
initializedAt time.Time
streams *streamsView
streamsInjector *streamInjector
reader *logs.Reader
desiredSchema *arrow.Schema
}

var _ Pipeline = (*dataobjScan)(nil)

// newDataobjScanPipeline creates a new Pipeline which emits
// [arrow.RecordBatch]es composed of the requested logs section in a data
// object. Rows in each emitted record are ordered by timestamp.
func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger, region *xcap.Region) *dataobjScan {
return &dataobjScan{
opts: opts,
logger: logger,
region: region,
}
}
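
// A minimal usage sketch (illustrative only): streamsSec, logsSec, proj,
// logger, and region are hypothetical values obtained elsewhere.
//
//	pipe := newDataobjScanPipeline(dataobjScanOptions{
//		StreamsSection: streamsSec,       // *streams.Section from an open data object
//		LogsSection:    logsSec,          // *logs.Section from the same object
//		StreamIDs:      []int64{1, 2, 3}, // restrict reads to these streams
//		Projections:    proj,             // empty means "all columns"
//		BatchSize:      1024,
//	}, logger, region)
//	defer pipe.Close()
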
func (s *dataobjScan) Read(ctx context.Context) (arrow.RecordBatch, error) {
if err := s.init(); err != nil {
return nil, err
}
return s.read(xcap.ContextWithRegion(ctx, s.region))
}

func (s *dataobjScan) init() error {
if s.initialized {
return nil
}
// [dataobjScan.initLogs] depends on the result of [dataobjScan.initStreams]
// (to know whether label columns are needed), so we must initialize streams
// first.
if err := s.initStreams(); err != nil {
return fmt.Errorf("initializing streams: %w", err)
} else if err := s.initLogs(); err != nil {
return fmt.Errorf("initializing logs: %w", err)
}
s.initialized = true
s.initializedAt = time.Now().UTC()
return nil
}

func (s *dataobjScan) initStreams() error {
if s.opts.StreamsSection == nil {
return fmt.Errorf("no streams section provided")
}
columnsToRead := projectedLabelColumns(s.opts.StreamsSection, s.opts.Projections)
if len(columnsToRead) == 0 {
s.streams = nil
s.streamsInjector = nil
return nil
}
s.streams = newStreamsView(s.opts.StreamsSection, &streamsViewOptions{
StreamIDs: s.opts.StreamIDs,
LabelColumns: columnsToRead,
BatchSize: int(s.opts.BatchSize),
})
s.streamsInjector = newStreamInjector(s.streams)
return nil
}

// projectedLabelColumns returns the label columns to read from the given
// streams section for the provided list of projections. If projections is
// empty, all stream label columns are returned.
//
// References to columns in projections that do not exist in the section are
// ignored.
//
// If projections is non-empty but contains no references to labels or
// ambiguous columns, projectedLabelColumns returns nil to indicate that no
// label columns are needed.
func projectedLabelColumns(sec *streams.Section, projections []physical.ColumnExpression) []*streams.Column {
var found []*streams.Column
// Special case: if projections is empty, we return all label columns. While
// [streamsView] accepts a nil list to mean "all columns," we reserve nil
// here to mean "we're not projecting stream labels at all."
if len(projections) == 0 {
for _, col := range sec.Columns() {
if col.Type != streams.ColumnTypeLabel {
continue
}
found = append(found, col)
}
return found
}
// Inefficient search. Will we have enough columns + projections such that
// this needs to be optimized?
for _, projection := range projections {
expr, ok := projection.(*physical.ColumnExpr)
if !ok {
panic("invalid projection type, expected *physical.ColumnExpr")
}
// We're loading the streams section for joining stream labels into
// records, so we only need to consider label and ambiguous columns here.
if expr.Ref.Type != types.ColumnTypeLabel && expr.Ref.Type != types.ColumnTypeAmbiguous {
continue
}
for _, col := range sec.Columns() {
if col.Type != streams.ColumnTypeLabel {
continue
}
if col.Name == expr.Ref.Column {
found = append(found, col)
break
}
}
}
return found
}
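
// A worked example of the matching above (illustrative; the column names are
// hypothetical): given a streams section with label columns "app" and "env",
// the projections below select both label columns, while the builtin
// timestamp reference is skipped and a reference to a nonexistent label is
// silently dropped.
//
//	// projections: {label "app", ambiguous "env", builtin "timestamp", label "missing"}
//	// result:      [app, env]
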
func (s *dataobjScan) initLogs() error {
if s.opts.LogsSection == nil {
return fmt.Errorf("no logs section provided")
}
predicates := s.opts.Predicates
var columnsToRead []*logs.Column
if s.streams != nil || len(s.opts.StreamIDs) > 0 {
// We're reading streams, so we need to include the stream ID column.
var streamIDColumn *logs.Column
for _, col := range s.opts.LogsSection.Columns() {
if col.Type == logs.ColumnTypeStreamID {
streamIDColumn = col
columnsToRead = append(columnsToRead, streamIDColumn)
break
}
}
if streamIDColumn == nil {
return fmt.Errorf("logs section does not contain stream ID column")
}
if len(s.opts.StreamIDs) > 0 {
predicates = append([]logs.Predicate{
logs.InPredicate{
Column: streamIDColumn,
Values: makeScalars(s.opts.StreamIDs),
},
}, predicates...)
}
}
columnsToRead = append(columnsToRead, projectedLogsColumns(s.opts.LogsSection, s.opts.Projections)...)
s.reader = logs.NewReader(logs.ReaderOptions{
// TODO(rfratto): is it possible to hit an edge case where len(columnsToRead)
// == 0, indicating that we don't need to read any logs at all? How should we
// handle that?
Columns: columnsToRead,
Predicates: predicates,
Allocator: memory.DefaultAllocator,
})
// Create the engine-compatible expected schema for the logs section.
origSchema := s.reader.Schema()
if got, want := origSchema.NumFields(), len(columnsToRead); got != want {
return fmt.Errorf("logs.Reader returned schema with %d fields, expected %d", got, want)
}
// Convert the logs columns to engine-compatible fields.
var desiredFields []arrow.Field
for _, col := range columnsToRead {
field, err := logsColumnToEngineField(col)
if err != nil {
return err
}
desiredFields = append(desiredFields, field)
}
s.desiredSchema = arrow.NewSchema(desiredFields, nil)
return nil
}

func makeScalars[S ~[]E, E any](s S) []scalar.Scalar {
scalars := make([]scalar.Scalar, len(s))
for i, v := range s {
scalars[i] = scalar.MakeScalar(v)
}
return scalars
}
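
// For example, the stream-ID predicate in [dataobjScan.initLogs] is built
// from int64 stream IDs (sketch; streamIDColumn is the section's stream ID
// column located there):
//
//	values := makeScalars([]int64{4, 8, 15}) // three Int64 scalars
//	pred := logs.InPredicate{Column: streamIDColumn, Values: values}
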
// logsColumnToEngineField gets the engine-compatible [arrow.Field] for the
// given [logs.Column]. Returns an error if the column is not expected by the
// engine.
func logsColumnToEngineField(col *logs.Column) (arrow.Field, error) {
switch col.Type {
case logs.ColumnTypeStreamID:
return semconv.FieldFromIdent(streamInjectorColumnIdent, true), nil
case logs.ColumnTypeTimestamp:
return semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, true), nil
case logs.ColumnTypeMessage:
return semconv.FieldFromIdent(semconv.ColumnIdentMessage, true), nil
case logs.ColumnTypeMetadata:
return semconv.FieldFromIdent(semconv.NewIdentifier(col.Name, types.ColumnTypeMetadata, types.Loki.String), true), nil
}
return arrow.Field{}, fmt.Errorf("unsupported logs column type %s", col.Type)
}

var logsColumnPrecedence = map[logs.ColumnType]int{
logs.ColumnTypeInvalid: 0,
logs.ColumnTypeStreamID: 1,
logs.ColumnTypeMetadata: 2,
logs.ColumnTypeTimestamp: 3,
logs.ColumnTypeMessage: 4,
}
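
// With this precedence, a column set such as {message, timestamp,
// metadata "traceID", stream ID} sorts to:
//
//	[stream ID, metadata "traceID", timestamp, message]
//
// and columns of the same type (e.g. multiple metadata columns) are further
// ordered by name.
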
// projectedLogsColumns returns the section columns to read from the given logs
// section for the provided list of projections. If projections is empty, all
// section columns are returned (except for stream ID).
//
// References to columns in projections that do not exist in the section are
// ignored. If no projections reference any known column in the logs section,
// projectedLogsColumns returns nil.
//
// projectedLogsColumns never includes the stream ID column in its results, as
// projections can never reference a stream ID column.
func projectedLogsColumns(sec *logs.Section, projections []physical.ColumnExpression) []*logs.Column {
var found []*logs.Column
defer func() {
// Use consistent sort order for columns. While at some point we may wish
// to have a consistent order based on projections, that would need to be
// handled at read time after we inject stream labels.
slices.SortFunc(found, func(a, b *logs.Column) int {
if a.Type != b.Type {
// Sort by type precedence.
return cmp.Compare(logsColumnPrecedence[a.Type], logsColumnPrecedence[b.Type])
}
return cmp.Compare(a.Name, b.Name)
})
}()
// Special case: if projections is empty, we return all columns. While
// [logs.Reader] accepts a nil list to mean "all columns," we reserve nil
// here to mean "we're not projecting any columns at all."
if len(projections) == 0 {
for _, col := range sec.Columns() {
if col.Type == logs.ColumnTypeStreamID {
continue
}
found = append(found, col)
}
return found
}
// Inefficient search. Will we have enough columns + projections such that
// this needs to be optimized?
NextProjection:
for _, projection := range projections {
expr, ok := projection.(*physical.ColumnExpr)
if !ok {
panic("invalid projection type, expected *physical.ColumnExpr")
}
// Ignore columns that cannot exist in the logs section.
switch expr.Ref.Type {
case types.ColumnTypeLabel, types.ColumnTypeParsed, types.ColumnTypeGenerated:
continue NextProjection
}
for _, col := range sec.Columns() {
switch {
case expr.Ref.Type == types.ColumnTypeBuiltin && expr.Ref.Column == types.ColumnNameBuiltinTimestamp && col.Type == logs.ColumnTypeTimestamp:
found = append(found, col)
continue NextProjection
case expr.Ref.Type == types.ColumnTypeBuiltin && expr.Ref.Column == types.ColumnNameBuiltinMessage && col.Type == logs.ColumnTypeMessage:
found = append(found, col)
continue NextProjection
case expr.Ref.Type == types.ColumnTypeMetadata && col.Type == logs.ColumnTypeMetadata && col.Name == expr.Ref.Column:
found = append(found, col)
continue NextProjection
case expr.Ref.Type == types.ColumnTypeAmbiguous && col.Type == logs.ColumnTypeMetadata && col.Name == expr.Ref.Column:
found = append(found, col)
continue NextProjection
}
}
}
return found
}
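
// A worked example (illustrative; the column names are hypothetical): with
// metadata columns "spanID" and "traceID" in the section, the projections
// below select the timestamp column and both metadata columns, skip the
// label reference (labels never live in a logs section), and the deferred
// sort places metadata before timestamp:
//
//	// projections: {builtin "timestamp", metadata "traceID", ambiguous "spanID", label "app"}
//	// result:      [spanID, traceID, timestamp]
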
// read reads the next batch of rows from the logs section and generates an
// [arrow.RecordBatch] from the data. It returns an error if reading the
// section resulted in an error.
func (s *dataobjScan) read(ctx context.Context) (arrow.RecordBatch, error) {
rec, err := s.reader.Read(ctx, int(s.opts.BatchSize))
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
} else if (rec == nil || rec.NumRows() == 0) && errors.Is(err, io.EOF) {
return nil, EOF
}
// Update the schema of the record to match the schema the engine expects.
rec, err = changeSchema(rec, s.desiredSchema)
if err != nil {
return nil, fmt.Errorf("changing schema: %w", err)
}
if s.streamsInjector == nil {
// No streams injector needed, so we return the record as-is.
return rec, nil
}
return s.streamsInjector.Inject(ctx, rec)
}
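
// A typical consumption loop (sketch; pipe is a *dataobjScan and EOF is the
// executor package's end-of-stream sentinel returned above):
//
//	for {
//		rec, err := pipe.Read(ctx)
//		if errors.Is(err, EOF) {
//			break
//		} else if err != nil {
//			return err
//		}
//		// ... process rec ...
//		rec.Release()
//	}
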
// Close closes s and releases all resources.
func (s *dataobjScan) Close() {
if s.region != nil {
s.region.End()
}
if s.streams != nil {
s.streams.Close()
}
if s.reader != nil {
_ = s.reader.Close()
}
s.initialized = false
s.streams = nil
s.streamsInjector = nil
s.reader = nil
s.region = nil
}

// Region implements RegionProvider.
func (s *dataobjScan) Region() *xcap.Region {
return s.region
}