mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
307 lines
8.7 KiB
307 lines
8.7 KiB
package stats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
_ "io" // Used for documenting io.EOF.
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/arrowconv"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
|
|
memoryv2 "github.com/grafana/loki/v3/pkg/memory"
|
|
"github.com/grafana/loki/v3/pkg/xcap"
|
|
)
|
|
|
|
// Section represents an opened stats section.
|
|
type Section struct {
|
|
inner *columnar.Section
|
|
columns []*Column
|
|
}
|
|
|
|
// Open opens a [Section] from an underlying [dataobj.Section]. Open returns an
|
|
// error if the section metadata could not be read or if the provided ctx is
|
|
// canceled.
|
|
func Open(ctx context.Context, section *dataobj.Section) (*Section, error) {
|
|
if !CheckSection(section) {
|
|
return nil, fmt.Errorf("section type mismatch: got=%s want=%s", section.Type, sectionType)
|
|
} else if section.Type.Version != columnar.FormatVersion {
|
|
return nil, fmt.Errorf("unsupported section version: got=%d want=%d", section.Type.Version, columnar.FormatVersion)
|
|
}
|
|
|
|
dec, err := columnar.NewDecoder(section.Reader, section.Type.Version)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("creating decoder: %w", err)
|
|
}
|
|
|
|
columnarSection, err := columnar.Open(ctx, section.Tenant, dec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening columnar section: %w", err)
|
|
}
|
|
|
|
sec := &Section{inner: columnarSection}
|
|
if err := sec.init(); err != nil {
|
|
return nil, fmt.Errorf("initializing section: %w", err)
|
|
}
|
|
return sec, nil
|
|
}
|
|
|
|
func (s *Section) init() error {
|
|
for _, col := range s.inner.Columns() {
|
|
colType, err := ParseColumnType(col.Type.Logical)
|
|
if err != nil {
|
|
// Skip over unrecognized columns; probably come from a newer
|
|
// version of the code.
|
|
continue
|
|
}
|
|
|
|
s.columns = append(s.columns, &Column{
|
|
Section: s,
|
|
Name: col.Tag,
|
|
Type: colType,
|
|
|
|
inner: col,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Columns returns the set of Columns in the section. The slice of returned
|
|
// columns must not be mutated.
|
|
//
|
|
// Unrecognized columns (e.g., when running older code against newer stats
|
|
// sections) are skipped.
|
|
func (s *Section) Columns() []*Column { return s.columns }
|
|
|
|
// A Column represents one of the columns in the stats section. Valid columns
|
|
// can only be retrieved by calling [Section.Columns].
|
|
//
|
|
// Data in columns can be read by using a [Reader].
|
|
type Column struct {
|
|
Section *Section // Section that contains the column.
|
|
Name string // Optional name of the column (label name for dynamic columns).
|
|
Type ColumnType // Type of data in the column.
|
|
|
|
inner *columnar.Column
|
|
}
|
|
|
|
// tracer is the OpenTelemetry tracer used to start the spans created by
// [Reader] in this package.
var tracer = otel.Tracer("pkg/dataobj/sections/stats")
|
|
|
|
// ReaderOptions customizes the behavior of a [Reader].
|
|
type ReaderOptions struct {
|
|
// Columns to read. Each column must belong to the same [Section].
|
|
Columns []*Column
|
|
|
|
// Allocator to use for allocating Arrow records. If nil,
|
|
// [memory.DefaultAllocator] is used.
|
|
Allocator memory.Allocator
|
|
}
|
|
|
|
// validate returns an error if opts is invalid. ReaderOptions are valid when
|
|
// Columns is non-empty and every column belongs to the same Section.
|
|
//
|
|
// Package-private — not exposed publicly; called from init() to keep the
|
|
// public surface minimal per spec.
|
|
func (opts *ReaderOptions) validate() error {
|
|
if len(opts.Columns) == 0 {
|
|
return errors.New("ReaderOptions.Columns must be non-empty")
|
|
}
|
|
var section *Section
|
|
for i, col := range opts.Columns {
|
|
if col == nil {
|
|
return fmt.Errorf("ReaderOptions.Columns[%d] is nil", i)
|
|
}
|
|
if section == nil {
|
|
section = col.Section
|
|
} else if col.Section != section {
|
|
return fmt.Errorf("ReaderOptions.Columns[%d] belongs to a different Section", i)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// A Reader reads batches of rows from a stats [Section]. Returned
|
|
// [arrow.RecordBatch] values must be released by the caller after use.
|
|
type Reader struct {
|
|
opts ReaderOptions
|
|
schema *arrow.Schema
|
|
ready bool
|
|
inner *columnar.ReaderAdapter
|
|
alloc *memoryv2.Allocator
|
|
readSpan trace.Span
|
|
}
|
|
|
|
// errReaderNotOpen is returned by [Reader.Read] when the Reader has not been
// successfully opened.
var errReaderNotOpen = errors.New("reader not opened")
|
|
|
|
// NewReader creates a new Reader. Options are not validated until the first
|
|
// call to [Reader.Open].
|
|
func NewReader(opts ReaderOptions) *Reader {
|
|
var r Reader
|
|
r.Reset(opts)
|
|
return &r
|
|
}
|
|
|
|
// Columns returns the [Column]s the Reader will read.
|
|
func (r *Reader) Columns() []*Column { return r.opts.Columns }
|
|
|
|
// Schema returns the [arrow.Schema] used by the Reader. Set on construction
|
|
// (via [Reader.Reset]) so it is valid before [Reader.Open] is called.
|
|
func (r *Reader) Schema() *arrow.Schema { return r.schema }
|
|
|
|
// Reset reuses the Reader with new options.
|
|
func (r *Reader) Reset(opts ReaderOptions) {
|
|
if r.alloc == nil {
|
|
r.alloc = memoryv2.NewAllocator(nil)
|
|
} else {
|
|
r.alloc.Reset()
|
|
}
|
|
r.opts = opts
|
|
r.schema = columnsSchema(opts.Columns)
|
|
r.readSpan = nil
|
|
r.ready = false
|
|
if r.inner != nil {
|
|
_ = r.inner.Close()
|
|
}
|
|
}
|
|
|
|
// Open initializes Reader resources. Must be called before [Reader.Read].
|
|
// Safe to call multiple times.
|
|
func (r *Reader) Open(ctx context.Context) error {
|
|
if r.ready {
|
|
return nil
|
|
}
|
|
if err := r.init(ctx); err != nil {
|
|
_ = r.Close()
|
|
return fmt.Errorf("initializing Reader: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read reads up to batchSize rows from the section. At end of section returns
|
|
// (nil, io.EOF). May return a non-nil batch with io.EOF — callers should
|
|
// process the batch before checking the error. Returned batches must be
|
|
// released by the caller after use.
|
|
func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, error) {
|
|
if !r.ready {
|
|
return nil, errReaderNotOpen
|
|
}
|
|
if r.readSpan == nil {
|
|
ctx, r.readSpan = xcap.StartSpan(ctx, tracer, "stats.Reader.Read")
|
|
} else {
|
|
ctx = xcap.ContextWithSpan(ctx, r.readSpan)
|
|
}
|
|
defer r.alloc.Reclaim()
|
|
rb, readErr := r.inner.Read(ctx, r.alloc, batchSize)
|
|
result, err := arrowconv.ToRecordBatch(rb, r.schema)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("convert columnar.RecordBatch to arrow.RecordBatch: %w", err)
|
|
}
|
|
return result, readErr
|
|
}
|
|
|
|
func (r *Reader) init(ctx context.Context) error {
|
|
if err := r.opts.validate(); err != nil {
|
|
return fmt.Errorf("invalid options: %w", err)
|
|
}
|
|
if r.opts.Allocator == nil {
|
|
r.opts.Allocator = memory.DefaultAllocator
|
|
}
|
|
|
|
ctx, span := xcap.StartSpan(ctx, tracer, "stats.Reader.Open")
|
|
defer span.End()
|
|
|
|
cols := r.opts.Columns
|
|
innerSection := cols[0].Section.inner
|
|
innerColumns := make([]*columnar.Column, len(cols))
|
|
for i, c := range cols {
|
|
innerColumns[i] = c.inner
|
|
}
|
|
|
|
dset, err := columnar.MakeDataset(innerSection, innerColumns)
|
|
if err != nil {
|
|
return fmt.Errorf("creating dataset: %w", err)
|
|
}
|
|
|
|
innerOptions := dataset.RowReaderOptions{
|
|
Dataset: dset,
|
|
Columns: dset.Columns(),
|
|
Prefetch: true,
|
|
}
|
|
if r.inner == nil {
|
|
r.inner = columnar.NewReaderAdapter(innerOptions)
|
|
} else {
|
|
r.inner.Reset(innerOptions)
|
|
}
|
|
if err := r.inner.Open(ctx); err != nil {
|
|
return fmt.Errorf("opening reader: %w", err)
|
|
}
|
|
r.ready = true
|
|
return nil
|
|
}
|
|
|
|
// Close closes the Reader and releases any resources it holds.
|
|
func (r *Reader) Close() error {
|
|
if r.readSpan != nil {
|
|
r.readSpan.End()
|
|
}
|
|
if r.inner != nil {
|
|
return r.inner.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// columnsSchema builds the arrow schema for the given projected columns.
|
|
func columnsSchema(cols []*Column) *arrow.Schema {
|
|
fields := make([]arrow.Field, 0, len(cols))
|
|
for _, col := range cols {
|
|
fields = append(fields, columnToField(col))
|
|
}
|
|
return arrow.NewSchema(fields, nil)
|
|
}
|
|
|
|
var columnDatatypes = map[ColumnType]arrow.DataType{
|
|
ColumnTypeInvalid: arrow.Null,
|
|
ColumnTypeObjectPath: arrow.BinaryTypes.String,
|
|
ColumnTypeSectionIndex: arrow.PrimitiveTypes.Int64,
|
|
ColumnTypeSortSchema: arrow.BinaryTypes.String,
|
|
ColumnTypeMinTimestamp: arrow.FixedWidthTypes.Timestamp_ns,
|
|
ColumnTypeMaxTimestamp: arrow.FixedWidthTypes.Timestamp_ns,
|
|
ColumnTypeRowCount: arrow.PrimitiveTypes.Int64,
|
|
ColumnTypeUncompressedSize: arrow.PrimitiveTypes.Int64,
|
|
ColumnTypeLabel: arrow.BinaryTypes.String,
|
|
}
|
|
|
|
func columnToField(col *Column) arrow.Field {
|
|
dtype, ok := columnDatatypes[col.Type]
|
|
if !ok {
|
|
dtype = arrow.Null
|
|
}
|
|
return arrow.Field{
|
|
Name: makeColumnName(col.Name, col.Type.String(), dtype),
|
|
Type: dtype,
|
|
Nullable: true,
|
|
}
|
|
}
|
|
|
|
// makeColumnName produces a unique field name "<label>.<type>.<dtype>" or
|
|
// "<type>.<dtype>" when the column has no label. Mirrors streams/reader.go.
|
|
func makeColumnName(label, name string, dty arrow.DataType) string {
|
|
switch {
|
|
case label == "" && name == "":
|
|
return dty.Name()
|
|
case label == "" && name != "":
|
|
return name + "." + dty.Name()
|
|
default:
|
|
if name == "" {
|
|
name = "<invalid>"
|
|
}
|
|
return label + "." + name + "." + dty.Name()
|
|
}
|
|
}
|
|
|