mirror of https://github.com/grafana/loki
chore(dataobj): logs section metadata and encoding/decoding (#15720)
parent
709a3a22c5
commit
bcec63d9c2
@ -0,0 +1,152 @@ |
||||
package encoding |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/result" |
||||
) |
||||
|
||||
// LogsDataset returns a [dataset.Dataset] from a [LogsDecoder] for
// the given section.
func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
	return &logsDataset{dec: dec, sec: sec}
}
||||
|
||||
// logsDataset implements [dataset.Dataset] on top of a decoded logs section.
type logsDataset struct {
	dec LogsDecoder         // Decoder used to read section metadata and pages.
	sec *filemd.SectionInfo // Section being exposed as a dataset.
}
||||
|
||||
func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] { |
||||
return result.Iter(func(yield func(dataset.Column) bool) error { |
||||
columns, err := ds.dec.Columns(ctx, ds.sec) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, column := range columns { |
||||
if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) { |
||||
return nil |
||||
} |
||||
} |
||||
|
||||
return err |
||||
}) |
||||
|
||||
} |
||||
|
||||
func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] { |
||||
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
|
||||
return result.Iter(func(yield func(dataset.Pages) bool) error { |
||||
for _, column := range columns { |
||||
pages, err := result.Collect(column.ListPages(ctx)) |
||||
if err != nil { |
||||
return err |
||||
} else if !yield(pages) { |
||||
return nil |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] { |
||||
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
|
||||
return result.Iter(func(yield func(dataset.PageData) bool) error { |
||||
for _, page := range pages { |
||||
data, err := page.ReadPage(ctx) |
||||
if err != nil { |
||||
return err |
||||
} else if !yield(data) { |
||||
return nil |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
// logsDatasetColumn implements [dataset.Column] for a column in the logs
// section.
type logsDatasetColumn struct {
	dec  LogsDecoder
	desc *logsmd.ColumnDesc

	info *dataset.ColumnInfo // Cached column info; lazily built by ColumnInfo.
}
||||
|
||||
func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo { |
||||
if col.info != nil { |
||||
return col.info |
||||
} |
||||
|
||||
col.info = &dataset.ColumnInfo{ |
||||
Name: col.desc.Info.Name, |
||||
Type: col.desc.Info.ValueType, |
||||
Compression: col.desc.Info.Compression, |
||||
|
||||
RowsCount: int(col.desc.Info.RowsCount), |
||||
CompressedSize: int(col.desc.Info.CompressedSize), |
||||
UncompressedSize: int(col.desc.Info.UncompressedSize), |
||||
|
||||
Statistics: col.desc.Info.Statistics, |
||||
} |
||||
return col.info |
||||
} |
||||
|
||||
func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] { |
||||
return result.Iter(func(yield func(dataset.Page) bool) error { |
||||
pageSets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc})) |
||||
if err != nil { |
||||
return err |
||||
} else if len(pageSets) != 1 { |
||||
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets)) |
||||
} |
||||
|
||||
for _, page := range pageSets[0] { |
||||
if !yield(&logsDatasetPage{dec: col.dec, desc: page}) { |
||||
return nil |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
}) |
||||
} |
||||
|
||||
// logsDatasetPage implements [dataset.Page] for a page in a logs section
// column.
type logsDatasetPage struct {
	dec  LogsDecoder
	desc *logsmd.PageDesc

	info *dataset.PageInfo // Cached page info; lazily built by PageInfo.
}
||||
|
||||
func (p *logsDatasetPage) PageInfo() *dataset.PageInfo { |
||||
if p.info != nil { |
||||
return p.info |
||||
} |
||||
|
||||
p.info = &dataset.PageInfo{ |
||||
UncompressedSize: int(p.desc.Info.UncompressedSize), |
||||
CompressedSize: int(p.desc.Info.CompressedSize), |
||||
CRC32: p.desc.Info.Crc32, |
||||
RowCount: int(p.desc.Info.RowsCount), |
||||
|
||||
Encoding: p.desc.Info.Encoding, |
||||
Stats: p.desc.Info.Statistics, |
||||
} |
||||
return p.info |
||||
} |
||||
|
||||
func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) { |
||||
pages, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc})) |
||||
if err != nil { |
||||
return nil, err |
||||
} else if len(pages) != 1 { |
||||
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages)) |
||||
} |
||||
|
||||
return pages[0], nil |
||||
} |
||||
@ -0,0 +1,259 @@ |
||||
package encoding |
||||
|
||||
import ( |
||||
"bytes" |
||||
"math" |
||||
|
||||
"github.com/gogo/protobuf/proto" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" |
||||
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" |
||||
) |
||||
|
||||
// LogsEncoder encodes an individual logs section in a data object.
// LogsEncoders are created by [Encoder]s.
type LogsEncoder struct {
	parent *Encoder

	startOffset int  // Byte offset in the file where the section starts.
	closed      bool // true if LogsEncoder has been closed.

	data      *bytes.Buffer        // Buffered data for all closed columns.
	columns   []*logsmd.ColumnDesc // closed columns.
	curColumn *logsmd.ColumnDesc   // curColumn is the currently open column.
}
||||
|
||||
func newLogsEncoder(parent *Encoder, offset int) *LogsEncoder { |
||||
buf := bytesBufferPool.Get().(*bytes.Buffer) |
||||
buf.Reset() |
||||
|
||||
return &LogsEncoder{ |
||||
parent: parent, |
||||
startOffset: offset, |
||||
|
||||
data: buf, |
||||
} |
||||
} |
||||
|
||||
// OpenColumn opens a new column in the logs section. OpenColumn fails if there
// is another open column or if the LogsEncoder has been closed.
func (enc *LogsEncoder) OpenColumn(columnType logsmd.ColumnType, info *dataset.ColumnInfo) (*LogsColumnEncoder, error) {
	if enc.curColumn != nil {
		return nil, ErrElementExist
	} else if enc.closed {
		return nil, ErrClosed
	}

	// MetadataOffset and MetadataSize aren't available until the column is
	// closed. We temporarily set these fields to the maximum values so they're
	// accounted for in the MetadataSize estimate.
	enc.curColumn = &logsmd.ColumnDesc{
		Type: columnType,
		Info: &datasetmd.ColumnInfo{
			Name:             info.Name,
			ValueType:        info.Type,
			RowsCount:        uint32(info.RowsCount),
			Compression:      info.Compression,
			UncompressedSize: uint32(info.UncompressedSize),
			CompressedSize:   uint32(info.CompressedSize),
			Statistics:       info.Statistics,

			MetadataOffset: math.MaxUint32,
			MetadataSize:   math.MaxUint32,
		},
	}

	// The column's data starts immediately after everything buffered for the
	// section so far.
	return newLogsColumnEncoder(
		enc,
		enc.startOffset+enc.data.Len(),
	), nil
}
||||
|
||||
// MetadataSize returns an estimate of the current size of the metadata for the
// section. MetadataSize includes an estimate for the currently open element.
func (enc *LogsEncoder) MetadataSize() int { return elementMetadataSize(enc) }
||||
|
||||
func (enc *LogsEncoder) metadata() proto.Message { |
||||
columns := enc.columns[:len(enc.columns):cap(enc.columns)] |
||||
if enc.curColumn != nil { |
||||
columns = append(columns, enc.curColumn) |
||||
} |
||||
return &logsmd.Metadata{Columns: columns} |
||||
} |
||||
|
||||
// Commit closes the section, flushing all data to the parent element. After
|
||||
// Commit is called, the LogsEncoder can no longer be modified.
|
||||
//
|
||||
// Commit fails if there is an open column.
|
||||
func (enc *LogsEncoder) Commit() error { |
||||
if enc.closed { |
||||
return ErrClosed |
||||
} else if enc.curColumn != nil { |
||||
return ErrElementExist |
||||
} |
||||
|
||||
defer bytesBufferPool.Put(enc.data) |
||||
|
||||
if len(enc.columns) == 0 { |
||||
// No data was written; discard.
|
||||
return enc.parent.append(nil, nil) |
||||
} |
||||
|
||||
metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer) |
||||
metadataBuffer.Reset() |
||||
defer bytesBufferPool.Put(metadataBuffer) |
||||
|
||||
// The section metadata should start with its version.
|
||||
if err := streamio.WriteUvarint(metadataBuffer, logsFormatVersion); err != nil { |
||||
return err |
||||
} else if err := elementMetadataWrite(enc, metadataBuffer); err != nil { |
||||
return err |
||||
} |
||||
return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes()) |
||||
} |
||||
|
||||
// Discard discards the section, discarding any data written to it. After
// Discard is called, the LogsEncoder can no longer be modified.
//
// Discard fails if there is an open column.
func (enc *LogsEncoder) Discard() error {
	if enc.closed {
		return ErrClosed
	} else if enc.curColumn != nil {
		return ErrElementExist
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	// A nil data/metadata pair tells the parent this section was discarded.
	return enc.parent.append(nil, nil)
}
||||
|
||||
// append adds data and metadata to enc. append must only be called from child
// elements on Close and Discard. Discard calls must pass nil for both data and
// metadata to denote a discard.
func (enc *LogsEncoder) append(data, metadata []byte) error {
	if enc.closed {
		return ErrClosed
	} else if enc.curColumn == nil {
		return errElementNoExist
	}

	if len(data) == 0 && len(metadata) == 0 {
		// Column was discarded.
		enc.curColumn = nil
		return nil
	}

	// The column's metadata is written immediately after its data, so its
	// offset is the section start plus everything buffered so far plus the
	// incoming data. This must be computed before the writes below.
	enc.curColumn.Info.MetadataOffset = uint32(enc.startOffset + enc.data.Len() + len(data))
	enc.curColumn.Info.MetadataSize = uint32(len(metadata))

	// bytes.Buffer.Write never fails.
	_, _ = enc.data.Write(data)
	_, _ = enc.data.Write(metadata)

	enc.columns = append(enc.columns, enc.curColumn)
	enc.curColumn = nil
	return nil
}
||||
|
||||
// LogsColumnEncoder encodes an individual column in a logs section.
// LogsColumnEncoder are created by [LogsEncoder].
type LogsColumnEncoder struct {
	parent *LogsEncoder

	startOffset int  // Byte offset in the file where the column starts.
	closed      bool // true if LogsColumnEncoder has been closed.

	data  *bytes.Buffer // All page data.
	pages []*logsmd.PageDesc
}
||||
|
||||
func newLogsColumnEncoder(parent *LogsEncoder, offset int) *LogsColumnEncoder { |
||||
buf := bytesBufferPool.Get().(*bytes.Buffer) |
||||
buf.Reset() |
||||
|
||||
return &LogsColumnEncoder{ |
||||
parent: parent, |
||||
startOffset: offset, |
||||
|
||||
data: buf, |
||||
} |
||||
} |
||||
|
||||
// AppendPage appends a new [dataset.MemPage] to the column. AppendPage fails
// if the column has been closed.
func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
	if enc.closed {
		return ErrClosed
	}

	// It's possible the caller can pass an incorrect value for UncompressedSize
	// and CompressedSize, but those fields are purely for stats so we don't
	// check it.
	enc.pages = append(enc.pages, &logsmd.PageDesc{
		Info: &datasetmd.PageInfo{
			UncompressedSize: uint32(page.Info.UncompressedSize),
			CompressedSize:   uint32(page.Info.CompressedSize),
			Crc32:            page.Info.CRC32,
			RowsCount:        uint32(page.Info.RowCount),
			Encoding:         page.Info.Encoding,

			// The page's data lands immediately after everything buffered for
			// the column so far.
			DataOffset: uint32(enc.startOffset + enc.data.Len()),
			DataSize:   uint32(len(page.Data)),

			Statistics: page.Info.Stats,
		},
	})

	_, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails.
	return nil
}
||||
|
||||
// MetadataSize returns an estimate of the current size of the metadata for the
// column. MetadataSize does not include the size of data appended.
func (enc *LogsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }
||||
|
||||
// metadata returns the column metadata describing all appended pages.
func (enc *LogsColumnEncoder) metadata() proto.Message {
	return &logsmd.ColumnMetadata{Pages: enc.pages}
}
||||
|
||||
// Commit closes the column, flushing all data to the parent element. After
// Commit is called, the LogsColumnEncoder can no longer be modified.
func (enc *LogsColumnEncoder) Commit() error {
	if enc.closed {
		return ErrClosed
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	if len(enc.pages) == 0 {
		// No data was written; discard.
		return enc.parent.append(nil, nil)
	}

	metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
	metadataBuffer.Reset()
	defer bytesBufferPool.Put(metadataBuffer)

	if err := elementMetadataWrite(enc, metadataBuffer); err != nil {
		return err
	}
	return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes())
}
||||
|
||||
// Discard discards the column, discarding any data written to it. After
// Discard is called, the LogsColumnEncoder can no longer be modified.
func (enc *LogsColumnEncoder) Discard() error {
	if enc.closed {
		return ErrClosed
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	return enc.parent.append(nil, nil) // Notify parent of discard.
}
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,57 @@ |
||||
// logsmd.proto holds metadata for the logs section of a data object. The logs
// section contains a series of logs records across multiple streams.
syntax = "proto3";

package dataobj.metadata.logs.v1;

import "pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto";

option go_package = "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd";

// Metadata describes the metadata for the logs section.
message Metadata {
  // Columns within the logs.
  repeated ColumnDesc columns = 1;
}

// ColumnDesc describes an individual column within the logs table.
message ColumnDesc {
  // Information about the column.
  dataobj.metadata.dataset.v1.ColumnInfo info = 1;

  // Column type.
  ColumnType type = 2;
}

// ColumnType represents the valid types that a logs column can have.
enum ColumnType {
  // Invalid column type.
  COLUMN_TYPE_UNSPECIFIED = 0;

  // COLUMN_TYPE_STREAM_ID is a column containing the stream the log record
  // belongs to.
  COLUMN_TYPE_STREAM_ID = 1;

  // COLUMN_TYPE_TIMESTAMP is a column containing the timestamp of the log
  // record.
  COLUMN_TYPE_TIMESTAMP = 2;

  // COLUMN_TYPE_METADATA is a column containing structured metadata values for
  // a specific key.
  COLUMN_TYPE_METADATA = 3;

  // COLUMN_TYPE_MESSAGE is a column containing the message of the log record.
  COLUMN_TYPE_MESSAGE = 4;
}

// ColumnMetadata describes the metadata for a column.
message ColumnMetadata {
  // Pages within the column.
  repeated PageDesc pages = 1;
}

// PageDesc describes an individual page within a column.
message PageDesc {
  // Information about the page.
  dataobj.metadata.dataset.v1.PageInfo info = 1;
}
||||
Loading…
Reference in new issue