package stats

import (
	"context"
	"errors"
	"fmt"
	_ "io" // Used for documenting io.EOF.

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
	memoryv2 "github.com/grafana/loki/v3/pkg/memory"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// Section represents an opened stats section.
type Section struct {
	inner   *columnar.Section // Underlying columnar section.
	columns []*Column         // Recognized columns, populated by init.
}

// Open opens a [Section] from an underlying [dataobj.Section]. Open returns an
// error if the section has the wrong type or an unsupported format version, if
// the section metadata could not be read, or if the provided ctx is canceled.
func Open(ctx context.Context, section *dataobj.Section) (*Section, error) {
	if !CheckSection(section) {
		return nil, fmt.Errorf("section type mismatch: got=%s want=%s", section.Type, sectionType)
	}
	if section.Type.Version != columnar.FormatVersion {
		return nil, fmt.Errorf("unsupported section version: got=%d want=%d", section.Type.Version, columnar.FormatVersion)
	}

	dec, err := columnar.NewDecoder(section.Reader, section.Type.Version)
	if err != nil {
		return nil, fmt.Errorf("creating decoder: %w", err)
	}

	columnarSection, err := columnar.Open(ctx, section.Tenant, dec)
	if err != nil {
		return nil, fmt.Errorf("opening columnar section: %w", err)
	}

	sec := &Section{inner: columnarSection}
	if err := sec.init(); err != nil {
		return nil, fmt.Errorf("initializing section: %w", err)
	}
	return sec, nil
}

// init populates s.columns from the columns of the underlying columnar
// section. It currently always returns nil.
func (s *Section) init() error {
	for _, col := range s.inner.Columns() {
		colType, err := ParseColumnType(col.Type.Logical)
		if err != nil {
			// Skip over unrecognized columns; probably come from a newer
			// version of the code.
			continue
		}

		s.columns = append(s.columns, &Column{
			Section: s,
			Name:    col.Tag,
			Type:    colType,

			inner: col,
		})
	}
	return nil
}

// Columns returns the set of Columns in the section. The slice of returned
// columns must not be mutated.
//
// Unrecognized columns (e.g., when running older code against newer stats
// sections) are skipped.
func (s *Section) Columns() []*Column { return s.columns }

// A Column represents one of the columns in the stats section. Valid columns
// can only be retrieved by calling [Section.Columns].
//
// Data in columns can be read by using a [Reader].
type Column struct {
	Section *Section   // Section that contains the column.
	Name    string     // Optional name of the column (label name for dynamic columns).
	Type    ColumnType // Type of data in the column.

	inner *columnar.Column
}

var tracer = otel.Tracer("pkg/dataobj/sections/stats")

// ReaderOptions customizes the behavior of a [Reader].
type ReaderOptions struct {
	// Columns to read. Each column must belong to the same [Section].
	Columns []*Column

	// Allocator to use for allocating Arrow records. If nil,
	// [memory.DefaultAllocator] is used.
	Allocator memory.Allocator
}

// validate returns an error if opts is invalid. ReaderOptions are valid when
// Columns is non-empty, no entry is nil, every column has a Section, and all
// columns belong to the same Section.
//
// validate is unexported; it is called while a [Reader] is being opened.
func (opts *ReaderOptions) validate() error {
	if len(opts.Columns) == 0 {
		return errors.New("ReaderOptions.Columns must be non-empty")
	}

	var section *Section
	for i, col := range opts.Columns {
		switch {
		case col == nil:
			return fmt.Errorf("ReaderOptions.Columns[%d] is nil", i)
		case col.Section == nil:
			// A column without a Section did not come from Section.Columns.
			// Reject it here: the Reader dereferences the Section of the
			// first column, and a nil Section must not double as the
			// "no section seen yet" marker below.
			return fmt.Errorf("ReaderOptions.Columns[%d] has no Section", i)
		case section == nil:
			section = col.Section
		case col.Section != section:
			return fmt.Errorf("ReaderOptions.Columns[%d] belongs to a different Section", i)
		}
	}
	return nil
}

// A Reader reads batches of rows from a stats [Section]. Returned
// [arrow.RecordBatch] values must be released by the caller after use.
type Reader struct { opts ReaderOptions schema *arrow.Schema ready bool inner *columnar.ReaderAdapter alloc *memoryv2.Allocator readSpan trace.Span } var errReaderNotOpen = errors.New("reader not opened") // NewReader creates a new Reader. Options are not validated until the first // call to [Reader.Open]. func NewReader(opts ReaderOptions) *Reader { var r Reader r.Reset(opts) return &r } // Columns returns the [Column]s the Reader will read. func (r *Reader) Columns() []*Column { return r.opts.Columns } // Schema returns the [arrow.Schema] used by the Reader. Set on construction // (via [Reader.Reset]) so it is valid before [Reader.Open] is called. func (r *Reader) Schema() *arrow.Schema { return r.schema } // Reset reuses the Reader with new options. func (r *Reader) Reset(opts ReaderOptions) { if r.alloc == nil { r.alloc = memoryv2.NewAllocator(nil) } else { r.alloc.Reset() } r.opts = opts r.schema = columnsSchema(opts.Columns) r.readSpan = nil r.ready = false if r.inner != nil { _ = r.inner.Close() } } // Open initializes Reader resources. Must be called before [Reader.Read]. // Safe to call multiple times. func (r *Reader) Open(ctx context.Context) error { if r.ready { return nil } if err := r.init(ctx); err != nil { _ = r.Close() return fmt.Errorf("initializing Reader: %w", err) } return nil } // Read reads up to batchSize rows from the section. At end of section returns // (nil, io.EOF). May return a non-nil batch with io.EOF — callers should // process the batch before checking the error. Returned batches must be // released by the caller after use. 
func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) { if !r.ready { return nil, errReaderNotOpen } if r.readSpan == nil { ctx, r.readSpan = xcap.StartSpan(ctx, tracer, "stats.Reader.Read") } else { ctx = xcap.ContextWithSpan(ctx, r.readSpan) } defer r.alloc.Reclaim() rb, readErr := r.inner.Read(ctx, r.alloc, batchSize) result, err := arrowconv.ToRecordBatch(rb, r.schema) if err != nil { return nil, fmt.Errorf("convert columnar.RecordBatch to arrow.RecordBatch: %w", err) } return result, readErr } func (r *Reader) init(ctx context.Context) error { if err := r.opts.validate(); err != nil { return fmt.Errorf("invalid options: %w", err) } if r.opts.Allocator == nil { r.opts.Allocator = memory.DefaultAllocator } ctx, span := xcap.StartSpan(ctx, tracer, "stats.Reader.Open") defer span.End() cols := r.opts.Columns innerSection := cols[0].Section.inner innerColumns := make([]*columnar.Column, len(cols)) for i, c := range cols { innerColumns[i] = c.inner } dset, err := columnar.MakeDataset(innerSection, innerColumns) if err != nil { return fmt.Errorf("creating dataset: %w", err) } innerOptions := dataset.RowReaderOptions{ Dataset: dset, Columns: dset.Columns(), Prefetch: true, } if r.inner == nil { r.inner = columnar.NewReaderAdapter(innerOptions) } else { r.inner.Reset(innerOptions) } if err := r.inner.Open(ctx); err != nil { return fmt.Errorf("opening reader: %w", err) } r.ready = true return nil } // Close closes the Reader and releases any resources it holds. func (r *Reader) Close() error { if r.readSpan != nil { r.readSpan.End() } if r.inner != nil { return r.inner.Close() } return nil } // columnsSchema builds the arrow schema for the given projected columns. 
func columnsSchema(cols []*Column) *arrow.Schema { fields := make([]arrow.Field, 0, len(cols)) for _, col := range cols { fields = append(fields, columnToField(col)) } return arrow.NewSchema(fields, nil) } var columnDatatypes = map[ColumnType]arrow.DataType{ ColumnTypeInvalid: arrow.Null, ColumnTypeObjectPath: arrow.BinaryTypes.String, ColumnTypeSectionIndex: arrow.PrimitiveTypes.Int64, ColumnTypeSortSchema: arrow.BinaryTypes.String, ColumnTypeMinTimestamp: arrow.FixedWidthTypes.Timestamp_ns, ColumnTypeMaxTimestamp: arrow.FixedWidthTypes.Timestamp_ns, ColumnTypeRowCount: arrow.PrimitiveTypes.Int64, ColumnTypeUncompressedSize: arrow.PrimitiveTypes.Int64, ColumnTypeLabel: arrow.BinaryTypes.String, } func columnToField(col *Column) arrow.Field { dtype, ok := columnDatatypes[col.Type] if !ok { dtype = arrow.Null } return arrow.Field{ Name: makeColumnName(col.Name, col.Type.String(), dtype), Type: dtype, Nullable: true, } } // makeColumnName produces a unique field name "