Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/streams_view.go

257 lines
6.8 KiB

package executor
import (
"context"
"errors"
"fmt"
"io"
"iter"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/xcap"
)
// streamsView is a lazily loaded, in-memory view over the streams of a
// [streams.Section] that can look up the labels of a stream by its ID.
//
// Nothing is read from the section until the view is first queried.
type streamsView struct {
	// sec is the section this view was created for.
	sec *streams.Section

	// streamIDs restricts which streams are loaded; an empty slice loads all
	// streams in the section.
	streamIDs []int64

	// idColumn is the section's stream ID column, or nil if the section has
	// no such column.
	idColumn *streams.Column

	// searchColumns lists the columns read from the section: the stream ID
	// column first, followed by the columns selected in newStreamsView.
	searchColumns []*streams.Column // stream ID + labels

	// batchSize is the maximum number of rows requested per read.
	batchSize int

	// initialized reports whether streams and idRowMapping are populated.
	initialized bool

	// streams holds every row read from the section.
	streams arrow.Table

	// idRowMapping maps a stream ID to its absolute row index in streams.
	idRowMapping map[int64]int
}
// streamsViewOptions configures a view created by newStreamsView.
type streamsViewOptions struct {
	// StreamIDs lists the stream IDs to include in the view. When empty,
	// every stream in the section is included.
	StreamIDs []int64

	// LabelColumns lists the label columns (stream labels) to include in the
	// view. When empty, all label columns in the section are included.
	LabelColumns []*streams.Column

	// BatchSize is the maximum number of stream records to read at once.
	// Values <= 0 default to 128.
	BatchSize int

	// CacheSize is the size of the page cache to use for reading sections.
	//
	// NOTE(review): newStreamsView does not read this field; presumably it is
	// consumed elsewhere — confirm.
	CacheSize int
}
// newStreamsView creates a view over the given streams section. Nothing is
// read from the section until the view is first queried.
//
// The view always reads the section's stream ID column. It then reads
// opts.LabelColumns, or every label column of the section when
// opts.LabelColumns is empty. If opts.StreamIDs is non-empty, only those
// streams are loaded. A non-positive opts.BatchSize defaults to 128.
//
// opts is not modified.
func newStreamsView(sec *streams.Section, opts *streamsViewOptions) *streamsView {
	// Resolve the default locally instead of writing it back into the
	// caller's options struct.
	batchSize := opts.BatchSize
	if batchSize <= 0 {
		batchSize = 128
	}

	allColumns := sec.Columns()

	// The stream ID column is never part of [streamsViewOptions], so look it
	// up among all of the section's columns. It stays nil if the section has
	// none; init reports that as an error.
	var idColumn *streams.Column
	for _, col := range allColumns {
		if col.Type == streams.ColumnTypeStreamID {
			idColumn = col
			break
		}
	}

	// The stream ID column always comes first. NumLabels assumes exactly one
	// non-label column at the front of searchColumns.
	searchColumns := []*streams.Column{idColumn}
	if len(opts.LabelColumns) > 0 {
		searchColumns = append(searchColumns, opts.LabelColumns...)
	} else {
		// Default to label columns only. Taking sec.Columns() wholesale would
		// read the stream ID column a second time, and would make NumLabels
		// count non-label columns.
		for _, col := range allColumns {
			if col.Type == streams.ColumnTypeLabel {
				searchColumns = append(searchColumns, col)
			}
		}
	}

	return &streamsView{
		sec:           sec,
		streamIDs:     opts.StreamIDs,
		idColumn:      idColumn,
		searchColumns: searchColumns,
		batchSize:     batchSize,
	}
}
// NumLabels returns the number of columns the view reads in addition to the
// leading stream ID column. This equals the number of label columns when only
// label columns were selected.
func (v *streamsView) NumLabels() int {
	const idColumns = 1 // v.searchColumns[0] is the stream ID column.
	return len(v.searchColumns) - idColumns
}
// Labels returns an iterator over the non-null labels of the stream with the
// given id. The view's streams are loaded on first use.
//
// Only label columns are yielded: the stream ID column, and any other
// non-label column in the view, is skipped. Labels are yielded in the view's
// column order, so only the label columns selected through
// [streamsViewOptions] are considered.
//
// An error is returned in two cases: loading fails, or id was not loaded into
// the view (it is absent from the section or excluded by the StreamIDs
// filter). The iterator panics if a label column is neither an Arrow String
// array nor an Arrow Binary array.
func (v *streamsView) Labels(ctx context.Context, id int64) (iter.Seq[labels.Label], error) {
	if err := v.init(ctx); err != nil {
		return nil, err
	}

	row, found := v.idRowMapping[id]
	if !found {
		return nil, fmt.Errorf("stream ID %d not found in section or not included in filter", id)
	}

	seq := func(yield func(labels.Label) bool) {
		numCols := int(v.streams.NumCols())
		for i := 0; i < numCols; i++ {
			// [streams.Reader] keeps the Arrow schema in the same order as
			// the columns it was given, so column metadata can be looked up
			// by position.
			meta := v.searchColumns[i]
			if meta.Type != streams.ColumnTypeLabel {
				continue
			}

			// row came from idRowMapping, so it is in range and the returned
			// chunk is never nil.
			chunk, offset := columnChunkedRow(v.streams.Column(i), row)
			if chunk.IsNull(offset) {
				continue
			}

			var value string
			switch typed := chunk.(type) {
			case *array.String:
				value = typed.Value(offset)
			case *array.Binary:
				value = string(typed.Value(offset))
			default:
				panic(fmt.Sprintf("unexpected column type %T for labels", typed))
			}

			if !yield(labels.Label{Name: meta.Name, Value: value}) {
				return
			}
		}
	}
	return seq, nil
}
// init loads the streams selected by the view into memory the first time it
// is called. Later calls return nil immediately.
//
// It reads v.searchColumns from the section, restricted to v.streamIDs when
// that list is non-empty, and combines every non-empty batch into v.streams.
// It then records the absolute row of each stream ID in v.idRowMapping. On
// error the view stays uninitialized.
func (v *streamsView) init(ctx context.Context) error {
	if v.initialized {
		return nil
	}

	ctx, region := xcap.StartRegion(ctx, "streamsView.init")
	defer region.End()

	if v.idColumn == nil { // Initialized in [newStreamsView].
		// The streams builder always produces a section with a stream ID column.
		// If we hit this, someone probably made a custom section and provided it
		// to us.
		return errors.New("section does not contain a stream ID column")
	}

	readerOptions := streams.ReaderOptions{
		Columns:   v.searchColumns,
		Allocator: memory.DefaultAllocator,
	}
	if len(v.streamIDs) > 0 {
		// Filter by stream ID in the reader rather than after reading.
		scalarIDs := make([]scalar.Scalar, 0, len(v.streamIDs))
		for _, id := range v.streamIDs {
			scalarIDs = append(scalarIDs, scalar.NewInt64Scalar(id))
		}
		readerOptions.Predicates = []streams.Predicate{
			streams.InPredicate{Column: v.idColumn, Values: scalarIDs},
		}
	}

	r := streams.NewReader(readerOptions)
	defer r.Close()

	// NOTE(review): records are added to the table without being released
	// here; confirm the ownership contract of streams.Reader.Read before
	// adding Release calls.
	var records []arrow.RecordBatch
	for {
		rec, err := r.Read(ctx, v.batchSize)
		// A batch returned together with an error (including io.EOF) is
		// still kept.
		if rec != nil && rec.NumRows() > 0 {
			records = append(records, rec)
		}
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return fmt.Errorf("reading streams: %w", err)
		}
	}

	table := array.NewTableFromRecords(r.Schema(), records)

	idMapping := make(map[int64]int, table.NumRows())
	for colIndex := range int(table.NumCols()) {
		if v.searchColumns[colIndex].Type != streams.ColumnTypeStreamID {
			continue
		}

		// Walk the chunks, turning each chunk-relative row into an absolute
		// row index of the table.
		var chunkStartRow int
		for _, chunk := range table.Column(colIndex).Data().Chunks() {
			idArray, ok := chunk.(*array.Int64)
			if !ok {
				// The table never escapes this function on error, so drop
				// our reference to it.
				table.Release()
				return fmt.Errorf("expected stream ID to be of type Int64, got %T", chunk)
			}
			for chunkRow := range idArray.Len() {
				idMapping[idArray.Value(chunkRow)] = chunkStartRow + chunkRow
			}
			chunkStartRow += chunk.Len()
		}
		break // Processed the stream ID column, so we're done.
	}

	v.streams = table
	v.idRowMapping = idMapping
	v.initialized = true
	return nil
}
// columnChunkedRow locates the row at absolute index absoluteRow within the
// chunked column col. It returns the chunk that contains that row, together
// with the row's index relative to the start of the chunk.
//
// If absoluteRow is negative or not less than the column's total length,
// columnChunkedRow returns (nil, -1). Empty chunks never match and are skipped.
func columnChunkedRow(col *arrow.Column, absoluteRow int) (arr arrow.Array, relativeRow int) {
	offset := 0 // Absolute index of the first row of the current chunk.
	for _, chunk := range col.Data().Chunks() {
		size := chunk.Len()
		if absoluteRow >= offset && absoluteRow < offset+size {
			return chunk, absoluteRow - offset
		}
		offset += size
	}
	return nil, -1 // not found
}
// Close discards the data loaded by the view and marks it uninitialized, so a
// later call to Labels reloads it from the section. Close is a no-op on a view
// that has not been initialized.
//
// NOTE(review): the Arrow table built by init is dropped without calling
// Release. Presumably the Go GC reclaims it under memory.DefaultAllocator —
// confirm before changing allocators. Label strings handed out by Labels may
// alias the table's buffers, so verify that before adding a Release here.
func (v *streamsView) Close() {
	if v.initialized {
		v.initialized = false
		v.streams = nil
		clear(v.idRowMapping)
	}
}