package streams

import (
	"context"
	"errors"
	"fmt"
	_ "io" // Used for documenting io.EOF.

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// ReaderOptions customizes the behavior of a [Reader].
type ReaderOptions struct {
	// Columns to read. Each column must belong to the same [Section].
	Columns []*Column

	// Predicates holds a set of predicates to apply when reading the section.
	// Columns referenced in Predicates must be in the set of Columns.
	Predicates []Predicate

	// Allocator to use for allocating Arrow records. If nil,
	// [memory.DefaultAllocator] is used.
	Allocator memory.Allocator
}

// Validate returns an error if opts is not valid. ReaderOptions are only
// valid when:
//
//   - Each [Column] in Columns and Predicates belongs to the same [Section].
//   - Scalar values used in predicates are of a supported type: an int64,
//     uint64, timestamp, or byte array.
func (opts *ReaderOptions) Validate() error {
	// Ensure all columns belong to the same section.
	var checkSection *Section

	var errs []error

	validateSection := func(col *Column) {
		if checkSection != nil && col.Section != checkSection {
			errs = append(errs, fmt.Errorf("all columns must belong to the same section: got=%p want=%p", col.Section, checkSection))
		} else if checkSection == nil {
			checkSection = col.Section
		}
	}

	for _, col := range opts.Columns {
		validateSection(col)
	}
	if len(errs) > 0 {
		return errors.Join(errs...)
	}

	validateScalar := func(s scalar.Scalar) {
		_, ok := arrowconv.DatasetType(s.DataType())
		if !ok {
			errs = append(errs, fmt.Errorf("unsupported scalar type %s", s.DataType()))
		}
	}

	for _, p := range opts.Predicates {
		walkPredicate(p, func(p Predicate) bool {
			// Validate that predicates use valid scalars.
			switch p := p.(type) {
			case nil: // End of walk; nothing to do.
			case AndPredicate: // Nothing to do.
			case OrPredicate: // Nothing to do.
			case NotPredicate: // Nothing to do.
			case TruePredicate: // Nothing to do.
			case FalsePredicate: // Nothing to do.

			case EqualPredicate:
				validateSection(p.Column)
				validateScalar(p.Value)

			case InPredicate:
				validateSection(p.Column)
				for _, val := range p.Values {
					validateScalar(val)
				}

			case GreaterThanPredicate:
				validateSection(p.Column)
				validateScalar(p.Value)

			case LessThanPredicate:
				validateSection(p.Column)
				validateScalar(p.Value)

			case FuncPredicate:
				validateSection(p.Column)

			default:
				errs = append(errs, fmt.Errorf("unrecognized predicate type %T", p))
			}

			return true
		})
	}

	return errors.Join(errs...)
}
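
// The sketch below shows one way to build a ReaderOptions value that passes
// Validate. It is illustrative only: streamIDColumn is a hypothetical *Column
// taken from a streams Section (all columns given to a Reader must come from
// the same Section), and the int64 scalar is one of the supported predicate
// value types.
//
//	opts := ReaderOptions{
//		Columns: []*Column{streamIDColumn},
//		Predicates: []Predicate{
//			EqualPredicate{
//				Column: streamIDColumn,
//				Value:  scalar.NewInt64Scalar(123),
//			},
//		},
//	}
//	if err := opts.Validate(); err != nil {
//		// Handle invalid options.
//	}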

// A Reader reads batches of rows from a [Section].
type Reader struct {
	opts   ReaderOptions
	schema *arrow.Schema // Set on [Reader.Reset].

	ready bool
	inner *dataset.Reader
	buf   []dataset.Row
}

// NewReader creates a new Reader from the provided options. Options are not
// validated until the first call to [Reader.Read].
func NewReader(opts ReaderOptions) *Reader {
	var r Reader
	r.Reset(opts)
	return &r
}

// Schema returns the [arrow.Schema] used by the Reader. Fields in the schema
// match the order of columns listed in [ReaderOptions].
//
// Names of fields in the schema are guaranteed to be unique per column but
// are not guaranteed to be stable.
//
// The returned Schema must not be modified.
func (r *Reader) Schema() *arrow.Schema { return r.schema }

// Read reads the next batch of rows from the section, returning them as an
// Arrow record.
//
// If [ReaderOptions] has predicates, only rows that match the predicates are
// returned. If none of the next batchSize rows matched the predicate, Read
// returns a nil record with a nil error.
//
// Read will return an error if the next batch of rows could not be read due
// to invalid options or I/O errors. At the end of the section, Read returns
// nil, [io.EOF].
//
// Read may return a non-nil record with a non-nil error, including if the
// error is [io.EOF]. Callers should always process the record before
// processing the error value.
//
// When a record is returned, it will match the schema specified by
// [Reader.Schema]. These records must always be released after use.
func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) {
	if !r.ready {
		err := r.init()
		if err != nil {
			return nil, fmt.Errorf("initializing Reader: %w", err)
		}
	}

	r.buf = slicegrow.GrowToCap(r.buf, batchSize)
	r.buf = r.buf[:batchSize]

	builder := array.NewRecordBuilder(r.opts.Allocator, r.schema)
	defer builder.Release() // The returned record retains its own data; releasing the builder afterwards is safe.

	n, readErr := r.inner.Read(ctx, r.buf)
	for rowIndex := range n {
		row := r.buf[rowIndex]

		for columnIndex, val := range row.Values {
			if columnIndex >= len(r.opts.Columns) {
				// Ignore columns that are not in the projection list.
				continue
			}

			columnBuilder := builder.Field(columnIndex)
			if val.IsNil() {
				columnBuilder.AppendNull()
				continue
			}

			// Append non-null values. We switch on [ColumnType] here so it's easier
			// to follow the mapping of ColumnType to Arrow type. The mappings here
			// should align with both [columnToField] (for Arrow type) and
			// [Builder.encodeTo] (for dataset type).
			//
			// Passing our byte slices to [array.StringBuilder.BinaryBuilder.Append]
			// is safe; it will copy the contents of the value and we can reuse the
			// buffer on the next call to [dataset.Reader.Read].
			columnType := r.opts.Columns[columnIndex].Type
			switch columnType {
			case ColumnTypeInvalid:
				columnBuilder.AppendNull() // Unsupported column
			case ColumnTypeStreamID: // Appends IDs as int64
				columnBuilder.(*array.Int64Builder).Append(val.Int64())
			case ColumnTypeMinTimestamp, ColumnTypeMaxTimestamp: // Values are nanosecond timestamps as int64
				columnBuilder.(*array.TimestampBuilder).Append(arrow.Timestamp(val.Int64()))
			case ColumnTypeLabel: // Appends labels as byte arrays
				columnBuilder.(*array.StringBuilder).BinaryBuilder.Append(val.Binary())
			case ColumnTypeRows: // Appends rows as int64
				columnBuilder.(*array.Int64Builder).Append(val.Int64())
			case ColumnTypeUncompressedSize: // Appends uncompressed size as int64
				columnBuilder.(*array.Int64Builder).Append(val.Int64())
			default:
				// We'll only hit this if we added a new column type but forgot to
				// support reading it.
				return nil, fmt.Errorf("unsupported column type %s for column %d", columnType, columnIndex)
			}
		}
	}

	// We only return readErr after processing n so that we properly handle n>0
	// while also getting an error such as io.EOF.
	return builder.NewRecordBatch(), readErr
}
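
// A minimal read loop is sketched below. Names such as ctx, cols, and
// handleRecord are hypothetical stand-ins; the loop itself follows the
// contract documented on [Reader.Read]: process and release the record (if
// any) before inspecting the error, and stop on [io.EOF].
//
//	r := NewReader(ReaderOptions{Columns: cols})
//	defer r.Close()
//
//	for {
//		rec, err := r.Read(ctx, 1024)
//		if rec != nil {
//			handleRecord(rec) // hypothetical callback
//			rec.Release()
//		}
//		if errors.Is(err, io.EOF) {
//			break
//		} else if err != nil {
//			return err
//		}
//	}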

func (r *Reader) init() error {
	if err := r.opts.Validate(); err != nil {
		return fmt.Errorf("invalid options: %w", err)
	} else if r.opts.Allocator == nil {
		r.opts.Allocator = memory.DefaultAllocator
	}

	// Compose dataset using projected columns and any additional columns
	// used for evaluating predicates.
	//
	// Non-projected columns are appended to the end of the list to allow
	// easy filtering of Row Values with index >= len(r.opts.Columns).
	cols := appendMissingColumns(r.opts.Columns, predicateColumns(r.opts.Predicates))

	var innerSection *columnar.Section
	innerColumns := make([]*columnar.Column, len(cols))
	for i, column := range cols {
		if innerSection == nil {
			innerSection = column.Section.inner
		}
		innerColumns[i] = column.inner
	}

	dset, err := columnar.MakeDataset(innerSection, innerColumns)
	if err != nil {
		return fmt.Errorf("creating dataset: %w", err)
	} else if len(dset.Columns()) != len(cols) {
		return fmt.Errorf("dataset has %d columns, expected %d", len(dset.Columns()), len(cols))
	}

	columnLookup := make(map[*Column]dataset.Column, len(cols))
	for i, col := range dset.Columns() {
		columnLookup[cols[i]] = col
	}

	preds, err := mapPredicates(r.opts.Predicates, columnLookup)
	if err != nil {
		return fmt.Errorf("mapping predicates: %w", err)
	}

	innerOptions := dataset.ReaderOptions{
		Dataset:    dset,
		Columns:    dset.Columns(),
		Predicates: preds,

		Prefetch: true,
	}

	if r.inner == nil {
		r.inner = dataset.NewReader(innerOptions)
	} else {
		r.inner.Reset(innerOptions)
	}

	r.ready = true
	return nil
}
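
// To illustrate the column layout built by init: if ReaderOptions.Columns
// holds two projected columns and a predicate references one additional
// column, the inner dataset reads all three, with the predicate-only column
// appended last. Read skips any row value whose index is >=
// len(r.opts.Columns), so predicate-only columns never appear in the emitted
// record. (The column names below are purely illustrative.)
//
//	opts.Columns      = [stream_id, min_timestamp] // indexes 0 and 1
//	predicate columns = [label]                    // appended at index 2
//	emitted record    = {stream_id, min_timestamp} // index 2 is skipped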

func mapPredicates(ps []Predicate, columnLookup map[*Column]dataset.Column) (predicates []dataset.Predicate, err error) {
	// For simplicity, [mapPredicate] and the functions it calls panic if they
	// encounter an unsupported conversion.
	//
	// These should normally be handled by [ReaderOptions.Validate], but we
	// catch any panics here to gracefully return an error to the caller
	// instead of potentially crashing the goroutine.
	defer func() {
		if r := recover(); r == nil {
			return
		} else if recoveredErr, ok := r.(error); ok {
			err = recoveredErr
		} else {
			err = fmt.Errorf("error while mapping: %v", r)
		}
	}()

	for _, p := range ps {
		predicates = append(predicates, mapPredicate(p, columnLookup))
	}
	return
}

func mapPredicate(p Predicate, columnLookup map[*Column]dataset.Column) dataset.Predicate {
	switch p := p.(type) {
	case AndPredicate:
		return dataset.AndPredicate{
			Left:  mapPredicate(p.Left, columnLookup),
			Right: mapPredicate(p.Right, columnLookup),
		}

	case OrPredicate:
		return dataset.OrPredicate{
			Left:  mapPredicate(p.Left, columnLookup),
			Right: mapPredicate(p.Right, columnLookup),
		}

	case NotPredicate:
		return dataset.NotPredicate{
			Inner: mapPredicate(p.Inner, columnLookup),
		}

	case TruePredicate:
		return dataset.TruePredicate{}

	case FalsePredicate:
		return dataset.FalsePredicate{}

	case EqualPredicate:
		col, ok := columnLookup[p.Column]
		if !ok {
			panic(fmt.Sprintf("column %p not found in column lookup", p.Column))
		}

		return dataset.EqualPredicate{
			Column: col,
			Value:  arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())),
		}

	case InPredicate:
		col, ok := columnLookup[p.Column]
		if !ok {
			panic(fmt.Sprintf("column %p not found in column lookup", p.Column))
		}

		vals := make([]dataset.Value, len(p.Values))
		for i := range p.Values {
			vals[i] = arrowconv.FromScalar(p.Values[i], mustConvertType(p.Values[i].DataType()))
		}

		var valueSet dataset.ValueSet
		switch col.ColumnDesc().Type.Physical {
		case datasetmd.PHYSICAL_TYPE_INT64:
			valueSet = dataset.NewInt64ValueSet(vals)
		case datasetmd.PHYSICAL_TYPE_UINT64:
			valueSet = dataset.NewUint64ValueSet(vals)
		case datasetmd.PHYSICAL_TYPE_BINARY:
			valueSet = dataset.NewBinaryValueSet(vals)
		default:
			panic("InPredicate not implemented for datatype")
		}

		return dataset.InPredicate{
			Column: col,
			Values: valueSet,
		}

	case GreaterThanPredicate:
		col, ok := columnLookup[p.Column]
		if !ok {
			panic(fmt.Sprintf("column %p not found in column lookup", p.Column))
		}

		return dataset.GreaterThanPredicate{
			Column: col,
			Value:  arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())),
		}

	case LessThanPredicate:
		col, ok := columnLookup[p.Column]
		if !ok {
			panic(fmt.Sprintf("column %p not found in column lookup", p.Column))
		}

		return dataset.LessThanPredicate{
			Column: col,
			Value:  arrowconv.FromScalar(p.Value, mustConvertType(p.Value.DataType())),
		}

	case FuncPredicate:
		col, ok := columnLookup[p.Column]
		if !ok {
			panic(fmt.Sprintf("column %p not found in column lookup", p.Column))
		}

		fieldType := columnToField(p.Column).Type

		return dataset.FuncPredicate{
			Column: col,
			Keep: func(_ dataset.Column, value dataset.Value) bool {
				return p.Keep(p.Column, arrowconv.ToScalar(value, fieldType))
			},
		}

	default:
		panic(fmt.Sprintf("unsupported predicate type %T", p))
	}
}

func mustConvertType(dtype arrow.DataType) datasetmd.PhysicalType {
	toType, ok := arrowconv.DatasetType(dtype)
	if !ok {
		panic(fmt.Sprintf("unsupported dataset type %s", dtype))
	}
	return toType
}

// Reset discards any state and resets r with a new set of options. This
// permits reusing a Reader rather than allocating a new one.
func (r *Reader) Reset(opts ReaderOptions) {
	r.opts = opts
	r.schema = columnsSchema(opts.Columns)
	r.ready = false

	if r.inner != nil {
		// Close our inner reader so it releases resources immediately. It'll be
		// fully reset on the next call to [Reader.init].
		_ = r.inner.Close()
	}
}

// Close closes the Reader and releases any resources it holds. Closed Readers
// can be reused by calling [Reader.Reset].
func (r *Reader) Close() error {
	if r.inner != nil {
		return r.inner.Close()
	}
	return nil
}

func appendMissingColumns(dst, src []*Column) []*Column {
	exists := make(map[*Column]struct{}, len(dst))
	for _, col := range dst {
		exists[col] = struct{}{}
	}

	for _, col := range src {
		if _, ok := exists[col]; !ok {
			// Not seen, add it.
			dst = append(dst, col)
		}
	}
	return dst
}

func columnsSchema(cols []*Column) *arrow.Schema {
	fields := make([]arrow.Field, 0, len(cols))
	for _, col := range cols {
		fields = append(fields, columnToField(col))
	}
	return arrow.NewSchema(fields, nil)
}

var columnDatatypes = map[ColumnType]arrow.DataType{
	ColumnTypeInvalid:          arrow.Null,
	ColumnTypeStreamID:         arrow.PrimitiveTypes.Int64,
	ColumnTypeMinTimestamp:     arrow.FixedWidthTypes.Timestamp_ns,
	ColumnTypeMaxTimestamp:     arrow.FixedWidthTypes.Timestamp_ns,
	ColumnTypeLabel:            arrow.BinaryTypes.String,
	ColumnTypeRows:             arrow.PrimitiveTypes.Int64,
	ColumnTypeUncompressedSize: arrow.PrimitiveTypes.Int64,
}

func columnToField(col *Column) arrow.Field {
	dtype, ok := columnDatatypes[col.Type]
	if !ok {
		dtype = arrow.Null
	}

	return arrow.Field{
		Name:     makeColumnName(col.Name, col.Type.String(), dtype),
		Type:     dtype,
		Nullable: true, // All columns are nullable.
	}
}

// makeColumnName returns a unique name for a [Column] and its expected data
// type.
//
// Unique names are used by unit tests to be able to produce expected rows.
func makeColumnName(label string, name string, dty arrow.DataType) string {
	switch {
	case label == "" && name == "":
		return dty.Name()
	case label == "" && name != "":
		return name + "." + dty.Name()
	default:
		return label + "." + name + "." + dty.Name()
	}
}