loki/pkg/dataobj/sections/pointers/iter.go


package pointers

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
	"unsafe"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// Iter iterates over pointers in the provided object. All pointers sections
// are iterated over in order.
func Iter(ctx context.Context, obj *dataobj.Object) result.Seq[SectionPointer] {
	return result.Iter(func(yield func(SectionPointer) bool) error {
		for i, section := range obj.Sections().Filter(CheckSection) {
			pointersSection, err := Open(ctx, section)
			if err != nil {
				return fmt.Errorf("opening section %d: %w", i, err)
			}

			for result := range IterSection(ctx, pointersSection) {
				if result.Err() != nil || !yield(result.MustValue()) {
					return result.Err()
				}
			}
		}

		return nil
	})
}
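
// A minimal usage sketch for Iter (illustrative only; ctx and obj are assumed
// to be an existing context and an already-open *dataobj.Object):
//
//	for res := range Iter(ctx, obj) {
//		if res.Err() != nil {
//			return res.Err()
//		}
//		ptr := res.MustValue()
//		fmt.Printf("%s section %d: %d rows\n", ptr.Path, ptr.Section, ptr.LineCount)
//	}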

func IterSection(ctx context.Context, section *Section) result.Seq[SectionPointer] {
	return result.Iter(func(yield func(SectionPointer) bool) error {
		columnarSection := section.inner

		dset, err := columnar.MakeDataset(columnarSection, columnarSection.Columns())
		if err != nil {
			return fmt.Errorf("creating columns dataset: %w", err)
		}

		columns, err := result.Collect(dset.ListColumns(ctx))
		if err != nil {
			return err
		}

		r := dataset.NewReader(dataset.ReaderOptions{
			Dataset:  dset,
			Columns:  columns,
			Prefetch: true,
		})
		defer r.Close()

		sym := symbolizer.New(128, 1024)

		var rows [1]dataset.Row
		for {
			n, err := r.Read(ctx, rows[:])
			if err != nil && !errors.Is(err, io.EOF) {
				return err
			} else if n == 0 && errors.Is(err, io.EOF) {
				// Stop only once a read returns no rows alongside io.EOF; a read
				// may return rows and io.EOF together, in which case the rows are
				// still decoded below before the next iteration terminates.
				return nil
			}

			var pointer SectionPointer
			for _, row := range rows[:n] {
				if err := decodeRow(section.Columns(), row, &pointer, sym); err != nil {
					return err
				}

				if !yield(pointer) {
					return nil
				}
			}
		}
	})
}
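
// A hedged variant sketch: IterSection reads rows one at a time through a
// single-element buffer. Growing the buffer would amortize Read calls,
// assuming the same dataset.Reader semantics:
//
//	var rows [64]dataset.Row
//	n, err := r.Read(ctx, rows[:])
//
// Note that decodeRow reuses pointer.ValuesBloomFilter's backing buffer
// across the rows of one batch, so with a multi-row buffer, yielded values
// would need a defensive copy of those bytes before being retained.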

// decodeRow decodes a section pointer from a [dataset.Row], using the
// provided columns to determine the column type. The list of columns must
// match the columns used to create the row.
//
// The sym argument is used for reusing string values between calls to
// decodeRow; it must not be nil.
func decodeRow(columns []*Column, row dataset.Row, pointer *SectionPointer, sym *symbolizer.Symbolizer) error {
	pointer.Reset()

	for columnIndex, columnValue := range row.Values {
		if columnValue.IsNil() || columnValue.IsZero() {
			continue
		}

		column := columns[columnIndex]
		switch column.Type {
		case ColumnTypePath:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_BINARY {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.Path = sym.Get(unsafeString(columnValue.Binary()))

		case ColumnTypeSection:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.Section = columnValue.Int64()

		case ColumnTypePointerKind:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			switch columnValue.Int64() {
			case int64(PointerKindStreamIndex):
				pointer.PointerKind = PointerKindStreamIndex
			case int64(PointerKindColumnIndex):
				pointer.PointerKind = PointerKindColumnIndex
			default:
				return fmt.Errorf("invalid pointer kind %d", columnValue.Int64())
			}

		case ColumnTypeStreamID:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.StreamID = columnValue.Int64()

		case ColumnTypeStreamIDRef:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.StreamIDRef = columnValue.Int64()

		case ColumnTypeMinTimestamp:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.StartTs = time.Unix(0, columnValue.Int64())

		case ColumnTypeMaxTimestamp:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.EndTs = time.Unix(0, columnValue.Int64())

		case ColumnTypeRowCount:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.LineCount = columnValue.Int64()

		case ColumnTypeUncompressedSize:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.UncompressedSize = columnValue.Int64()

		case ColumnTypeColumnName:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_BINARY {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.ColumnName = sym.Get(unsafeString(columnValue.Binary()))

		case ColumnTypeColumnIndex:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_INT64 {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			pointer.ColumnIndex = columnValue.Int64()

		case ColumnTypeValuesBloomFilter:
			if ty := columnValue.Type(); ty != datasetmd.PHYSICAL_TYPE_BINARY {
				return fmt.Errorf("invalid type %s for %s", ty, column.Type)
			}
			filterBytes := columnValue.Binary()
			pointer.ValuesBloomFilter = slicegrow.GrowToCap(pointer.ValuesBloomFilter, len(filterBytes))
			pointer.ValuesBloomFilter = pointer.ValuesBloomFilter[:len(filterBytes)]
			copy(pointer.ValuesBloomFilter, filterBytes)

		default:
			// TODO(rfratto): We probably don't want to return an error on unexpected
			// columns because it breaks forward compatibility. Should we log
			// something here?
		}
	}

	return nil
}

// unsafeString converts data into a string without copying. The returned
// string aliases data and is only valid while data is unmodified; decodeRow
// immediately passes the result through sym.Get rather than retaining it.
func unsafeString(data []byte) string {
	return unsafe.String(unsafe.SliceData(data), len(data))
}
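
// CollectPointers is a hypothetical helper, not part of this package's API: a
// sketch of how a caller might gather every SectionPointer in an object into
// a slice using Iter.
func CollectPointers(ctx context.Context, obj *dataobj.Object) ([]SectionPointer, error) {
	var out []SectionPointer
	for res := range Iter(ctx, obj) {
		if res.Err() != nil {
			return nil, res.Err()
		}

		ptr := res.MustValue()

		// Defensively copy the bloom filter bytes: decodeRow reuses its backing
		// buffer between rows of a read batch, so retained values should own
		// their own copy.
		ptr.ValuesBloomFilter = append([]byte(nil), ptr.ValuesBloomFilter...)
		out = append(out, ptr)
	}

	return out, nil
}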