Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/stats/reader.go

307 lines
8.7 KiB

package stats
import (
"context"
"errors"
"fmt"
_ "io" // Used for documenting io.EOF.
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
memoryv2 "github.com/grafana/loki/v3/pkg/memory"
"github.com/grafana/loki/v3/pkg/xcap"
)
// Section represents an opened stats section. It wraps the underlying
// columnar section and tracks the columns this package recognizes.
type Section struct {
// inner is the columnar section that backs this stats section.
inner *columnar.Section
// columns holds the recognized columns; it is populated by init.
columns []*Column
}
// Open opens a [Section] from an underlying [dataobj.Section].
//
// Open returns an error if section is not a stats section, if its format
// version differs from [columnar.FormatVersion], or if the decoder or the
// columnar section cannot be created (for example because the section
// metadata could not be read or the provided ctx is canceled).
func Open(ctx context.Context, section *dataobj.Section) (*Section, error) {
	if !CheckSection(section) {
		return nil, fmt.Errorf("section type mismatch: got=%s want=%s", section.Type, sectionType)
	}

	version := section.Type.Version
	if version != columnar.FormatVersion {
		return nil, fmt.Errorf("unsupported section version: got=%d want=%d", version, columnar.FormatVersion)
	}

	decoder, err := columnar.NewDecoder(section.Reader, version)
	if err != nil {
		return nil, fmt.Errorf("creating decoder: %w", err)
	}

	inner, err := columnar.Open(ctx, section.Tenant, decoder)
	if err != nil {
		return nil, fmt.Errorf("opening columnar section: %w", err)
	}

	opened := &Section{inner: inner}
	if err := opened.init(); err != nil {
		return nil, fmt.Errorf("initializing section: %w", err)
	}
	return opened, nil
}
// init builds s.columns from the columns of the underlying columnar section.
// Columns whose logical type cannot be parsed are left out; they most likely
// come from a newer version of the code. init currently always returns nil.
func (s *Section) init() error {
	for _, inner := range s.inner.Columns() {
		parsed, err := ParseColumnType(inner.Type.Logical)
		if err == nil {
			s.columns = append(s.columns, &Column{
				Section: s,
				Name:    inner.Tag,
				Type:    parsed,
				inner:   inner,
			})
		}
		// A parse failure is deliberately ignored: the column is skipped.
	}
	return nil
}
// Columns returns the Columns recognized in the section. Callers must treat
// the returned slice as read-only.
//
// Columns that were not recognized when the section was opened (e.g., when
// older code reads a newer stats section) are not included.
func (s *Section) Columns() []*Column {
	return s.columns
}
// A Column represents one of the columns in the stats section. Valid columns
// can only be retrieved by calling [Section.Columns].
//
// Data in columns can be read by using a [Reader].
type Column struct {
Section *Section // Section that contains the column.
Name string // Optional name of the column (label name for dynamic columns).
Type ColumnType // Type of data in the column.
// inner is the underlying columnar column; Reader hands it to the dataset
// it reads from.
inner *columnar.Column
}
// tracer is the OpenTelemetry tracer used for the spans started by Reader.
var tracer = otel.Tracer("pkg/dataobj/sections/stats")
// ReaderOptions customizes the behavior of a [Reader].
type ReaderOptions struct {
// Columns to read. Must be non-empty, and each column must belong to the
// same [Section].
Columns []*Column
// Allocator to use for allocating Arrow records. If nil,
// [memory.DefaultAllocator] is used.
//
// NOTE(review): in this file the field is only defaulted by Reader.init and
// is not otherwise referenced; confirm where it is consumed.
Allocator memory.Allocator
}
// validate returns an error if opts is invalid. ReaderOptions are valid when
// Columns is non-empty, no column is nil, every column has a Section, and all
// columns belong to the same Section.
//
// validate is unexported and is called from Reader.init, so options are only
// checked when the Reader is opened.
func (opts *ReaderOptions) validate() error {
	if len(opts.Columns) == 0 {
		return errors.New("ReaderOptions.Columns must be non-empty")
	}

	var section *Section
	for i, col := range opts.Columns {
		switch {
		case col == nil:
			return fmt.Errorf("ReaderOptions.Columns[%d] is nil", i)
		case col.Section == nil:
			// A hand-built Column without a Section would otherwise slip
			// through (nil doubles as the "not yet seen" sentinel below) and
			// cause a nil dereference when the Reader is initialized.
			return fmt.Errorf("ReaderOptions.Columns[%d] has no Section", i)
		case section == nil:
			// First column: remember its Section as the reference.
			section = col.Section
		case col.Section != section:
			return fmt.Errorf("ReaderOptions.Columns[%d] belongs to a different Section", i)
		}
	}
	return nil
}
// A Reader reads batches of rows from a stats [Section]. Returned
// [arrow.RecordBatch] values must be released by the caller after use.
type Reader struct {
// opts are the options supplied to NewReader or Reset.
opts ReaderOptions
// schema is the Arrow schema derived from opts.Columns.
schema *arrow.Schema
// ready reports whether init completed successfully; Read refuses to run
// until it is true.
ready bool
// inner is the underlying columnar reader; it is created on first init and
// reused afterwards.
inner *columnar.ReaderAdapter
// alloc is passed to inner on every Read and reclaimed when Read returns.
alloc *memoryv2.Allocator
// readSpan is started by the first Read and reused by later Reads; Close
// ends it.
readSpan trace.Span
}
// errReaderNotOpen is returned by Reader.Read when the Reader has not been
// successfully opened.
var errReaderNotOpen = errors.New("reader not opened")
// NewReader creates a new Reader configured with opts. The options are not
// validated here; validation happens on the first call to [Reader.Open].
func NewReader(opts ReaderOptions) *Reader {
	r := new(Reader)
	r.Reset(opts)
	return r
}
// Columns returns the [Column]s the Reader was configured to read.
func (r *Reader) Columns() []*Column {
	return r.opts.Columns
}
// Schema returns the [arrow.Schema] used by the Reader. The schema is built
// in [Reader.Reset] (and therefore on construction), so it is available
// before [Reader.Open] is called.
func (r *Reader) Schema() *arrow.Schema {
	return r.schema
}
// Reset discards the Reader's current state so it can be reused with opts.
// The Reader must be opened again with [Reader.Open] before the next
// [Reader.Read]. As with [NewReader], opts are not validated until Open.
func (r *Reader) Reset(opts ReaderOptions) {
	if r.alloc == nil {
		r.alloc = memoryv2.NewAllocator(nil)
	} else {
		r.alloc.Reset()
	}

	// End the read span before dropping the reference. Without this, a Reset
	// that is not preceded by Close would leave the span unfinished.
	if r.readSpan != nil {
		r.readSpan.End()
		r.readSpan = nil
	}

	r.opts = opts
	r.schema = columnsSchema(opts.Columns)
	r.ready = false

	if r.inner != nil {
		// Best-effort close of the previous inner reader; Reset has no error
		// return, and the inner reader is reset again on the next init.
		_ = r.inner.Close()
	}
}
// Open initializes the Reader's resources and must be called before
// [Reader.Read]. Calling Open on an already-open Reader is a no-op.
//
// If initialization fails, the Reader is closed and the wrapped error is
// returned.
func (r *Reader) Open(ctx context.Context) error {
	if r.ready {
		return nil
	}

	err := r.init(ctx)
	if err == nil {
		return nil
	}

	// Release whatever init managed to acquire; the init error is the one
	// worth reporting, so the Close error is intentionally dropped.
	_ = r.Close()
	return fmt.Errorf("initializing Reader: %w", err)
}
// Read reads up to batchSize rows from the section and returns them as an
// [arrow.RecordBatch], which the caller must release after use.
//
// Read returns errReaderNotOpen if the Reader has not been opened. The error
// from the underlying read (io.EOF at the end of the section) is returned
// together with the converted batch, so a non-nil batch may accompany io.EOF;
// callers should process the batch before checking the error.
func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) {
	if !r.ready {
		return nil, errReaderNotOpen
	}

	// All Read calls share a single span: the first call starts it and later
	// calls attach it to their context.
	if r.readSpan != nil {
		ctx = xcap.ContextWithSpan(ctx, r.readSpan)
	} else {
		ctx, r.readSpan = xcap.StartSpan(ctx, tracer, "stats.Reader.Read")
	}

	// NOTE(review): presumably this returns memory handed out by r.alloc
	// during this call; confirm against the allocator's documentation.
	defer r.alloc.Reclaim()

	columnarBatch, readErr := r.inner.Read(ctx, r.alloc, batchSize)

	arrowBatch, convErr := arrowconv.ToRecordBatch(columnarBatch, r.schema)
	if convErr != nil {
		return nil, fmt.Errorf("convert columnar.RecordBatch to arrow.RecordBatch: %w", convErr)
	}
	return arrowBatch, readErr
}
// init validates the Reader's options, builds a dataset over the projected
// columns, and opens the inner reader. On success the Reader is marked ready.
// The whole operation is wrapped in a "stats.Reader.Open" span.
func (r *Reader) init(ctx context.Context) error {
	if err := r.opts.validate(); err != nil {
		return fmt.Errorf("invalid options: %w", err)
	}
	if r.opts.Allocator == nil {
		r.opts.Allocator = memory.DefaultAllocator
	}

	ctx, span := xcap.StartSpan(ctx, tracer, "stats.Reader.Open")
	defer span.End()

	// validate guarantees at least one column and that all columns share a
	// Section, so the first column's Section stands in for all of them.
	projected := r.opts.Columns
	innerColumns := make([]*columnar.Column, 0, len(projected))
	for _, col := range projected {
		innerColumns = append(innerColumns, col.inner)
	}

	dset, err := columnar.MakeDataset(projected[0].Section.inner, innerColumns)
	if err != nil {
		return fmt.Errorf("creating dataset: %w", err)
	}

	readerOpts := dataset.RowReaderOptions{
		Dataset:  dset,
		Columns:  dset.Columns(),
		Prefetch: true,
	}

	// Reuse the inner reader across Resets; only create it the first time.
	if r.inner != nil {
		r.inner.Reset(readerOpts)
	} else {
		r.inner = columnar.NewReaderAdapter(readerOpts)
	}

	if err := r.inner.Open(ctx); err != nil {
		return fmt.Errorf("opening reader: %w", err)
	}

	r.ready = true
	return nil
}
// Close closes the Reader and releases any resources it holds. It ends the
// read span (if one was started) and closes the inner reader, returning the
// inner reader's Close error.
//
// After Close the Reader is no longer ready: [Reader.Read] returns an error
// until the Reader is opened again with [Reader.Open]. Close may be called
// more than once.
func (r *Reader) Close() error {
	// Mark the Reader as not open so that a later Open re-initializes it
	// instead of returning early, and Read does not run against a closed
	// inner reader.
	r.ready = false

	if r.readSpan != nil {
		r.readSpan.End()
		// Drop the ended span so it is neither ended again nor reused as the
		// parent of a later Read.
		r.readSpan = nil
	}

	if r.inner != nil {
		return r.inner.Close()
	}
	return nil
}
// columnsSchema returns an arrow schema with one field per projected column,
// in the same order as cols. The schema carries no metadata.
func columnsSchema(cols []*Column) *arrow.Schema {
	fields := make([]arrow.Field, len(cols))
	for i := range cols {
		fields[i] = columnToField(cols[i])
	}
	return arrow.NewSchema(fields, nil)
}
// columnDatatypes maps each ColumnType to the Arrow data type used for its
// field in a Reader's schema. columnToField falls back to arrow.Null for
// column types missing from this table.
var columnDatatypes = map[ColumnType]arrow.DataType{
ColumnTypeInvalid: arrow.Null,
ColumnTypeObjectPath: arrow.BinaryTypes.String,
ColumnTypeSectionIndex: arrow.PrimitiveTypes.Int64,
ColumnTypeSortSchema: arrow.BinaryTypes.String,
ColumnTypeMinTimestamp: arrow.FixedWidthTypes.Timestamp_ns,
ColumnTypeMaxTimestamp: arrow.FixedWidthTypes.Timestamp_ns,
ColumnTypeRowCount: arrow.PrimitiveTypes.Int64,
ColumnTypeUncompressedSize: arrow.PrimitiveTypes.Int64,
ColumnTypeLabel: arrow.BinaryTypes.String,
}
// columnToField returns the nullable arrow field describing col. The field's
// data type comes from columnDatatypes (arrow.Null when the column type has
// no entry) and its name from makeColumnName.
//
// A nil col yields a placeholder arrow.Null field instead of panicking. The
// schema is built when a Reader is constructed or reset, before the options
// are validated, so a nil column must survive until validation can report it
// as an error.
func columnToField(col *Column) arrow.Field {
	if col == nil {
		return arrow.Field{
			Name:     makeColumnName("", "", arrow.Null),
			Type:     arrow.Null,
			Nullable: true,
		}
	}

	dtype, ok := columnDatatypes[col.Type]
	if !ok {
		dtype = arrow.Null
	}
	return arrow.Field{
		Name:     makeColumnName(col.Name, col.Type.String(), dtype),
		Type:     dtype,
		Nullable: true,
	}
}
// makeColumnName builds the field name for a column from its label, its type
// name, and the name of its arrow data type:
//
//   - "<label>.<name>.<dtype>" when label is set (an empty name is rendered
//     as "<invalid>"),
//   - "<name>.<dtype>" when only name is set,
//   - "<dtype>" when both are empty.
//
// Mirrors streams/reader.go.
func makeColumnName(label, name string, dty arrow.DataType) string {
	dtypeName := dty.Name()

	if label != "" {
		if name == "" {
			name = "<invalid>"
		}
		return label + "." + name + "." + dtypeName
	}
	if name != "" {
		return name + "." + dtypeName
	}
	return dtypeName
}