package indexpointers import ( "bytes" "errors" "math" "github.com/gogo/protobuf/proto" "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/indexpointersmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/protocodec" ) const ( indexPointersFormatVersion = 0x1 ) var ( // errElementNoExist is used when a child element tries to notify its parent // of it closing but the parent doesn't have a child open. This would // indicate a bug in the encoder so it's not exposed to callers. errElementNoExist = errors.New("open element does not exist") errElementExist = errors.New("open element already exists") errClosed = errors.New("element is closed") ) // encoder encodes an individual indexpointers section in a data object. // // The zero value of encoder is ready for use. type encoder struct { data *bytes.Buffer columns []*indexpointersmd.ColumnDesc // closed columns. curColumn *indexpointersmd.ColumnDesc // curColumn is the currently open column. } // OpenColumn opens a new column in the indexpointers section. OpenColumn fails if // there is another open column. func (enc *encoder) OpenColumn(columnType indexpointersmd.ColumnType, info *dataset.ColumnInfo) (*columnEncoder, error) { if enc.curColumn != nil { return nil, errElementExist } // MetadataOffset and MetadataSize aren't available until the column is // closed. We temporarily set these fields to the maximum values so they're // accounted for in the MetadataSize estimate. enc.curColumn = &indexpointersmd.ColumnDesc{ Type: columnType, Info: &datasetmd.ColumnInfo{ Name: info.Name, ValueType: info.Type, RowsCount: uint64(info.RowsCount), ValuesCount: uint64(info.ValuesCount), Compression: info.Compression, UncompressedSize: uint64(info.UncompressedSize), CompressedSize: uint64(info.CompressedSize), Statistics: info.Statistics, MetadataOffset: math.MaxUint32, MetadataSize: math.MaxUint32, }, } return newColumnEncoder(enc, enc.size()), nil } // size returns the current number of buffered data bytes. func (enc *encoder) size() int { if enc.data == nil { return 0 } return enc.data.Len() } // MetadataSize returns an estimate of the current size of the metadata for the // indexpointer. MetadataSize includes an estimate for the currently open element. func (enc *encoder) MetadataSize() int { return proto.Size(enc.Metadata()) } func (enc *encoder) Metadata() proto.Message { columns := enc.columns[:len(enc.columns):cap(enc.columns)] if enc.curColumn != nil { columns = append(columns, enc.curColumn) } return &indexpointersmd.Metadata{Columns: columns} } // Flush writes the section to the given [dataobj.SectionWriter]. Flush // returns an error if there is an open column. // // Flush returns 0, nil if there is no data to write. // // After Flush is called successfully, the encoder is reset to a fresh state // and can be reused. func (enc *encoder) Flush(w dataobj.SectionWriter) (int64, error) { if enc.curColumn != nil { return 0, errElementExist } if len(enc.columns) == 0 { return 0, nil } metadataBuffer := bufpool.GetUnsized() defer bufpool.PutUnsized(metadataBuffer) // The section metadata should start with its version. if err := streamio.WriteUvarint(metadataBuffer, indexPointersFormatVersion); err != nil { return 0, err } else if err := protocodec.Encode(metadataBuffer, enc.Metadata()); err != nil { return 0, err } n, err := w.WriteSection(enc.data.Bytes(), metadataBuffer.Bytes()) if err == nil { enc.Reset() } return n, err } // Reset resets the encoder to a fresh state, discarding any in-progress // columns. func (enc *encoder) Reset() { bufpool.PutUnsized(enc.data) enc.data = nil enc.curColumn = nil } // append adds data and metadata to enc. append must only be called from child // elements on Close and Discard. Discard calls must pass nil for both data and // metadata to denote a discard. func (enc *encoder) append(data, metadata []byte) error { if enc.curColumn == nil { return errElementNoExist } if len(data) == 0 && len(metadata) == 0 { // Column was discarded. enc.curColumn = nil return nil } if enc.data == nil { enc.data = bufpool.GetUnsized() } enc.curColumn.Info.MetadataOffset = uint64(enc.data.Len() + len(data)) enc.curColumn.Info.MetadataSize = uint64(len(metadata)) // bytes.Buffer.Write never fails. enc.data.Grow(len(data) + len(metadata)) _, _ = enc.data.Write(data) _, _ = enc.data.Write(metadata) enc.columns = append(enc.columns, enc.curColumn) enc.curColumn = nil return nil } // columnEncoder encodes an individual column in a indexpointers section. // columnEncoder are created by [encoder]. type columnEncoder struct { parent *encoder startOffset int // Byte offset in the section where the column starts. closed bool // true if columnEncoder has been closed. data *bytes.Buffer // All page data. pageHeaders []*indexpointersmd.PageDesc memPages []*dataset.MemPage // Pages to write. totalPageSize int // Total size of all pages. } func newColumnEncoder(parent *encoder, offset int) *columnEncoder { return &columnEncoder{ parent: parent, startOffset: offset, data: bufpool.GetUnsized(), } } // AppendPage appends a new [dataset.MemPage] to the column. AppendPage fails if // the column has been closed. func (enc *columnEncoder) AppendPage(page *dataset.MemPage) error { if enc.closed { return errClosed } // It's possible the caller can pass an incorrect value for UncompressedSize // and CompressedSize, but those fields are purely for stats so we don't // check it. enc.pageHeaders = append(enc.pageHeaders, &indexpointersmd.PageDesc{ Info: &datasetmd.PageInfo{ UncompressedSize: uint64(page.Info.UncompressedSize), CompressedSize: uint64(page.Info.CompressedSize), Crc32: page.Info.CRC32, RowsCount: uint64(page.Info.RowCount), ValuesCount: uint64(page.Info.ValuesCount), Encoding: page.Info.Encoding, DataOffset: uint64(enc.startOffset + enc.totalPageSize), DataSize: uint64(len(page.Data)), Statistics: page.Info.Stats, }, }) enc.memPages = append(enc.memPages, page) enc.totalPageSize += len(page.Data) return nil } // MetadataSize returns an estimate of the current size of the metadata for the // column. MetadataSize does not include the size of data appended. func (enc *columnEncoder) MetadataSize() int { return proto.Size(enc.Metadata()) } func (enc *columnEncoder) Metadata() proto.Message { return &indexpointersmd.ColumnMetadata{Pages: enc.pageHeaders} } // Commit closes the column, flushing all data to the parent element. After // Commit is called, the columnEncoder can no longer be modified. func (enc *columnEncoder) Commit() error { if enc.closed { return errClosed } enc.closed = true defer bufpool.PutUnsized(enc.data) if len(enc.pageHeaders) == 0 { // No data was written; discard. return enc.parent.append(nil, nil) } // Write all pages. To avoid costly reallocations, we grow our buffer to fit // all data first. enc.data.Grow(enc.totalPageSize) for _, p := range enc.memPages { _, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails. } metadataBuffer := bufpool.GetUnsized() defer bufpool.PutUnsized(metadataBuffer) if err := protocodec.Encode(metadataBuffer, enc.Metadata()); err != nil { return err } return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes()) } // Discard discards the column, discarding any data written to it. After // Discard is called, the columnEncoder can no longer be modified. func (enc *columnEncoder) Discard() error { if enc.closed { return errClosed } enc.closed = true defer bufpool.PutUnsized(enc.data) return enc.parent.append(nil, nil) // Notify parent of discard. }