Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/stats/encode_columnar.go

250 lines
8.8 KiB

package stats
import (
"errors"
"fmt"
"strings"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)
// ColumnarSectionEncoder builds a [SectionEncoder] that writes [Stat] rows
// into a [columnar.Encoder] through [dataset.ColumnBuilder]s.
//
// pageSizeHint and pageMaxRowCount are captured by the returned encoder and
// forwarded unchanged to every column builder it creates; they govern how the
// builders split their data into pages.
func ColumnarSectionEncoder(pageSizeHint int, pageMaxRowCount int) SectionEncoder {
	encode := func(rows []Stat, enc *columnar.Encoder) error {
		return columnarEncode(rows, enc, pageSizeHint, pageMaxRowCount)
	}
	return encode
}
// columnarEncode writes rows into enc as a set of columns plus sort info.
//
// Layout: seven fixed columns are encoded first, in this order:
// object_path (0), section_index (1), sort_schema (2), min_timestamp (3),
// max_timestamp (4), row_count (5), uncompressed_size (6). They are followed
// by one label column per non-empty key in the comma-separated SortSchema of
// the first row (7, 8, ...), in schema order.
//
// An empty rows slice is a no-op and returns nil without touching enc.
//
// Errors from creating a builder or appending a value are returned
// immediately with the column (and row) named. Errors from encoding the
// columns are collected across all columns and returned joined.
func columnarEncode(rows []Stat, enc *columnar.Encoder, pageSizeHint, pageMaxRowCount int) error {
	if len(rows) == 0 {
		return nil
	}

	// Parse label keys from SortSchema. All rows within a section share the
	// same SortSchema (guaranteed by the builder being per-tenant and the
	// calculation pipeline producing rows with a config-driven schema), so
	// only the first row is consulted.
	sortSchema := rows[0].SortSchema
	var labelKeys []string
	if sortSchema != "" {
		for _, k := range strings.Split(sortSchema, ",") {
			if k != "" {
				labelKeys = append(labelKeys, k)
			}
		}
	}

	// Build fixed column builders (indices 0-6).
	objectPathBuilder, err := binaryColumnBuilder(ColumnTypeObjectPath, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating object_path column: %w", err)
	}
	sectionIndexBuilder, err := numberColumnBuilder(ColumnTypeSectionIndex, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating section_index column: %w", err)
	}
	sortSchemaBuilder, err := binaryColumnBuilder(ColumnTypeSortSchema, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating sort_schema column: %w", err)
	}
	minTimestampBuilder, err := numberColumnBuilder(ColumnTypeMinTimestamp, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating min_timestamp column: %w", err)
	}
	maxTimestampBuilder, err := numberColumnBuilder(ColumnTypeMaxTimestamp, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating max_timestamp column: %w", err)
	}
	rowCountBuilder, err := numberColumnBuilder(ColumnTypeRowCount, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating row_count column: %w", err)
	}
	uncompressedSizeBuilder, err := numberColumnBuilder(ColumnTypeUncompressedSize, pageSizeHint, pageMaxRowCount)
	if err != nil {
		return fmt.Errorf("creating uncompressed_size column: %w", err)
	}

	// Build dynamic label column builders (indices 7+), one per label key in
	// sort-schema order.
	labelBuilders := make([]*dataset.ColumnBuilder, len(labelKeys))
	for i, key := range labelKeys {
		lb, err := labelColumnBuilder(key, pageSizeHint, pageMaxRowCount)
		if err != nil {
			return fmt.Errorf("creating label column %q: %w", key, err)
		}
		labelBuilders[i] = lb
	}

	// Populate column builders row by row. Append errors are propagated
	// rather than dropped: a value that failed to append would otherwise be
	// silently missing from the encoded section.
	for i, r := range rows {
		if err := objectPathBuilder.Append(i, dataset.BinaryValue([]byte(r.ObjectPath))); err != nil {
			return fmt.Errorf("appending object_path at row %d: %w", i, err)
		}
		if err := sectionIndexBuilder.Append(i, dataset.Int64Value(r.SectionIndex)); err != nil {
			return fmt.Errorf("appending section_index at row %d: %w", i, err)
		}
		if err := sortSchemaBuilder.Append(i, dataset.BinaryValue([]byte(r.SortSchema))); err != nil {
			return fmt.Errorf("appending sort_schema at row %d: %w", i, err)
		}
		if err := minTimestampBuilder.Append(i, dataset.Int64Value(r.MinTimestamp)); err != nil {
			return fmt.Errorf("appending min_timestamp at row %d: %w", i, err)
		}
		if err := maxTimestampBuilder.Append(i, dataset.Int64Value(r.MaxTimestamp)); err != nil {
			return fmt.Errorf("appending max_timestamp at row %d: %w", i, err)
		}
		if err := rowCountBuilder.Append(i, dataset.Int64Value(r.RowCount)); err != nil {
			return fmt.Errorf("appending row_count at row %d: %w", i, err)
		}
		if err := uncompressedSizeBuilder.Append(i, dataset.Int64Value(r.UncompressedSize)); err != nil {
			return fmt.Errorf("appending uncompressed_size at row %d: %w", i, err)
		}

		// Dynamic label columns: append the value when the row has the label;
		// when it is absent nothing is appended for this row (intended to
		// read back as null).
		for j, key := range labelKeys {
			val, ok := r.Labels[key]
			if !ok {
				continue
			}
			if err := labelBuilders[j].Append(i, dataset.BinaryValue([]byte(val))); err != nil {
				return fmt.Errorf("appending label %q at row %d: %w", key, i, err)
			}
		}
	}

	// Set sort info: sort_schema (index 2), then dynamic label columns
	// (indices 7+), then min_timestamp (index 3), then max_timestamp (index 4).
	//
	// NOTE(review): encodeLabelColumn does not commit a column that flushes to
	// zero pages. If that can happen for a label absent from every row, the
	// 7+i indices below would no longer match the encoded columns - confirm.
	columnSorts := make([]*datasetmd.SortInfo_ColumnSort, 0, 1+len(labelKeys)+2)
	columnSorts = append(columnSorts, &datasetmd.SortInfo_ColumnSort{
		ColumnIndex: 2, // sort_schema
		Direction:   datasetmd.SORT_DIRECTION_ASCENDING,
	})
	for i := range labelKeys {
		columnSorts = append(columnSorts, &datasetmd.SortInfo_ColumnSort{
			ColumnIndex: uint32(7 + i),
			Direction:   datasetmd.SORT_DIRECTION_ASCENDING,
		})
	}
	columnSorts = append(columnSorts, &datasetmd.SortInfo_ColumnSort{
		ColumnIndex: 3, // min_timestamp
		Direction:   datasetmd.SORT_DIRECTION_ASCENDING,
	})
	columnSorts = append(columnSorts, &datasetmd.SortInfo_ColumnSort{
		ColumnIndex: 4, // max_timestamp
		Direction:   datasetmd.SORT_DIRECTION_ASCENDING,
	})
	enc.SetSortInfo(&datasetmd.SortInfo{ColumnSorts: columnSorts})

	// Encode every column even if an earlier one fails, collecting the errors
	// so the caller sees all failures at once. The call order here defines
	// the column indices referenced by the sort info above.
	errs := make([]error, 0, 7+len(labelKeys))
	errs = append(errs, encodeColumn(enc, ColumnTypeObjectPath, objectPathBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeSectionIndex, sectionIndexBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeSortSchema, sortSchemaBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeMinTimestamp, minTimestampBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeMaxTimestamp, maxTimestampBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeRowCount, rowCountBuilder))
	errs = append(errs, encodeColumn(enc, ColumnTypeUncompressedSize, uncompressedSizeBuilder))

	// Encode dynamic label columns.
	for i, lb := range labelBuilders {
		errs = append(errs, encodeLabelColumn(enc, labelKeys[i], lb))
	}

	if err := errors.Join(errs...); err != nil {
		return fmt.Errorf("encoding columns: %w", err)
	}
	return nil
}
// binaryColumnBuilder returns an untagged column builder for one of the fixed
// byte-string columns: physical type BINARY, PLAIN encoding, ZSTD compression.
// logicalType supplies the column's logical type name; pageSize and
// pageRowCount are passed through as the builder's page size hint and maximum
// rows per page.
func binaryColumnBuilder(logicalType ColumnType, pageSize, pageRowCount int) (*dataset.ColumnBuilder, error) {
	columnType := dataset.ColumnType{
		Physical: datasetmd.PHYSICAL_TYPE_BINARY,
		Logical:  logicalType.String(),
	}
	opts := dataset.BuilderOptions{
		PageSizeHint:    pageSize,
		PageMaxRowCount: pageRowCount,
		Type:            columnType,
		Encoding:        datasetmd.ENCODING_TYPE_PLAIN,
		Compression:     datasetmd.COMPRESSION_TYPE_ZSTD,
	}
	return dataset.NewColumnBuilder("", opts)
}
// numberColumnBuilder returns an untagged column builder for one of the fixed
// integer columns: physical type INT64, DELTA encoding, no compression.
// logicalType supplies the column's logical type name; pageSize and
// pageRowCount are passed through as the builder's page size hint and maximum
// rows per page.
func numberColumnBuilder(logicalType ColumnType, pageSize, pageRowCount int) (*dataset.ColumnBuilder, error) {
	columnType := dataset.ColumnType{
		Physical: datasetmd.PHYSICAL_TYPE_INT64,
		Logical:  logicalType.String(),
	}
	opts := dataset.BuilderOptions{
		PageSizeHint:    pageSize,
		PageMaxRowCount: pageRowCount,
		Type:            columnType,
		Encoding:        datasetmd.ENCODING_TYPE_DELTA,
		Compression:     datasetmd.COMPRESSION_TYPE_NONE,
	}
	return dataset.NewColumnBuilder("", opts)
}
// labelColumnBuilder returns a column builder for a dynamic label column:
// physical type BINARY, PLAIN encoding, ZSTD compression. The builder is
// tagged with labelName and its logical type is always ColumnTypeLabel.
// pageSize and pageRowCount are passed through as the builder's page size
// hint and maximum rows per page.
func labelColumnBuilder(labelName string, pageSize, pageRowCount int) (*dataset.ColumnBuilder, error) {
	columnType := dataset.ColumnType{
		Physical: datasetmd.PHYSICAL_TYPE_BINARY,
		Logical:  ColumnTypeLabel.String(),
	}
	opts := dataset.BuilderOptions{
		PageSizeHint:    pageSize,
		PageMaxRowCount: pageRowCount,
		Type:            columnType,
		Encoding:        datasetmd.ENCODING_TYPE_PLAIN,
		Compression:     datasetmd.COMPRESSION_TYPE_ZSTD,
	}
	return dataset.NewColumnBuilder(labelName, opts)
}
// encodeColumn flushes builder and writes the resulting column's pages to enc.
//
// columnType is used only to label error messages. A column that flushes to
// zero pages is not committed; the function returns nil and the opened column
// encoder is discarded.
//
// Every failure (flush, open, page append, commit) is returned wrapped with
// the column type so it stays attributable after the caller joins errors from
// several columns.
func encodeColumn(enc *columnar.Encoder, columnType ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	// Discard runs on every exit path so an early return never leaves the
	// column open. Its result is deliberately ignored; presumably it reports
	// an error after a successful Commit - confirm against columnar.Encoder.
	defer func() {
		_ = columnEnc.Discard()
	}()

	if len(column.Pages) == 0 {
		return nil
	}

	for _, page := range column.Pages {
		if err := columnEnc.AppendPage(page); err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	if err := columnEnc.Commit(); err != nil {
		return fmt.Errorf("committing %s column: %w", columnType, err)
	}
	return nil
}
// encodeLabelColumn flushes a dynamic label column builder and writes the
// resulting column's pages to enc.
//
// labelName is used only to label error messages. A column that flushes to
// zero pages is not committed; the function returns nil and the opened column
// encoder is discarded.
//
// Every failure (flush, open, page append, commit) is returned wrapped with
// the label name so it stays attributable after the caller joins errors from
// several columns.
func encodeLabelColumn(enc *columnar.Encoder, labelName string, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing label %q column: %w", labelName, err)
	}

	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening label %q column encoder: %w", labelName, err)
	}
	// Discard runs on every exit path so an early return never leaves the
	// column open. Its result is deliberately ignored; presumably it reports
	// an error after a successful Commit - confirm against columnar.Encoder.
	defer func() {
		_ = columnEnc.Discard()
	}()

	if len(column.Pages) == 0 {
		return nil
	}

	for _, page := range column.Pages {
		if err := columnEnc.AppendPage(page); err != nil {
			return fmt.Errorf("appending label %q page: %w", labelName, err)
		}
	}

	if err := columnEnc.Commit(); err != nil {
		return fmt.Errorf("committing label %q column: %w", labelName, err)
	}
	return nil
}