Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/dataobj/encoder.go

281 lines
8.0 KiB

refactor(dataobj): invert dependency between dataobj and sections (#17762) Originally, the dataobj package was a higher-level API around sections. This design caused it to become a bottleneck: * Implementing any new public behaviour for a section required bubbling it up to the dataobj API for it to be exposed, making it tedious to add new sections or update existing ones. * The `dataobj.Builder` pattern was focused on constructing dataobjs for storing log data, which will cause friction as we build objects around other use cases. This PR builds on top of the foundation laid out by #17704 and #17708, fully inverting the dependency between dataobj and sections: * The `dataobj` package has no knowledge of what sections exist, and can now be used for writing and reading generic sections. Section packages now create higher-level APIs around the abstractions provided by `dataobj`. * Section packages are now public, and callers interact directly with these packages for writing and reading section-specific data. * All logic for a section (encoding, decoding, buffering, reading) is now fully self-contained inside the section package. Previously, the implementation of each section was spread across three packages (`pkg/dataobj/internal/encoding`, `pkg/dataobj/internal/sections/SECTION`, `pkg/dataobj`). * Cutting a section is now a decision made by the caller rather than the section implementation. Previously, the logs section builder would create multiple sections. For the most part, this change is a no-op, with two exceptions: 1. Section cutting is now performed by the caller; however, this shouldn't result in any issues. 2. Removing the high-level `dataobj.Stream` and `dataobj.Record` types will temporarily reduce the allocation gains from #16988. I will address this after this PR is merged.
1 month ago
package dataobj
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/protocodec"
)
// magic is the marker byte sequence written at both the very beginning
// (header) and the very end (tailer) of an encoded data object.
var magic = []byte("THOR")
// fileFormatVersion is the format version written as a uvarint at the start
// of the file metadata.
const fileFormatVersion = 0x1
// Legacy section types; these can be removed once support for the Kind field
// is completely removed.
//
// NOTE(review): the literals below are positional, so they depend on the
// field order of SectionType, which is declared outside this file.
var (
	// legacySectionTypeInvalid is the zero SectionType; the encoder maps it to
	// type reference 0.
	legacySectionTypeInvalid = SectionType{}
	// legacySectionTypeStreams is the streams section type; the encoder maps
	// it to type reference 1.
	legacySectionTypeStreams = SectionType{"github.com/grafana/loki", "streams"}
	// legacySectionTypeLogs is the logs section type; the encoder maps it to
	// type reference 2.
	legacySectionTypeLogs = SectionType{"github.com/grafana/loki", "logs"}
)
// TODO(rfratto): the memory footprint of [Encoder] can very slowly grow in
// memory as [bufpool] is filled with buffers with increasing capacity:
// each encoding pass has a different number of elements, shuffling which
// elements of the hierarchy get which pooled buffers.
//
// This means that elements that require more bytes will grow the capacity of
// the buffer and put the buffer back into the pool. Even if further encoding
// passes don't need that many bytes, the buffer is kept alive with its larger
// footprint. Given enough time, all buffers in the pool will have a large
// capacity.
//
// The bufpool package provides bucketed pools as a solution to this, but
// using them requires knowing in advance how many bytes are needed.
//
// Encoder can eventually be moved to the bucketed pools by calculating a
// rolling maximum of encoding size used per element across usages of an
// Encoder instance. This would then allow larger buffers to be eventually
// reclaimed regardless of how often encoding is done.
// encoder encodes a data object. Data objects are hierarchical, split into
// distinct sections that contain their own hierarchy.
//
// Sections are added with AppendSection, which buffers their bytes in memory
// and records where they will sit in the final file. Flush writes the
// buffered sections followed by the file metadata and then resets the encoder
// so it can be reused.
//
// NOTE(review): earlier documentation described Open*/Commit/Discard methods
// for hierarchical encoding; none are defined on encoder in this file.
type encoder struct {
	startOffset int // Byte offset in the file where data starts after the header.

	// sections holds one entry per AppendSection call, in append order.
	sections []*filemd.SectionInfo

	// typesReady reports whether initLegacyTypeRefs has populated the
	// dictionary and type tables below.
	typesReady bool
	// dictionary is the list of strings that type name references index into.
	// Index 0 is reserved for the empty string.
	dictionary []string
	// dictionaryLookup maps a string to its index in dictionary.
	dictionaryLookup map[string]uint32
	// rawTypes is the list of section types that SectionInfo.TypeRef indexes
	// into. Index 0 is reserved for the invalid type.
	rawTypes []*filemd.SectionType
	// typeRefLookup maps a SectionType to its index in rawTypes.
	typeRefLookup map[SectionType]uint32

	// data buffers the data and metadata bytes of every appended section. It
	// is nil until the first AppendSection call and is taken from bufpool.
	data *bytes.Buffer
}
// newEncoder returns an empty encoder. Section data is positioned directly
// after the magic header, so the initial data offset is len(magic).
func newEncoder() *encoder {
	enc := new(encoder)
	enc.startOffset = len(magic)
	return enc
}
// AppendSection appends a section of type typ to the data object. The
// section's data bytes are buffered first, immediately followed by its
// metadata bytes, and the file offsets of both regions are recorded so they
// can be emitted as part of the file metadata.
func (enc *encoder) AppendSection(typ SectionType, data, metadata []byte) {
	if enc.data == nil {
		// The buffer is only acquired on first use, so an encoder that lives
		// as long as a dataobj.Builder holds no memory while it has no data.
		enc.data = bufpool.GetUnsized()
	}

	// Regions are addressed by absolute file offset: the header size plus
	// everything already buffered.
	dataStart := enc.startOffset + enc.data.Len()
	metadataStart := dataStart + len(data)

	section := &filemd.SectionInfo{
		TypeRef: enc.getTypeRef(typ),
		Layout: &filemd.SectionLayout{
			Data: &filemd.Region{
				Offset: uint64(dataStart),
				Length: uint64(len(data)),
			},
			Metadata: &filemd.Region{
				Offset: uint64(metadataStart),
				Length: uint64(len(metadata)),
			},
		},
	}

	// Reserve room for both regions up front to avoid growing twice.
	enc.data.Grow(len(data) + len(metadata))

	// bytes.Buffer.Write never fails.
	_, _ = enc.data.Write(data)
	_, _ = enc.data.Write(metadata)

	enc.sections = append(enc.sections, section)
}
// getTypeRef returns the index of typ in the encoder's type table,
// registering typ (and its namespace and kind strings in the dictionary) if
// it has not been seen before.
func (enc *encoder) getTypeRef(typ SectionType) uint32 {
	if !enc.typesReady {
		enc.initLegacyTypeRefs()
	}

	if existing, found := enc.typeRefLookup[typ]; found {
		return existing
	}

	// Unknown type: it takes the next free slot in rawTypes.
	next := uint32(len(enc.rawTypes))
	enc.typeRefLookup[typ] = next
	enc.rawTypes = append(enc.rawTypes, &filemd.SectionType{
		NameRef: &filemd.SectionType_NameRef{
			NamespaceRef: enc.getDictionaryKey(typ.Namespace),
			KindRef:      enc.getDictionaryKey(typ.Kind),
		},
	})
	return next
}
// initLegacyTypeRefs seeds the dictionary and the type table with the fixed
// legacy entries (invalid, streams, logs) and marks the tables as ready.
func (enc *encoder) initLegacyTypeRefs() {
	// Reserve the zero index in the dictionary for an invalid entry. This is
	// only required for the type refs, but it's still easier to debug.
	legacyDictionary := []string{"", "github.com/grafana/loki", "streams", "logs"}

	// The lookup is the inverse of the dictionary: text -> index.
	lookup := make(map[string]uint32, len(legacyDictionary))
	for idx, text := range legacyDictionary {
		lookup[text] = uint32(idx)
	}

	enc.dictionary = legacyDictionary
	enc.dictionaryLookup = lookup

	// NamespaceRef and KindRef below are indexes into legacyDictionary.
	enc.rawTypes = []*filemd.SectionType{
		{}, // Invalid type; it has no name reference.
		{NameRef: &filemd.SectionType_NameRef{NamespaceRef: 1, KindRef: 2}}, // Streams.
		{NameRef: &filemd.SectionType_NameRef{NamespaceRef: 1, KindRef: 3}}, // Logs.
	}

	// Values are indexes into enc.rawTypes.
	enc.typeRefLookup = map[SectionType]uint32{
		legacySectionTypeInvalid: 0,
		legacySectionTypeStreams: 1,
		legacySectionTypeLogs:    2,
	}

	enc.typesReady = true
}
// getDictionaryKey returns the index of text in the dictionary, appending
// text as a new entry if it is not present yet.
func (enc *encoder) getDictionaryKey(text string) uint32 {
	// Reading from a nil map is safe and simply reports "not found".
	if existing, found := enc.dictionaryLookup[text]; found {
		return existing
	}

	if enc.dictionaryLookup == nil {
		enc.dictionaryLookup = make(map[string]uint32)
	}

	// The new entry is placed at the end of the dictionary.
	next := uint32(len(enc.dictionary))
	enc.dictionaryLookup[text] = next
	enc.dictionary = append(enc.dictionary, text)
	return next
}
// MetadataSize returns an estimate of the current size of the metadata for
// the data object: the protobuf size of the message built by Metadata. It
// does not include the section bytes buffered by AppendSection.
func (enc *encoder) MetadataSize() int {
	md := enc.Metadata()
	return proto.Size(md)
}
// Metadata builds the file metadata message from the encoder's current
// sections, dictionary and type table. The returned message shares its
// slices with the encoder.
func (enc *encoder) Metadata() proto.Message {
	md := &filemd.Metadata{
		Dictionary: enc.dictionary,
		Types:      enc.rawTypes,
	}
	// Re-slice with the same length and capacity as enc.sections.
	md.Sections = enc.sections[:len(enc.sections):cap(enc.sections)]
	return md
}
// Bytes returns the number of section bytes (data plus metadata) currently
// buffered, or 0 if no buffer has been allocated yet.
func (enc *encoder) Bytes() int {
	if buf := enc.data; buf != nil {
		return buf.Len()
	}
	return 0
}
// Flush writes the complete data object to w and, on success, resets enc.
// It returns the number of bytes written to w, including any bytes written
// before an error occurred. Flush fails if no section has been appended.
// On error, enc is left untouched.
func (enc *encoder) Flush(w streamio.Writer) (int64, error) {
	out := &countingWriter{w: w}

	if enc.data == nil {
		return out.count, fmt.Errorf("empty Encoder")
	}

	fileMetadata := bufpool.GetUnsized()
	defer bufpool.PutUnsized(fileMetadata)

	// The file metadata should start with the version.
	if err := streamio.WriteUvarint(fileMetadata, fileFormatVersion); err != nil {
		return out.count, err
	}
	if err := protocodec.Encode(fileMetadata, enc.Metadata()); err != nil {
		return out.count, err
	}

	// The overall structure is:
	//
	// header:
	//   [magic]
	// body:
	//   [data]
	//   [metadata]
	// tailer:
	//   [file metadata size (32 bits)]
	//   [magic]
	//
	// The file metadata size *must not* be varint since we need the last 8
	// bytes of the file to consistently retrieve the tailer.
	var metadataSize [4]byte
	binary.LittleEndian.PutUint32(metadataSize[:], uint32(fileMetadata.Len()))

	if _, err := out.Write(magic); err != nil {
		return out.count, fmt.Errorf("writing magic header: %w", err)
	}
	if _, err := out.Write(enc.data.Bytes()); err != nil {
		return out.count, fmt.Errorf("writing data: %w", err)
	}
	if _, err := out.Write(fileMetadata.Bytes()); err != nil {
		return out.count, fmt.Errorf("writing metadata: %w", err)
	}
	if _, err := out.Write(metadataSize[:]); err != nil {
		return out.count, fmt.Errorf("writing metadata size: %w", err)
	}
	if _, err := out.Write(magic); err != nil {
		return out.count, fmt.Errorf("writing magic tailer: %w", err)
	}

	enc.Reset()
	return out.count, nil
}
// Reset resets the Encoder to a fresh state. All recorded sections and all
// dictionary and type state are dropped, and the data buffer (if one was
// allocated) is returned to the buffer pool. Reset is safe to call on an
// encoder that has never had a section appended.
func (enc *encoder) Reset() {
	enc.startOffset = len(magic)
	enc.sections = nil

	// Drop the dictionary together with its lookup map so the two can never
	// disagree; both are rebuilt by getTypeRef once typesReady is false.
	enc.typesReady = false
	enc.dictionary = nil
	enc.dictionaryLookup = nil
	enc.rawTypes = nil
	enc.typeRefLookup = nil

	// enc.data is only allocated by the first AppendSection call, so it may
	// still be nil here. Never hand a nil buffer to the pool.
	if enc.data != nil {
		bufpool.PutUnsized(enc.data)
		enc.data = nil
	}
}
// countingWriter wraps a streamio.Writer and keeps a running total of the
// bytes successfully written through it.
type countingWriter struct {
	// w is the underlying writer that receives all writes.
	w streamio.Writer
	// count is the total number of bytes written to w so far.
	count int64
}
// Write writes p to the underlying writer and adds the number of bytes it
// reports as written to the running count, even when the write also returns
// an error.
func (cw *countingWriter) Write(p []byte) (n int, err error) {
	written, writeErr := cw.w.Write(p)
	cw.count += int64(written)
	return written, writeErr
}
// WriteByte writes c to the underlying writer. The running count is
// incremented only when the write succeeds.
func (cw *countingWriter) WriteByte(c byte) error {
	err := cw.w.WriteByte(c)
	if err == nil {
		cw.count++
	}
	return err
}