package executor

import (
	"cmp"
	"context"
	"errors"
	"fmt"
	"io"
	"slices"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"
	"github.com/go-kit/log"

	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/xcap"
)

type dataobjScanOptions struct {
	StreamsSection *streams.Section
	LogsSection    *logs.Section
	StreamIDs      []int64                     // Stream IDs to match from logs sections.
	Predicates     []logs.Predicate            // Predicates to apply to the logs.
	Projections    []physical.ColumnExpression // Columns to include. An empty slice means all columns.

	BatchSize int64 // The buffer size for reading rows, derived from the engine batch size.
}

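// dataobjScan is a [Pipeline] that reads log records from the logs section
// of a data object, optionally joining stream labels from the streams
// section into emitted records.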
type dataobjScan struct {
	opts   dataobjScanOptions
	logger log.Logger
	region *xcap.Region

	initialized     bool
	initializedAt   time.Time
	streams         *streamsView
	streamsInjector *streamInjector
	reader          *logs.Reader
	desiredSchema   *arrow.Schema
}

var _ Pipeline = (*dataobjScan)(nil)

// newDataobjScanPipeline creates a new Pipeline which emits
// [arrow.RecordBatch]es composed of rows from the requested logs section in
// a data object. Rows in returned records are ordered by timestamp.
func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger, region *xcap.Region) *dataobjScan {
	return &dataobjScan{
		opts:   opts,
		logger: logger,
		region: region,
	}
}

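// Read implements [Pipeline]. On the first call it initializes the stream
// and logs readers; each call returns the next batch of log records. A
// minimal caller sketch (scan and ctx are hypothetical; error handling
// elided):
//
//	for {
//		rec, err := scan.Read(ctx)
//		if errors.Is(err, EOF) {
//			break
//		}
//		// use rec ...
//	}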
func (s *dataobjScan) Read(ctx context.Context) (arrow.RecordBatch, error) {
	if err := s.init(); err != nil {
		return nil, err
	}

	return s.read(xcap.ContextWithRegion(ctx, s.region))
}

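// init initializes the streams view and logs reader used by the scan. It is
// a no-op if initialization has already succeeded.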
func (s *dataobjScan) init() error {
	if s.initialized {
		return nil
	}

	// [dataobjScan.initLogs] depends on the result of [dataobjScan.initStreams]
	// (to know whether label columns are needed), so we must initialize streams
	// first.
	if err := s.initStreams(); err != nil {
		return fmt.Errorf("initializing streams: %w", err)
	} else if err := s.initLogs(); err != nil {
		return fmt.Errorf("initializing logs: %w", err)
	}

	s.initialized = true
	s.initializedAt = time.Now().UTC()
	return nil
}

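// initStreams builds the [streamsView] and [streamInjector] used to join
// stream labels into read records. If no label columns are projected, both
// are left nil and no stream joining occurs.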
func (s *dataobjScan) initStreams() error {
	if s.opts.StreamsSection == nil {
		return fmt.Errorf("no streams section provided")
	}

	columnsToRead := projectedLabelColumns(s.opts.StreamsSection, s.opts.Projections)
	if len(columnsToRead) == 0 {
		s.streams = nil
		s.streamsInjector = nil
		return nil
	}

	s.streams = newStreamsView(s.opts.StreamsSection, &streamsViewOptions{
		StreamIDs:    s.opts.StreamIDs,
		LabelColumns: columnsToRead,
		BatchSize:    int(s.opts.BatchSize),
	})

	s.streamsInjector = newStreamInjector(s.streams)
	return nil
}

// projectedLabelColumns returns the label columns to read from the given
// streams section for the provided list of projections. If projections is
// empty, all stream label columns are returned.
//
// References to columns in projections that do not exist in the section are
// ignored.
//
// If projections is non-empty but contains no references to label or
// ambiguous columns, projectedLabelColumns returns nil to indicate that no
// label columns are needed.
func projectedLabelColumns(sec *streams.Section, projections []physical.ColumnExpression) []*streams.Column {
	var found []*streams.Column

	// Special case: if projections is empty, we return all label columns. While
	// [streamsView] accepts a nil list to mean "all columns," we reserve nil
	// here to mean "we're not projecting stream labels at all."
	if len(projections) == 0 {
		for _, col := range sec.Columns() {
			if col.Type != streams.ColumnTypeLabel {
				continue
			}
			found = append(found, col)
		}
		return found
	}

	// Inefficient search. Will we have enough columns + projections such that
	// this needs to be optimized?
	for _, projection := range projections {
		expr, ok := projection.(*physical.ColumnExpr)
		if !ok {
			panic("invalid projection type, expected *physical.ColumnExpr")
		}

		// We're loading the streams section for joining stream labels into
		// records, so we only need to consider label and ambiguous columns here.
		if expr.Ref.Type != types.ColumnTypeLabel && expr.Ref.Type != types.ColumnTypeAmbiguous {
			continue
		}

		for _, col := range sec.Columns() {
			if col.Type != streams.ColumnTypeLabel {
				continue
			}

			if col.Name == expr.Ref.Column {
				found = append(found, col)
				break
			}
		}
	}

	return found
}

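// initLogs builds the [logs.Reader] used to read log records, along with the
// engine-compatible schema for emitted records. It must be called after
// [dataobjScan.initStreams], since the stream ID column is only read when
// stream labels are being joined or stream ID filtering is requested.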
func (s *dataobjScan) initLogs() error {
	if s.opts.LogsSection == nil {
		return fmt.Errorf("no logs section provided")
	}

	predicates := s.opts.Predicates

	var columnsToRead []*logs.Column

	if s.streams != nil || len(s.opts.StreamIDs) > 0 {
		// We're reading streams, so we need to include the stream ID column.
		var streamIDColumn *logs.Column

		for _, col := range s.opts.LogsSection.Columns() {
			if col.Type == logs.ColumnTypeStreamID {
				streamIDColumn = col
				columnsToRead = append(columnsToRead, streamIDColumn)
				break
			}
		}

		if streamIDColumn == nil {
			return fmt.Errorf("logs section does not contain stream ID column")
		}

		if len(s.opts.StreamIDs) > 0 {
			predicates = append([]logs.Predicate{
				logs.InPredicate{
					Column: streamIDColumn,
					Values: makeScalars(s.opts.StreamIDs),
				},
			}, predicates...)
		}
	}

	columnsToRead = append(columnsToRead, projectedLogsColumns(s.opts.LogsSection, s.opts.Projections)...)

	s.reader = logs.NewReader(logs.ReaderOptions{
		// TODO(rfratto): is it possible to hit an edge case where len(columnsToRead)
		// == 0, indicating that we don't need to read any logs at all? How should we
		// handle that?
		Columns: columnsToRead,

		Predicates: predicates,
		Allocator:  memory.DefaultAllocator,
	})

	// Create the engine-compatible expected schema for the logs section.
	origSchema := s.reader.Schema()
	if got, want := origSchema.NumFields(), len(columnsToRead); got != want {
		return fmt.Errorf("logs.Reader returned schema with %d fields, expected %d", got, want)
	}

	// Convert the logs columns to engine-compatible fields.
	var desiredFields []arrow.Field
	for _, col := range columnsToRead {
		field, err := logsColumnToEngineField(col)
		if err != nil {
			return err
		}
		desiredFields = append(desiredFields, field)
	}

	s.desiredSchema = arrow.NewSchema(desiredFields, nil)
	return nil
}

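// makeScalars converts a slice of values into the equivalent slice of
// [scalar.Scalar] values, used to build predicates such as
// [logs.InPredicate].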
func makeScalars[S ~[]E, E any](s S) []scalar.Scalar {
	scalars := make([]scalar.Scalar, len(s))
	for i, v := range s {
		scalars[i] = scalar.MakeScalar(v)
	}
	return scalars
}

// logsColumnToEngineField gets the engine-compatible [arrow.Field] for the
// given [logs.Column]. Returns an error if the column is not expected by the
// engine.
func logsColumnToEngineField(col *logs.Column) (arrow.Field, error) {
	switch col.Type {
	case logs.ColumnTypeStreamID:
		return semconv.FieldFromIdent(streamInjectorColumnIdent, true), nil

	case logs.ColumnTypeTimestamp:
		return semconv.FieldFromIdent(semconv.ColumnIdentTimestamp, true), nil

	case logs.ColumnTypeMessage:
		return semconv.FieldFromIdent(semconv.ColumnIdentMessage, true), nil

	case logs.ColumnTypeMetadata:
		return semconv.FieldFromIdent(semconv.NewIdentifier(col.Name, types.ColumnTypeMetadata, types.Loki.String), true), nil
	}

	return arrow.Field{}, fmt.Errorf("unsupported logs column type %s", col.Type)
}

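// logsColumnPrecedence defines the relative sort order of logs column types
// in emitted records: stream ID first, then metadata, timestamp, and message.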
var logsColumnPrecedence = map[logs.ColumnType]int{
	logs.ColumnTypeInvalid:   0,
	logs.ColumnTypeStreamID:  1,
	logs.ColumnTypeMetadata:  2,
	logs.ColumnTypeTimestamp: 3,
	logs.ColumnTypeMessage:   4,
}

// projectedLogsColumns returns the section columns to read from the given logs
// section for the provided list of projections. If projections is empty, all
// section columns are returned (except for stream ID).
//
// References to columns in projections that do not exist in the section are
// ignored. If no projections reference any known column in the logs section,
// projectedLogsColumns returns nil.
//
// projectedLogsColumns never includes the stream ID column in its results, as
// projections can never reference a stream ID column.
func projectedLogsColumns(sec *logs.Section, projections []physical.ColumnExpression) []*logs.Column {
	var found []*logs.Column

	defer func() {
		// Use consistent sort order for columns. While at some point we may wish
		// to have a consistent order based on projections, that would need to be
		// handled at read time after we inject stream labels.
		slices.SortFunc(found, func(a, b *logs.Column) int {
			if a.Type != b.Type {
				// Sort by type precedence.
				return cmp.Compare(logsColumnPrecedence[a.Type], logsColumnPrecedence[b.Type])
			}
			return cmp.Compare(a.Name, b.Name)
		})
	}()

	// Special case: if projections is empty, we return all columns. While
	// [logs.Reader] accepts a nil list to mean "all columns," we reserve nil
	// here to mean "we're not projecting any columns at all."
	if len(projections) == 0 {
		for _, col := range sec.Columns() {
			if col.Type == logs.ColumnTypeStreamID {
				continue
			}
			found = append(found, col)
		}

		return found
	}

	// Inefficient search. Will we have enough columns + projections such that
	// this needs to be optimized?
NextProjection:
	for _, projection := range projections {
		expr, ok := projection.(*physical.ColumnExpr)
		if !ok {
			panic("invalid projection type, expected *physical.ColumnExpr")
		}

		// Ignore columns that cannot exist in the logs section.
		switch expr.Ref.Type {
		case types.ColumnTypeLabel, types.ColumnTypeParsed, types.ColumnTypeGenerated:
			continue NextProjection
		}

		for _, col := range sec.Columns() {
			switch {
			case expr.Ref.Type == types.ColumnTypeBuiltin && expr.Ref.Column == types.ColumnNameBuiltinTimestamp && col.Type == logs.ColumnTypeTimestamp:
				found = append(found, col)
				continue NextProjection

			case expr.Ref.Type == types.ColumnTypeBuiltin && expr.Ref.Column == types.ColumnNameBuiltinMessage && col.Type == logs.ColumnTypeMessage:
				found = append(found, col)
				continue NextProjection

			case expr.Ref.Type == types.ColumnTypeMetadata && col.Type == logs.ColumnTypeMetadata && col.Name == expr.Ref.Column:
				found = append(found, col)
				continue NextProjection

			case expr.Ref.Type == types.ColumnTypeAmbiguous && col.Type == logs.ColumnTypeMetadata && col.Name == expr.Ref.Column:
				found = append(found, col)
				continue NextProjection
			}
		}
	}

	return found
}

// read reads the next batch of up to opts.BatchSize rows from the logs
// section and generates an [arrow.RecordBatch] from the data. It returns an
// error if reading the section resulted in an error.
func (s *dataobjScan) read(ctx context.Context) (arrow.RecordBatch, error) {
	rec, err := s.reader.Read(ctx, int(s.opts.BatchSize))
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	} else if (rec == nil || rec.NumRows() == 0) && errors.Is(err, io.EOF) {
		return nil, EOF
	}

	// Update the schema of the record to match the schema the engine expects.
	rec, err = changeSchema(rec, s.desiredSchema)
	if err != nil {
		return nil, fmt.Errorf("changing schema: %w", err)
	}

	if s.streamsInjector == nil {
		// No streams injector needed, so we return the record as-is.
		return rec, nil
	}

	return s.streamsInjector.Inject(ctx, rec)
}

// Close closes s and releases all resources.
func (s *dataobjScan) Close() {
	if s.region != nil {
		s.region.End()
	}
	if s.streams != nil {
		s.streams.Close()
	}
	if s.reader != nil {
		_ = s.reader.Close()
	}

	s.initialized = false
	s.streams = nil
	s.streamsInjector = nil
	s.reader = nil
	s.region = nil
}

// Region implements RegionProvider.
func (s *dataobjScan) Region() *xcap.Region {
	return s.region
}