loki/pkg/dataobj/internal/encoding/encoder_streams.go

package encoding

import (
	"bytes"
	"math"

	"github.com/gogo/protobuf/proto"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

// StreamsEncoder encodes an individual streams section in a data object.
// StreamsEncoders are created by [Encoder]s.
type StreamsEncoder struct {
	parent      *Encoder
	startOffset int  // Byte offset in the file where the section starts.
	closed      bool // true if StreamsEncoder has been closed.

	data      *bytes.Buffer
	columns   []*streamsmd.ColumnDesc // Closed columns.
	curColumn *streamsmd.ColumnDesc   // curColumn is the currently open column.
}
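
// A minimal lifecycle sketch (illustrative only; parent, offset, columnType,
// info, and page stand in for values prepared by the caller):
//
//	sec := newStreamsEncoder(parent, offset)
//	col, err := sec.OpenColumn(columnType, info)
//	if err != nil {
//		return err
//	}
//	if err := col.AppendPage(page); err != nil {
//		return err
//	}
//	if err := col.Commit(); err != nil { // flush the column into the section
//		return err
//	}
//	return sec.Commit() // flush the section into the parent Encoder
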
func newStreamsEncoder(parent *Encoder, offset int) *StreamsEncoder {
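	// Reuse a buffer from the package-level pool; Reset clears anything left
	// over from its previous owner.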
	buf := bytesBufferPool.Get().(*bytes.Buffer)
	buf.Reset()

	return &StreamsEncoder{
		parent:      parent,
		startOffset: offset,
		data:        buf,
	}
}

// OpenColumn opens a new column in the streams section. OpenColumn fails if
// there is another open column or if the StreamsEncoder has been closed.
func (enc *StreamsEncoder) OpenColumn(columnType streamsmd.ColumnType, info *dataset.ColumnInfo) (*StreamsColumnEncoder, error) {
	if enc.curColumn != nil {
		return nil, ErrElementExist
	} else if enc.closed {
		return nil, ErrClosed
	}

	// MetadataOffset and MetadataSize aren't available until the column is
	// closed. We temporarily set these fields to the maximum values so they're
	// accounted for in the MetadataSize estimate.
	enc.curColumn = &streamsmd.ColumnDesc{
		Type: columnType,
		Info: &datasetmd.ColumnInfo{
			Name:             info.Name,
			ValueType:        info.Type,
			RowsCount:        uint64(info.RowsCount),
			ValuesCount:      uint64(info.ValuesCount),
			Compression:      info.Compression,
			UncompressedSize: uint64(info.UncompressedSize),
			CompressedSize:   uint64(info.CompressedSize),
			Statistics:       info.Statistics,

			MetadataOffset: math.MaxUint32,
			MetadataSize:   math.MaxUint32,
		},
	}
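	// The new column's data starts where the section's buffered data
	// currently ends: the section's start offset plus bytes buffered so far.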
	return newStreamsColumnEncoder(
		enc,
		enc.startOffset+enc.data.Len(),
	), nil
}

// MetadataSize returns an estimate of the current size of the metadata for
// the streams section. MetadataSize includes an estimate for the currently
// open element.
func (enc *StreamsEncoder) MetadataSize() int { return elementMetadataSize(enc) }

func (enc *StreamsEncoder) metadata() proto.Message {
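	// Take a copy of the slice header so the currently open column can be
	// included in the snapshot without changing the length of enc.columns.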
	columns := enc.columns[:len(enc.columns):cap(enc.columns)]
	if enc.curColumn != nil {
		columns = append(columns, enc.curColumn)
	}
	return &streamsmd.Metadata{Columns: columns}
}

// Commit closes the section, flushing all data to the parent element. After
// Commit is called, the StreamsEncoder can no longer be modified.
//
// Commit fails if there is an open column.
func (enc *StreamsEncoder) Commit() error {
	if enc.closed {
		return ErrClosed
	} else if enc.curColumn != nil {
		return ErrElementExist
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	if len(enc.columns) == 0 {
		// No data was written; discard.
		return enc.parent.append(nil, nil)
	}
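	// Section metadata is staged in its own pooled buffer so it can be handed
	// to the parent separately from the buffered column data.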
	metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
	metadataBuffer.Reset()
	defer bytesBufferPool.Put(metadataBuffer)

	// The section metadata should start with its version.
	if err := streamio.WriteUvarint(metadataBuffer, streamsFormatVersion); err != nil {
		return err
	} else if err := elementMetadataWrite(enc, metadataBuffer); err != nil {
		return err
	}

	return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes())
}
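
// Taken together, append and Commit hand the parent a (data, metadata) pair
// laid out as follows (a sketch of this file's behavior, not a format spec):
//
//	data:     [column 0 pages][column 0 metadata] ... [column N pages][column N metadata]
//	metadata: [uvarint streamsFormatVersion][streamsmd.Metadata protobuf]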

// Discard discards the section, dropping any data written to it. After
// Discard is called, the StreamsEncoder can no longer be modified.
//
// Discard fails if there is an open column.
func (enc *StreamsEncoder) Discard() error {
	if enc.closed {
		return ErrClosed
	} else if enc.curColumn != nil {
		return ErrElementExist
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	return enc.parent.append(nil, nil)
}

// append adds data and metadata to enc. append must only be called from child
// elements on Commit and Discard. Discard calls must pass nil for both data
// and metadata to denote a discard.
func (enc *StreamsEncoder) append(data, metadata []byte) error {
	if enc.closed {
		return ErrClosed
	} else if enc.curColumn == nil {
		return errElementNoExist
	}

	if len(data) == 0 && len(metadata) == 0 {
		// Column was discarded.
		enc.curColumn = nil
		return nil
	}
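	// A column's metadata is written immediately after its data, so the
	// metadata's absolute file offset is the section start plus everything
	// already buffered plus the incoming data itself.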
	enc.curColumn.Info.MetadataOffset = uint64(enc.startOffset + enc.data.Len() + len(data))
	enc.curColumn.Info.MetadataSize = uint64(len(metadata))

	// bytes.Buffer.Write never fails.
	enc.data.Grow(len(data) + len(metadata))
	_, _ = enc.data.Write(data)
	_, _ = enc.data.Write(metadata)

	enc.columns = append(enc.columns, enc.curColumn)
	enc.curColumn = nil
	return nil
}

// StreamsColumnEncoder encodes an individual column in a streams section.
// StreamsColumnEncoders are created by [StreamsEncoder].
type StreamsColumnEncoder struct {
	parent      *StreamsEncoder
	startOffset int  // Byte offset in the file where the column starts.
	closed      bool // true if StreamsColumnEncoder has been closed.

	data          *bytes.Buffer // All page data.
	pageHeaders   []*streamsmd.PageDesc
	memPages      []*dataset.MemPage // Pages to write.
	totalPageSize int                // Total size of all pages.
}
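
// Pages are kept as MemPages until Commit so data can be grown once to the
// exact total size before any page bytes are copied in.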

func newStreamsColumnEncoder(parent *StreamsEncoder, offset int) *StreamsColumnEncoder {
	buf := bytesBufferPool.Get().(*bytes.Buffer)
	buf.Reset()

	return &StreamsColumnEncoder{
		parent:      parent,
		startOffset: offset,
		data:        buf,
	}
}

// AppendPage appends a new [dataset.MemPage] to the column. AppendPage fails
// if the column has been closed.
func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
	if enc.closed {
		return ErrClosed
	}

	// It's possible for the caller to pass incorrect values for
	// UncompressedSize and CompressedSize, but those fields are purely for
	// stats so we don't validate them.
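	// Pages are written back-to-back at Commit, so this page's data begins at
	// the column's start offset plus the combined size of all earlier pages.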
	enc.pageHeaders = append(enc.pageHeaders, &streamsmd.PageDesc{
		Info: &datasetmd.PageInfo{
			UncompressedSize: uint64(page.Info.UncompressedSize),
			CompressedSize:   uint64(page.Info.CompressedSize),
			Crc32:            page.Info.CRC32,
			RowsCount:        uint64(page.Info.RowCount),
			ValuesCount:      uint64(page.Info.ValuesCount),
			Encoding:         page.Info.Encoding,

			DataOffset: uint64(enc.startOffset + enc.totalPageSize),
			DataSize:   uint64(len(page.Data)),

			Statistics: page.Info.Stats,
		},
	})

	enc.memPages = append(enc.memPages, page)
	enc.totalPageSize += len(page.Data)
	return nil
}

// MetadataSize returns an estimate of the current size of the metadata for the
// column. MetadataSize does not include the size of data appended.
func (enc *StreamsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }

func (enc *StreamsColumnEncoder) metadata() proto.Message {
	return &streamsmd.ColumnMetadata{Pages: enc.pageHeaders}
}

// Commit closes the column, flushing all data to the parent element. After
// Commit is called, the StreamsColumnEncoder can no longer be modified.
func (enc *StreamsColumnEncoder) Commit() error {
	if enc.closed {
		return ErrClosed
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	if len(enc.pageHeaders) == 0 {
		// No data was written; discard.
		return enc.parent.append(nil, nil)
	}

	// Write all pages. To avoid costly reallocations, we grow our buffer to
	// fit all data first.
	enc.data.Grow(enc.totalPageSize)
	for _, p := range enc.memPages {
		_, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails.
	}

	metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
	metadataBuffer.Reset()
	defer bytesBufferPool.Put(metadataBuffer)

	if err := elementMetadataWrite(enc, metadataBuffer); err != nil {
		return err
	}
	return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes())
}

// Discard discards the column, dropping any data written to it. After Discard
// is called, the StreamsColumnEncoder can no longer be modified.
func (enc *StreamsColumnEncoder) Discard() error {
	if enc.closed {
		return ErrClosed
	}
	enc.closed = true

	defer bytesBufferPool.Put(enc.data)

	return enc.parent.append(nil, nil) // Notify parent of discard.
}