Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/logs/builder.go

386 lines
11 KiB

package logs
import (
"context"
"errors"
"fmt"
"time"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
datasetmd_v2 "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)
// A Record is an individual log record within the logs section.
type Record struct {
StreamID int64
Timestamp time.Time
Metadata labels.Labels
Line []byte
}
type AppendStrategy int
const (
AppendUnordered = iota
AppendOrdered
)
type SortOrder int
const (
SortStreamASC SortOrder = iota
SortTimestampDESC
)
// BuilderOptions configures the behavior of the logs section.
type BuilderOptions struct {
// PageSizeHint is the size of pages to use when encoding the logs section.
PageSizeHint int
// PageMaxRowCount is the maximum amount of rows of pages to use when encoding the logs section.
PageMaxRowCount int
// BufferSize is the size of the buffer to use when accumulating log records.
BufferSize int
// StripeMergeLimit is the maximum number of stripes to merge at once when
// flushing stripes into a section. StripeMergeLimit must be larger than 1.
//
// Lower values of StripeMergeLimit reduce the memory overhead of merging but
// increase time spent merging. Higher values of StripeMergeLimit increase
// memory overhead but reduce time spent merging.
StripeMergeLimit int
// AppendStrategy is allowed to control how the builder creates the section.
// When appending logs to the section in strict sort order, the [AppendOrdered] can be used to avoid
// creating and sorting of stripes.
AppendStrategy AppendStrategy
// SortOrder defines the order in which the rows of the logs sections are sorted.
// They can either be sorted by [streamID ASC, timestamp DESC] ([SortStreamASC]) or [timestamp DESC, streamID ASC] ([SortTimestampDESC]).
SortOrder SortOrder
}
// Builder accumulate a set of [Record]s within a data object.
type Builder struct {
metrics *Metrics
opts BuilderOptions
// The optional tenant that owns the builder. If specified, the section
// must only contain logs owned by the tenant, and no other tenants.
tenant string
// Sorting the entire set of logs is very expensive, so we need to break it
// up into smaller pieces:
//
// 1. Records are accumulated in memory up to BufferSize; the current size is
// tracked by recordsSize.
//
// 2. Once the buffer is full, records are sorted and flushed to smaller
// [table]s called stripes.
//
// 3. Once the set of stripes reaches SectionSize, they are merged together
// into a final table that will be encoded as a single section.
//
// At the end of this process, there will be a set of sections that are
// encoded separately.
records []Record // Buffered records to flush to a group.
recordsSize int // Byte size of all buffered records (uncompressed).
stripes []*table // In-progress section; flushed with [mergeTables] into a single table.
stripeBuffer tableBuffer
stripesUncompressedSize int // Estimated byte size of all elements in stripes (uncompressed).
stripesCompressedSize int // Estimated byte size of all elements in stripes (compressed).
sectionBuffer tableBuffer
}
// Nwe creates a new logs section. The pageSize argument specifies how large
// pages should be.
func NewBuilder(metrics *Metrics, opts BuilderOptions) *Builder {
if metrics == nil {
metrics = NewMetrics()
}
return &Builder{
metrics: metrics,
opts: opts,
}
}
// Tenant returns the optional tenant that owns the builder.
func (b *Builder) Tenant() string { return b.tenant }
// SetTenant sets the tenant that owns the builder. A builder can be made
// multi-tenant by passing an empty string.
func (b *Builder) SetTenant(tenant string) { b.tenant = tenant }
// Type returns the [dataobj.SectionType] of the logs builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }
// Append adds a new entry to b.
func (b *Builder) Append(entry Record) {
b.metrics.appendsTotal.Inc()
b.metrics.recordCount.Inc()
b.records = append(b.records, entry)
b.recordsSize += recordSize(entry)
// Shortcut for when logs are appending in strict sort order.
// We skip building temporarily compressed stripes in favour of a speed
// with a single pass compression of all records.
if b.opts.AppendStrategy == AppendOrdered {
return
}
if b.recordsSize >= b.opts.BufferSize {
b.flushRecords(zstd.SpeedFastest)
}
}
func recordSize(record Record) int {
var size int
size++ // One byte per stream ID (for uvarint).
size += 8 // Eight bytes for timestamp.
record.Metadata.Range(func(metadata labels.Label) {
size += len(metadata.Value)
})
size += len(record.Line)
return size
}
func (b *Builder) flushRecords(encLevel zstd.EncoderLevel) {
if len(b.records) == 0 {
return
}
// We can panic in case flushRecords is called multiple times before flushing a section
// when using the [AppendOrdered] strategy, because that should not happen and is
// considered a programming error.
if b.opts.AppendStrategy == AppendOrdered && len(b.stripes) > 0 {
panic("must not call flushRecords multiple times for a single section when using AppendOrdered strategy")
}
// Our stripes are intermediate tables that don't need to have the best
// compression. To maintain high throughput on appends, we use the fastest
// compression for a stripe. Better compression is then used for sections.
compressionOpts := &dataset.CompressionOptions{
Zstd: []zstd.EOption{zstd.WithEncoderLevel(encLevel)},
}
stripe := buildTable(&b.stripeBuffer, b.opts.PageSizeHint, b.opts.PageMaxRowCount, compressionOpts, b.records, b.opts.SortOrder)
b.stripes = append(b.stripes, stripe)
b.stripesUncompressedSize += stripe.UncompressedSize()
b.stripesCompressedSize += stripe.CompressedSize()
b.records = sliceclear.Clear(b.records)
b.recordsSize = 0
}
func (b *Builder) flushSection() *table {
if len(b.stripes) == 0 {
return nil
}
compressionOpts := &dataset.CompressionOptions{
Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)},
}
section, err := mergeTablesIncremental(&b.sectionBuffer, b.opts.PageSizeHint, b.opts.PageMaxRowCount, compressionOpts, b.stripes, b.opts.StripeMergeLimit, b.opts.SortOrder)
if err != nil {
// We control the input to mergeTables, so this should never happen.
panic(fmt.Sprintf("merging tables: %v", err))
}
b.stripes = sliceclear.Clear(b.stripes)
b.stripesCompressedSize = 0
b.stripesUncompressedSize = 0
return section
}
func (b *Builder) flushSectionOrdered() *table {
b.flushRecords(zstd.SpeedDefault)
if len(b.stripes) == 0 {
return nil
}
section := b.stripes[0]
b.stripes = sliceclear.Clear(b.stripes)
b.stripesCompressedSize = 0
b.stripesUncompressedSize = 0
return section
}
// UncompressedSize returns the current uncompressed size of the logs section
// in bytes.
func (b *Builder) UncompressedSize() int {
var size int
size += b.recordsSize
size += b.stripesUncompressedSize
return size
}
// EstimatedSize returns the estimated size of the Logs section in bytes.
func (b *Builder) EstimatedSize() int {
var size int
size += b.recordsSize
size += b.stripesCompressedSize
return size
}
// Flush flushes b to the provided writer.
//
// After successful encoding, the b is reset and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
timer := prometheus.NewTimer(b.metrics.encodeSeconds)
defer timer.ObserveDuration()
var section *table
if b.opts.AppendStrategy == AppendOrdered {
// Flush buffered data all at once
section = b.flushSectionOrdered()
} else {
// Flush any remaining buffered data.
b.flushRecords(zstd.SpeedFastest)
section = b.flushSection()
}
if section == nil {
return 0, nil
}
// TODO(rfratto): handle an individual section having oversized metadata.
// This can happen when the number of columns is very wide, due to a lot of
// metadata columns.
//
// As the caller likely creates many smaller logs sections, the best solution
// for this is to aggregate the lowest cardinality columns into a combined
// column. This will reduce the number of columns in the section and thus the
// metadata size.
var logsEnc columnar.Encoder
if err := b.encodeSection(&logsEnc, section); err != nil {
return 0, fmt.Errorf("encoding section: %w", err)
}
// The first two columns of each row are *always* stream ID and timestamp.
// TODO(ashwanth): Find a safer way to do this. Same as [CompareRows]
logsEnc.SetSortInfo(sortInfo(b.opts.SortOrder))
logsEnc.SetTenant(b.tenant)
n, err = logsEnc.Flush(w)
if err == nil {
b.Reset()
}
return n, err
}
func (b *Builder) encodeSection(enc *columnar.Encoder, section *table) error {
{
errs := make([]error, 0, len(section.Metadatas)+3)
errs = append(errs, encodeColumn(enc, ColumnTypeStreamID, section.StreamID))
errs = append(errs, encodeColumn(enc, ColumnTypeTimestamp, section.Timestamp))
for _, md := range section.Metadatas {
errs = append(errs, encodeColumn(enc, ColumnTypeMetadata, md))
}
errs = append(errs, encodeColumn(enc, ColumnTypeMessage, section.Message))
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("encoding columns: %w", err)
}
}
return nil
}
func sortInfo(sort SortOrder) *datasetmd_v2.SortInfo {
switch sort {
case SortStreamASC:
return &datasetmd_v2.SortInfo{
ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{
{ColumnIndex: 0, Direction: datasetmd_v2.SORT_DIRECTION_ASCENDING}, // StreamID ASC
{ColumnIndex: 1, Direction: datasetmd_v2.SORT_DIRECTION_DESCENDING}, // Timestamp DESC
},
}
case SortTimestampDESC:
return &datasetmd_v2.SortInfo{
ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{
{ColumnIndex: 1, Direction: datasetmd_v2.SORT_DIRECTION_DESCENDING}, // Timestamp DESC
{ColumnIndex: 0, Direction: datasetmd_v2.SORT_DIRECTION_ASCENDING}, // StreamID ASC
},
}
default:
panic("invalid sort order")
}
}
func encodeColumn(enc *columnar.Encoder, columnType ColumnType, column *tableColumn) error {
columnEnc, err := enc.OpenColumn(column.ColumnDesc())
if err != nil {
return fmt.Errorf("opening %s column encoder: %w", columnType, err)
}
defer func() {
// Discard on defer for safety. This will return an error if we
// successfully committed.
_ = columnEnc.Discard()
}()
if len(column.Pages) == 0 {
// Column has no data; discard.
return nil
}
// Our column is in memory, so we don't need a "real" context in the calls
// below.
for result := range column.ListPages(context.Background()) {
page, err := result.Value()
if err != nil {
return fmt.Errorf("getting %s page: %w", columnType, err)
}
data, err := page.ReadPage(context.Background())
if err != nil {
return fmt.Errorf("reading %s page: %w", columnType, err)
}
memPage := &dataset.MemPage{
Desc: *page.PageDesc(),
Data: data,
}
if err := columnEnc.AppendPage(memPage); err != nil {
return fmt.Errorf("appending %s page: %w", columnType, err)
}
}
return columnEnc.Commit()
}
// Reset resets all state, allowing b to be reused.
func (b *Builder) Reset() {
b.metrics.recordCount.Set(0)
b.tenant = ""
b.records = sliceclear.Clear(b.records)
b.recordsSize = 0
b.stripes = sliceclear.Clear(b.stripes)
b.stripeBuffer.Reset()
b.stripesCompressedSize = 0
b.stripesUncompressedSize = 0
b.sectionBuffer.Reset()
}