Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/internal/columnar/encoder.go

350 lines
11 KiB

package columnar
import (
"bytes"
"errors"
"math"
"github.com/gogo/protobuf/proto"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/protocodec"
)
// FormatVersion is the current version the encoding format for dataset-derived
// sections.
//
// FormatVersion started at 2 to help consolidate all the duplicated encoding
// code across the original section implementations (which were all collectively
// at v1).
const FormatVersion = 2
var (
// errElementNoExist is used when a child element tries to notify its parent
// of it closing but the parent doesn't have a child open. This would
// indicate a bug in the encoder so it's not exposed to callers.
errElementNoExist = errors.New("open element does not exist")
errElementExist = errors.New("open element already exists")
errClosed = errors.New("element is closed")
)
// Encoder encodes an individual dataset-based section in a data object.
//
// The zero value of Encoder is ready for use.
type Encoder struct {
tenant string
sortInfo *datasetmd.SortInfo
data *bytes.Buffer
metadata *bytes.Buffer
dictionary []string
dictionaryLookup map[string]uint32
columns []*datasetmd.ColumnDesc
curColumn *datasetmd.ColumnDesc
}
// SetTenant sets the tenant ID for the section. This must be called before
// calling [Encoder.Flush]. The set tenant is reset after flushing or calling
// [Encoder.Reset].
func (enc *Encoder) SetTenant(tenant string) { enc.tenant = tenant }
// SetSortInfo sets the sort order information for the section. This must be
// called before calling [Encoder.Flush]. The sort order information is reset
// after flushing or calling [Encoder.Reset].
func (enc *Encoder) SetSortInfo(info *datasetmd.SortInfo) { enc.sortInfo = info }
// NumColumns returns the number of columns committed to the section.
func (enc *Encoder) NumColumns() int { return len(enc.columns) }
// OpenColumn opens a new column in the section. OpenColumn fails if there is
// another open column.
func (enc *Encoder) OpenColumn(desc *dataset.ColumnDesc) (*ColumnEncoder, error) {
if enc.curColumn != nil {
return nil, errElementExist
}
enc.curColumn = &datasetmd.ColumnDesc{
Type: &datasetmd.ColumnType{
Physical: desc.Type.Physical,
LogicalRef: enc.getDictionaryRef(desc.Type.Logical),
},
TagRef: enc.getDictionaryRef(desc.Tag),
RowsCount: uint64(desc.RowsCount),
ValuesCount: uint64(desc.ValuesCount),
Compression: desc.Compression,
UncompressedSize: uint64(desc.UncompressedSize),
CompressedSize: uint64(desc.CompressedSize),
Statistics: desc.Statistics,
// These fields aren't available until the column is closed. We
// temporarily set them to the maximum values so they're accounted for
// when estimating metadata size.
PagesCount: math.MaxUint32,
ColumnMetadataOffset: math.MaxUint32,
ColumnMetadataLength: math.MaxUint32,
}
return newColumnEncoder(enc, enc.dataSize()), nil
}
func (enc *Encoder) getDictionaryRef(text string) uint32 {
enc.initDictionary() // Make sure the dictionary is initialized.
if idx, ok := enc.dictionaryLookup[text]; ok {
return idx
}
idx := uint32(len(enc.dictionary))
enc.dictionary = append(enc.dictionary, text)
enc.dictionaryLookup[text] = idx
return idx
}
func (enc *Encoder) initDictionary() {
if len(enc.dictionary) > 0 && len(enc.dictionaryLookup) > 0 {
return // Already initialized.
}
// Initialize the dictionary with index zero being reserved for the
// empty string.
enc.dictionary = []string{""}
enc.dictionaryLookup = map[string]uint32{"": 0}
}
// dataSize returns the current number of buffered data bytes.
func (enc *Encoder) dataSize() int {
if enc.data == nil {
return 0
}
return enc.data.Len()
}
// appendColumn adds column data and metadata to enc. appendColumn must only be
// called from the current child [ColumnEncoder] on Close or Discard. Discard
// calls must pass nil for both data and metadata to denote a discard.
//
// numPages specifies how many pages were encoded in the column.
//
// enc *must never* retain data or metadata beyond the call to appendColumn.
func (enc *Encoder) appendColumn(numPages int, data, metadata []byte) error {
if enc.curColumn == nil {
return errElementNoExist
}
if len(data) == 0 && len(metadata) == 0 {
// Column was discarded.
enc.curColumn = nil
return nil
}
// Initialize enc.data and enc.metadata.
enc.initBuffers()
// Update deferred fields now that we know their values.
enc.curColumn.PagesCount = uint64(numPages)
enc.curColumn.ColumnMetadataOffset = uint64(enc.metadata.Len())
enc.curColumn.ColumnMetadataLength = uint64(len(metadata))
// [bytes.Buffer.Write] never fails.
_, _ = enc.data.Write(data)
_, _ = enc.metadata.Write(metadata)
enc.columns = append(enc.columns, enc.curColumn)
enc.curColumn = nil
return nil
}
func (enc *Encoder) initBuffers() {
if enc.data != nil && enc.metadata != nil {
return
}
enc.data = bufpool.GetUnsized()
enc.metadata = bufpool.GetUnsized()
// Initialize the metadata buffer with the format version. Section
// implementations will use the version stored in the SectionType, but we
// write the format version to the metadata anyway as the first byte so that
// older versions of dataobj readers (where the version was encoded in
// metadata) recognizes that it's a newer format and abort.
//
// TODO(rfratto): remove this once we've fully rolled out the new format,
// and there's no more instances of Loki which are reading the v1 format.
// Since this byte is written but never read, it can be removed without
// needing a new format version or a breaking change.
_ = streamio.WriteUvarint(enc.metadata, FormatVersion) // [bytes.Buffer.WriteByte] never returns an error.
}
// Flush writes the section to the given [dataobj.SectionWriter]. Flush returns
// an error if there is a currently open column.
//
// After Flush is called successfully, the encoder is reset to a fresh state and
// can be reused.
func (enc *Encoder) Flush(w dataobj.SectionWriter) (int64, error) {
if enc.curColumn != nil {
return 0, errElementExist
}
defer enc.Reset()
// Perform last-minute initialization for any unused data.
enc.initBuffers()
enc.initDictionary()
// Encode SectionMetadata into our buffer, but record the offset before and
// after so we can store its offset/length into our info extension.
startMetadataOffset := enc.metadata.Len()
_ = protocodec.Encode(enc.metadata, enc.buildSectionMetadata()) // Writes to [bytes.Buffer] never fail
endMetadataOffset := enc.metadata.Len()
sectionInfoExtension := enc.buildSectionInfoExtension(
startMetadataOffset,
endMetadataOffset-startMetadataOffset,
)
extensionBuffer := bufpool.Get(protocodec.Size(sectionInfoExtension))
defer bufpool.Put(extensionBuffer)
_ = protocodec.Encode(extensionBuffer, sectionInfoExtension)
opts := &dataobj.WriteSectionOptions{
Tenant: enc.tenant,
ExtensionData: extensionBuffer.Bytes(),
}
return w.WriteSection(opts, enc.data.Bytes(), enc.metadata.Bytes())
}
func (enc *Encoder) buildSectionMetadata() *datasetmd.SectionMetadata {
return &datasetmd.SectionMetadata{
Columns: enc.columns,
Dictionary: enc.dictionary,
SortInfo: enc.sortInfo,
}
}
func (enc *Encoder) buildSectionInfoExtension(offset, length int) *datasetmd.SectionInfoExtension {
return &datasetmd.SectionInfoExtension{
SectionMetadataOffset: uint64(offset),
SectionMetadataLength: uint64(length),
}
}
// Reset resets the encoder to a fresh state, discarding any in-progress columns.
func (enc *Encoder) Reset() {
enc.tenant = ""
enc.sortInfo = nil
bufpool.PutUnsized(enc.data)
bufpool.PutUnsized(enc.metadata)
enc.data = nil
enc.metadata = nil
enc.dictionary = nil
enc.dictionaryLookup = nil
enc.columns = nil
enc.curColumn = nil
}
// The ColumnEncoder encodes data for a single column in a section.
// ColumnEncoders must be created by an [Encoder].
type ColumnEncoder struct {
parent *Encoder
dataOffset int // Byte offset in the data region where column starts.
closed bool // True if ColumnEncoder has been closed.
memPages []*dataset.MemPage // Pages to write.
pageDescs []*datasetmd.PageDesc // Page descriptions.
totalPageSize int // Total size of all pages.
}
func newColumnEncoder(parent *Encoder, dataOffset int) *ColumnEncoder {
return &ColumnEncoder{
parent: parent,
dataOffset: dataOffset,
}
}
// AppendPage appends a new page to enc. The page must not be modified after
// passing to AppendPage.
//
// AppendPage fails if enc is closed.
func (enc *ColumnEncoder) AppendPage(page *dataset.MemPage) error {
if enc.closed {
return errClosed
}
// NOTE(rfratto): The caller can pass invalid values for the page info, but
// these don't impact encoding so we don't provide any validation.
enc.pageDescs = append(enc.pageDescs, &datasetmd.PageDesc{
UncompressedSize: uint64(page.Desc.UncompressedSize),
CompressedSize: uint64(page.Desc.CompressedSize),
Crc32: page.Desc.CRC32,
RowsCount: uint64(page.Desc.RowCount),
ValuesCount: uint64(page.Desc.ValuesCount),
Encoding: page.Desc.Encoding,
DataOffset: uint64(enc.dataOffset + enc.totalPageSize),
DataSize: uint64(len(page.Data)),
Statistics: page.Desc.Stats,
})
enc.memPages = append(enc.memPages, page)
enc.totalPageSize += len(page.Data)
return nil
}
// Commit completes the column, appending it to the section. After Commit is
// called, enc can no longer be used.
//
// If no pages have been appnded, Commit appends an empty column to the section.
func (enc *ColumnEncoder) Commit() error {
if enc.closed {
return errClosed
}
enc.closed = true
columnData := bufpool.Get(enc.totalPageSize)
defer bufpool.PutUnsized(columnData)
for _, p := range enc.memPages {
_, _ = columnData.Write(p.Data) // [bytes.Buffer.Write] never fails.
}
metadata := enc.buildMetadata()
columnMetadata := bufpool.Get(protocodec.Size(metadata))
defer bufpool.Put(columnMetadata)
if err := protocodec.Encode(columnMetadata, metadata); err != nil {
return err
}
return enc.parent.appendColumn(len(enc.memPages), columnData.Bytes(), columnMetadata.Bytes())
}
// Discard discards the column, discarding any data written to it. After Discard
// is called, enc can no longer be used.
func (enc *ColumnEncoder) Discard() error {
if enc.closed {
return errClosed
}
enc.closed = true
return enc.parent.appendColumn(0, nil, nil) // Notify parent of discard.
}
// buildMetadata builds the metadata message for the column.
func (enc *ColumnEncoder) buildMetadata() proto.Message {
return &datasetmd.ColumnMetadata{Pages: enc.pageDescs}
}