Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/internal/columnar/reader_adapter.go

111 lines
3.3 KiB

package columnar
import (
"context"
"fmt"
"github.com/grafana/loki/v3/pkg/columnar"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
"github.com/grafana/loki/v3/pkg/memory"
)
// ReaderAdapter is a temporary translation layer that allows the caller to read
// [columnar.RecordBatch] values from a reader that only supports reads through
// a slice of [dataset.Row].
type ReaderAdapter struct {
	inner *dataset.Reader // underlying row-oriented dataset reader
	colTypes []datasetmd.PhysicalType // physical type of each column, in column order; populated by Reset
	buf []dataset.Row // reusable scratch buffer of rows passed to inner.Read
}
// NewReaderAdapter creates a ReaderAdapter with the provided dataset reader options.
func NewReaderAdapter(innerOpts dataset.ReaderOptions) *ReaderAdapter {
	adapter := &ReaderAdapter{
		inner: dataset.NewReader(innerOpts),
	}
	// Reset populates the cached per-column type information.
	adapter.Reset(innerOpts)
	return adapter
}
// Reset reinitializes the adapter with new reader options, resetting the
// underlying reader and rebuilding the cached per-column physical types.
// Internal buffers are retained across calls for reuse.
func (r *ReaderAdapter) Reset(opts dataset.ReaderOptions) {
	r.inner.Reset(opts)

	// GrowToCap returns the (possibly reallocated) slice; it must be assigned
	// back, otherwise the capacity hint is discarded and the appends below
	// may reallocate anyway.
	r.colTypes = slicegrow.GrowToCap(r.colTypes, len(opts.Columns))
	r.colTypes = r.colTypes[:0]
	for _, col := range opts.Columns {
		r.colTypes = append(r.colTypes, col.ColumnDesc().Type.Physical)
	}
}
// Close closes the underlying reader, returning any error it reports.
func (r *ReaderAdapter) Close() error {
	return r.inner.Close()
}
// Read reads up to batchSize rows from the underlying dataset reader and
// returns them as a [columnar.RecordBatch].
//
// Read may return a non-empty batch together with a non-nil error (such as
// io.EOF); callers should consume the batch before inspecting the error.
func (r *ReaderAdapter) Read(ctx context.Context, alloc *memory.Allocator, batchSize int) (columnar.RecordBatch, error) {
	// Reuse the row buffer across calls to avoid reallocating every batch.
	r.buf = slicegrow.GrowToCap(r.buf, batchSize)
	r.buf = r.buf[:batchSize]

	n, readErr := r.inner.Read(ctx, r.buf)

	// Create exactly one builder per column so that builder index i always
	// corresponds to column i. An unrecognized physical type must fail here:
	// silently skipping a column would leave arrBuilders shorter than
	// r.colTypes and misalign builders and columns in the row loop below.
	arrBuilders := make([]columnar.Builder, 0, len(r.colTypes))
	for _, colType := range r.colTypes {
		var builder columnar.Builder
		switch colType {
		case datasetmd.PHYSICAL_TYPE_INT64:
			builder = columnar.NewNumberBuilder[int64](alloc)
		case datasetmd.PHYSICAL_TYPE_UINT64:
			builder = columnar.NewNumberBuilder[uint64](alloc)
		case datasetmd.PHYSICAL_TYPE_BINARY:
			builder = columnar.NewUTF8Builder(alloc)
		default: // includes PHYSICAL_TYPE_UNSPECIFIED
			return columnar.RecordBatch{}, fmt.Errorf("unsupported physical type: %v", colType)
		}
		builder.Grow(n)
		arrBuilders = append(arrBuilders, builder)
	}

	// Transpose the row-oriented buffer into the per-column builders.
	for rowIndex := range n {
		row := r.buf[rowIndex]
		for colIdx, val := range row.Values {
			builder := arrBuilders[colIdx]
			if val.IsNil() {
				builder.AppendNull()
				continue
			}
			// Only the three types handled above can reach this switch, so no
			// default case is needed here.
			switch r.colTypes[colIdx] {
			case datasetmd.PHYSICAL_TYPE_INT64:
				builder.(*columnar.NumberBuilder[int64]).AppendValue(val.Int64())
			case datasetmd.PHYSICAL_TYPE_UINT64:
				builder.(*columnar.NumberBuilder[uint64]).AppendValue(val.Uint64())
			case datasetmd.PHYSICAL_TYPE_BINARY:
				builder.(*columnar.UTF8Builder).AppendValue(val.Binary())
			}
		}
	}

	arrs := make([]columnar.Array, len(arrBuilders))
	for i, builder := range arrBuilders {
		arrs[i] = builder.BuildArray()
	}

	// We only return readErr after processing n so that we properly handle n>0
	// while also getting an error such as io.EOF.
	return columnar.NewRecordBatch(int64(n), arrs), readErr
}