mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
350 lines
11 KiB
350 lines
11 KiB
package columnar
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"math"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/protocodec"
|
|
)
|
|
|
|
// FormatVersion identifies the encoding format currently used for
// dataset-derived sections.
//
// Numbering starts at 2 rather than 1: the original section implementations
// were all collectively at v1, and this version consolidates the encoding code
// that was duplicated across them.
const FormatVersion = 2
|
|
|
|
// errElementNoExist is returned when a child element notifies its parent that
// it is closing while the parent has no child open. Hitting it would indicate
// a bug in the encoder, so it is not exposed to callers.
var errElementNoExist = errors.New("open element does not exist")

// errElementExist is returned when an operation requires that no child
// element is open, but one still is.
var errElementExist = errors.New("open element already exists")

// errClosed is returned when a closed element is used again.
var errClosed = errors.New("element is closed")
|
|
|
|
// Encoder encodes an individual dataset-based section in a data object.
|
|
//
|
|
// The zero value of Encoder is ready for use.
|
|
type Encoder struct {
|
|
tenant string
|
|
sortInfo *datasetmd.SortInfo
|
|
|
|
data *bytes.Buffer
|
|
metadata *bytes.Buffer
|
|
|
|
dictionary []string
|
|
dictionaryLookup map[string]uint32
|
|
|
|
columns []*datasetmd.ColumnDesc
|
|
curColumn *datasetmd.ColumnDesc
|
|
}
|
|
|
|
// SetTenant sets the tenant ID for the section. This must be called before
|
|
// calling [Encoder.Flush]. The set tenant is reset after flushing or calling
|
|
// [Encoder.Reset].
|
|
func (enc *Encoder) SetTenant(tenant string) { enc.tenant = tenant }
|
|
|
|
// SetSortInfo sets the sort order information for the section. This must be
|
|
// called before calling [Encoder.Flush]. The sort order information is reset
|
|
// after flushing or calling [Encoder.Reset].
|
|
func (enc *Encoder) SetSortInfo(info *datasetmd.SortInfo) { enc.sortInfo = info }
|
|
|
|
// NumColumns returns the number of columns committed to the section.
|
|
func (enc *Encoder) NumColumns() int { return len(enc.columns) }
|
|
|
|
// OpenColumn opens a new column in the section. OpenColumn fails if there is
|
|
// another open column.
|
|
func (enc *Encoder) OpenColumn(desc *dataset.ColumnDesc) (*ColumnEncoder, error) {
|
|
if enc.curColumn != nil {
|
|
return nil, errElementExist
|
|
}
|
|
|
|
enc.curColumn = &datasetmd.ColumnDesc{
|
|
Type: &datasetmd.ColumnType{
|
|
Physical: desc.Type.Physical,
|
|
LogicalRef: enc.getDictionaryRef(desc.Type.Logical),
|
|
},
|
|
TagRef: enc.getDictionaryRef(desc.Tag),
|
|
RowsCount: uint64(desc.RowsCount),
|
|
ValuesCount: uint64(desc.ValuesCount),
|
|
Compression: desc.Compression,
|
|
UncompressedSize: uint64(desc.UncompressedSize),
|
|
CompressedSize: uint64(desc.CompressedSize),
|
|
Statistics: desc.Statistics,
|
|
|
|
// These fields aren't available until the column is closed. We
|
|
// temporarily set them to the maximum values so they're accounted for
|
|
// when estimating metadata size.
|
|
|
|
PagesCount: math.MaxUint32,
|
|
ColumnMetadataOffset: math.MaxUint32,
|
|
ColumnMetadataLength: math.MaxUint32,
|
|
}
|
|
|
|
return newColumnEncoder(enc, enc.dataSize()), nil
|
|
}
|
|
|
|
func (enc *Encoder) getDictionaryRef(text string) uint32 {
|
|
enc.initDictionary() // Make sure the dictionary is initialized.
|
|
|
|
if idx, ok := enc.dictionaryLookup[text]; ok {
|
|
return idx
|
|
}
|
|
|
|
idx := uint32(len(enc.dictionary))
|
|
enc.dictionary = append(enc.dictionary, text)
|
|
enc.dictionaryLookup[text] = idx
|
|
|
|
return idx
|
|
}
|
|
|
|
func (enc *Encoder) initDictionary() {
|
|
if len(enc.dictionary) > 0 && len(enc.dictionaryLookup) > 0 {
|
|
return // Already initialized.
|
|
}
|
|
|
|
// Initialize the dictionary with index zero being reserved for the
|
|
// empty string.
|
|
enc.dictionary = []string{""}
|
|
enc.dictionaryLookup = map[string]uint32{"": 0}
|
|
}
|
|
|
|
// dataSize returns the current number of buffered data bytes.
|
|
func (enc *Encoder) dataSize() int {
|
|
if enc.data == nil {
|
|
return 0
|
|
}
|
|
return enc.data.Len()
|
|
}
|
|
|
|
// appendColumn adds column data and metadata to enc. appendColumn must only be
|
|
// called from the current child [ColumnEncoder] on Close or Discard. Discard
|
|
// calls must pass nil for both data and metadata to denote a discard.
|
|
//
|
|
// numPages specifies how many pages were encoded in the column.
|
|
//
|
|
// enc *must never* retain data or metadata beyond the call to appendColumn.
|
|
func (enc *Encoder) appendColumn(numPages int, data, metadata []byte) error {
|
|
if enc.curColumn == nil {
|
|
return errElementNoExist
|
|
}
|
|
|
|
if len(data) == 0 && len(metadata) == 0 {
|
|
// Column was discarded.
|
|
enc.curColumn = nil
|
|
return nil
|
|
}
|
|
|
|
// Initialize enc.data and enc.metadata.
|
|
enc.initBuffers()
|
|
|
|
// Update deferred fields now that we know their values.
|
|
enc.curColumn.PagesCount = uint64(numPages)
|
|
enc.curColumn.ColumnMetadataOffset = uint64(enc.metadata.Len())
|
|
enc.curColumn.ColumnMetadataLength = uint64(len(metadata))
|
|
|
|
// [bytes.Buffer.Write] never fails.
|
|
_, _ = enc.data.Write(data)
|
|
_, _ = enc.metadata.Write(metadata)
|
|
|
|
enc.columns = append(enc.columns, enc.curColumn)
|
|
enc.curColumn = nil
|
|
return nil
|
|
}
|
|
|
|
func (enc *Encoder) initBuffers() {
|
|
if enc.data != nil && enc.metadata != nil {
|
|
return
|
|
}
|
|
|
|
enc.data = bufpool.GetUnsized()
|
|
enc.metadata = bufpool.GetUnsized()
|
|
|
|
// Initialize the metadata buffer with the format version. Section
|
|
// implementations will use the version stored in the SectionType, but we
|
|
// write the format version to the metadata anyway as the first byte so that
|
|
// older versions of dataobj readers (where the version was encoded in
|
|
// metadata) recognizes that it's a newer format and abort.
|
|
//
|
|
// TODO(rfratto): remove this once we've fully rolled out the new format,
|
|
// and there's no more instances of Loki which are reading the v1 format.
|
|
// Since this byte is written but never read, it can be removed without
|
|
// needing a new format version or a breaking change.
|
|
_ = streamio.WriteUvarint(enc.metadata, FormatVersion) // [bytes.Buffer.WriteByte] never returns an error.
|
|
}
|
|
|
|
// Flush writes the section to the given [dataobj.SectionWriter]. Flush returns
|
|
// an error if there is a currently open column.
|
|
//
|
|
// After Flush is called successfully, the encoder is reset to a fresh state and
|
|
// can be reused.
|
|
func (enc *Encoder) Flush(w dataobj.SectionWriter) (int64, error) {
|
|
if enc.curColumn != nil {
|
|
return 0, errElementExist
|
|
}
|
|
defer enc.Reset()
|
|
|
|
// Perform last-minute initialization for any unused data.
|
|
enc.initBuffers()
|
|
enc.initDictionary()
|
|
|
|
// Encode SectionMetadata into our buffer, but record the offset before and
|
|
// after so we can store its offset/length into our info extension.
|
|
startMetadataOffset := enc.metadata.Len()
|
|
_ = protocodec.Encode(enc.metadata, enc.buildSectionMetadata()) // Writes to [bytes.Buffer] never fail
|
|
endMetadataOffset := enc.metadata.Len()
|
|
|
|
sectionInfoExtension := enc.buildSectionInfoExtension(
|
|
startMetadataOffset,
|
|
endMetadataOffset-startMetadataOffset,
|
|
)
|
|
|
|
extensionBuffer := bufpool.Get(protocodec.Size(sectionInfoExtension))
|
|
defer bufpool.Put(extensionBuffer)
|
|
|
|
_ = protocodec.Encode(extensionBuffer, sectionInfoExtension)
|
|
|
|
opts := &dataobj.WriteSectionOptions{
|
|
Tenant: enc.tenant,
|
|
ExtensionData: extensionBuffer.Bytes(),
|
|
}
|
|
return w.WriteSection(opts, enc.data.Bytes(), enc.metadata.Bytes())
|
|
}
|
|
|
|
func (enc *Encoder) buildSectionMetadata() *datasetmd.SectionMetadata {
|
|
return &datasetmd.SectionMetadata{
|
|
Columns: enc.columns,
|
|
Dictionary: enc.dictionary,
|
|
SortInfo: enc.sortInfo,
|
|
}
|
|
}
|
|
|
|
func (enc *Encoder) buildSectionInfoExtension(offset, length int) *datasetmd.SectionInfoExtension {
|
|
return &datasetmd.SectionInfoExtension{
|
|
SectionMetadataOffset: uint64(offset),
|
|
SectionMetadataLength: uint64(length),
|
|
}
|
|
}
|
|
|
|
// Reset resets the encoder to a fresh state, discarding any in-progress columns.
|
|
func (enc *Encoder) Reset() {
|
|
enc.tenant = ""
|
|
enc.sortInfo = nil
|
|
|
|
bufpool.PutUnsized(enc.data)
|
|
bufpool.PutUnsized(enc.metadata)
|
|
enc.data = nil
|
|
enc.metadata = nil
|
|
|
|
enc.dictionary = nil
|
|
enc.dictionaryLookup = nil
|
|
|
|
enc.columns = nil
|
|
enc.curColumn = nil
|
|
}
|
|
|
|
// The ColumnEncoder encodes data for a single column in a section.
|
|
// ColumnEncoders must be created by an [Encoder].
|
|
type ColumnEncoder struct {
|
|
parent *Encoder
|
|
|
|
dataOffset int // Byte offset in the data region where column starts.
|
|
closed bool // True if ColumnEncoder has been closed.
|
|
|
|
memPages []*dataset.MemPage // Pages to write.
|
|
pageDescs []*datasetmd.PageDesc // Page descriptions.
|
|
totalPageSize int // Total size of all pages.
|
|
}
|
|
|
|
func newColumnEncoder(parent *Encoder, dataOffset int) *ColumnEncoder {
|
|
return &ColumnEncoder{
|
|
parent: parent,
|
|
|
|
dataOffset: dataOffset,
|
|
}
|
|
}
|
|
|
|
// AppendPage appends a new page to enc. The page must not be modified after
|
|
// passing to AppendPage.
|
|
//
|
|
// AppendPage fails if enc is closed.
|
|
func (enc *ColumnEncoder) AppendPage(page *dataset.MemPage) error {
|
|
if enc.closed {
|
|
return errClosed
|
|
}
|
|
|
|
// NOTE(rfratto): The caller can pass invalid values for the page info, but
|
|
// these don't impact encoding so we don't provide any validation.
|
|
enc.pageDescs = append(enc.pageDescs, &datasetmd.PageDesc{
|
|
UncompressedSize: uint64(page.Desc.UncompressedSize),
|
|
CompressedSize: uint64(page.Desc.CompressedSize),
|
|
Crc32: page.Desc.CRC32,
|
|
RowsCount: uint64(page.Desc.RowCount),
|
|
ValuesCount: uint64(page.Desc.ValuesCount),
|
|
Encoding: page.Desc.Encoding,
|
|
|
|
DataOffset: uint64(enc.dataOffset + enc.totalPageSize),
|
|
DataSize: uint64(len(page.Data)),
|
|
|
|
Statistics: page.Desc.Stats,
|
|
})
|
|
|
|
enc.memPages = append(enc.memPages, page)
|
|
enc.totalPageSize += len(page.Data)
|
|
return nil
|
|
}
|
|
|
|
// Commit completes the column, appending it to the section. After Commit is
|
|
// called, enc can no longer be used.
|
|
//
|
|
// If no pages have been appnded, Commit appends an empty column to the section.
|
|
func (enc *ColumnEncoder) Commit() error {
|
|
if enc.closed {
|
|
return errClosed
|
|
}
|
|
enc.closed = true
|
|
|
|
columnData := bufpool.Get(enc.totalPageSize)
|
|
defer bufpool.PutUnsized(columnData)
|
|
|
|
for _, p := range enc.memPages {
|
|
_, _ = columnData.Write(p.Data) // [bytes.Buffer.Write] never fails.
|
|
}
|
|
|
|
metadata := enc.buildMetadata()
|
|
|
|
columnMetadata := bufpool.Get(protocodec.Size(metadata))
|
|
defer bufpool.Put(columnMetadata)
|
|
|
|
if err := protocodec.Encode(columnMetadata, metadata); err != nil {
|
|
return err
|
|
}
|
|
|
|
return enc.parent.appendColumn(len(enc.memPages), columnData.Bytes(), columnMetadata.Bytes())
|
|
}
|
|
|
|
// Discard discards the column, discarding any data written to it. After Discard
|
|
// is called, enc can no longer be used.
|
|
func (enc *ColumnEncoder) Discard() error {
|
|
if enc.closed {
|
|
return errClosed
|
|
}
|
|
enc.closed = true
|
|
|
|
return enc.parent.appendColumn(0, nil, nil) // Notify parent of discard.
|
|
}
|
|
|
|
// buildMetadata builds the metadata message for the column.
|
|
func (enc *ColumnEncoder) buildMetadata() proto.Message {
|
|
return &datasetmd.ColumnMetadata{Pages: enc.pageDescs}
|
|
}
|
|
|