package logs

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/klauspost/compress/zstd"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	datasetmd_v2 "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// A Record is an individual log record within the logs section.
type Record struct {
	StreamID  int64
	Timestamp time.Time
	Metadata  labels.Labels
	Line      []byte
}

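// AppendStrategy controls how the builder accumulates records before a
// section is flushed. With [AppendUnordered], records are buffered, sorted,
// and flushed into intermediate stripes that are merged when the section is
// flushed. With [AppendOrdered], records are assumed to already arrive in
// sort order, so the intermediate stripes are skipped.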
type AppendStrategy int

const (
	AppendUnordered AppendStrategy = iota
	AppendOrdered
)

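// SortOrder defines how rows within a section are sorted: by
// [streamID ASC, timestamp DESC] ([SortStreamASC]) or by
// [timestamp DESC, streamID ASC] ([SortTimestampDESC]).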
type SortOrder int

const (
	SortStreamASC SortOrder = iota
	SortTimestampDESC
)

// BuilderOptions configures the behavior of the logs section.
type BuilderOptions struct {
	// PageSizeHint is the size of pages to use when encoding the logs section.
	PageSizeHint int

	// PageMaxRowCount is the maximum number of rows per page when encoding the
	// logs section.
	PageMaxRowCount int

	// BufferSize is the size of the buffer to use when accumulating log records.
	BufferSize int

	// StripeMergeLimit is the maximum number of stripes to merge at once when
	// flushing stripes into a section. StripeMergeLimit must be larger than 1.
	//
	// Lower values of StripeMergeLimit reduce the memory overhead of merging but
	// increase time spent merging. Higher values of StripeMergeLimit increase
	// memory overhead but reduce time spent merging.
	StripeMergeLimit int

	// AppendStrategy controls how the builder creates the section. When logs
	// are appended to the section in strict sort order, [AppendOrdered] can be
	// used to avoid creating and sorting stripes.
	AppendStrategy AppendStrategy

	// SortOrder defines the order in which the rows of the logs section are
	// sorted. Rows can either be sorted by [streamID ASC, timestamp DESC]
	// ([SortStreamASC]) or by [timestamp DESC, streamID ASC]
	// ([SortTimestampDESC]).
	SortOrder SortOrder
}

// Builder accumulates a set of [Record]s within a data object.
type Builder struct {
	metrics *Metrics
	opts    BuilderOptions

	// The optional tenant that owns the builder. If specified, the section
	// must only contain logs owned by the tenant, and no other tenants.
	tenant string

	// Sorting the entire set of logs is very expensive, so we break it up into
	// smaller pieces:
	//
	// 1. Records are accumulated in memory up to BufferSize; the current size
	//    is tracked by recordsSize.
	//
	// 2. Once the buffer is full, records are sorted and flushed to smaller
	//    [table]s called stripes.
	//
	// 3. When the section is flushed, the stripes are merged together into a
	//    final table that is encoded as a single section.
	//
	// At the end of this process, there will be a set of sections that are
	// encoded separately.

	records     []Record // Buffered records to flush to a group.
	recordsSize int      // Byte size of all buffered records (uncompressed).

	stripes                 []*table // In-progress section; flushed with [mergeTables] into a single table.
	stripeBuffer            tableBuffer
	stripesUncompressedSize int // Estimated byte size of all elements in stripes (uncompressed).
	stripesCompressedSize   int // Estimated byte size of all elements in stripes (compressed).

	sectionBuffer tableBuffer
}

// NewBuilder creates a new Builder for a logs section using the provided
// options. If metrics is nil, a new [Metrics] is created.
func NewBuilder(metrics *Metrics, opts BuilderOptions) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}

	return &Builder{
		metrics: metrics,
		opts:    opts,
	}
}

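// A minimal usage sketch (illustrative only; it assumes a hypothetical
// dataobj.SectionWriter named w and a slice of records to append):
//
//	builder := NewBuilder(nil, BuilderOptions{
//		PageSizeHint:     2 * 1024 * 1024, // hypothetical values
//		PageMaxRowCount:  10_000,
//		BufferSize:       16 * 1024 * 1024,
//		StripeMergeLimit: 8,
//		SortOrder:        SortStreamASC,
//	})
//	for _, record := range records {
//		builder.Append(record)
//	}
//	if _, err := builder.Flush(w); err != nil {
//		// handle the error
//	}
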
// Tenant returns the optional tenant that owns the builder.
func (b *Builder) Tenant() string { return b.tenant }

// SetTenant sets the tenant that owns the builder. A builder can be made
// multi-tenant by passing an empty string.
func (b *Builder) SetTenant(tenant string) { b.tenant = tenant }

// Type returns the [dataobj.SectionType] of the logs builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }

// Append adds a new entry to b.
func (b *Builder) Append(entry Record) {
	b.metrics.appendsTotal.Inc()
	b.metrics.recordCount.Inc()

	b.records = append(b.records, entry)
	b.recordsSize += recordSize(entry)

	// Shortcut for when logs are appended in strict sort order: we skip
	// building temporary compressed stripes in favour of a single compression
	// pass over all records when the section is flushed.
	if b.opts.AppendStrategy == AppendOrdered {
		return
	}

	if b.recordsSize >= b.opts.BufferSize {
		b.flushRecords(zstd.SpeedFastest)
	}
}

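// recordSize estimates the in-memory size of record in bytes; it is used to
// decide when the record buffer is full. For example, a record with two
// metadata values of 10 bytes each and a 100-byte line is counted as
// 1 + 8 + 20 + 100 = 129 bytes.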
func recordSize(record Record) int {
	var size int

	size++    // One byte per stream ID (for uvarint).
	size += 8 // Eight bytes for timestamp.
	record.Metadata.Range(func(metadata labels.Label) {
		size += len(metadata.Value)
	})
	size += len(record.Line)

	return size
}

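// flushRecords sorts the buffered records and flushes them into a new stripe
// compressed at the given encoder level, then clears the record buffer. It is
// a no-op when no records are buffered.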
func (b *Builder) flushRecords(encLevel zstd.EncoderLevel) {
	if len(b.records) == 0 {
		return
	}

	// We panic if flushRecords is called more than once per section when using
	// the [AppendOrdered] strategy, because that should never happen and is
	// considered a programming error.
	if b.opts.AppendStrategy == AppendOrdered && len(b.stripes) > 0 {
		panic("must not call flushRecords multiple times for a single section when using AppendOrdered strategy")
	}

	// Our stripes are intermediate tables that don't need to have the best
	// compression. To maintain high throughput on appends, we use the fastest
	// compression for a stripe. Better compression is then used for sections.
	compressionOpts := &dataset.CompressionOptions{
		Zstd: []zstd.EOption{zstd.WithEncoderLevel(encLevel)},
	}

	stripe := buildTable(&b.stripeBuffer, b.opts.PageSizeHint, b.opts.PageMaxRowCount, compressionOpts, b.records, b.opts.SortOrder)
	b.stripes = append(b.stripes, stripe)
	b.stripesUncompressedSize += stripe.UncompressedSize()
	b.stripesCompressedSize += stripe.CompressedSize()

	b.records = sliceclear.Clear(b.records)
	b.recordsSize = 0
}

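// flushSection merges all accumulated stripes into a single table to be
// encoded as one section, clearing the stripes afterwards. It returns nil
// when there are no stripes to merge.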
func (b *Builder) flushSection() *table {
	if len(b.stripes) == 0 {
		return nil
	}

	compressionOpts := &dataset.CompressionOptions{
		Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)},
	}

	section, err := mergeTablesIncremental(&b.sectionBuffer, b.opts.PageSizeHint, b.opts.PageMaxRowCount, compressionOpts, b.stripes, b.opts.StripeMergeLimit, b.opts.SortOrder)
	if err != nil {
		// We control the input to mergeTables, so this should never happen.
		panic(fmt.Sprintf("merging tables: %v", err))
	}

	b.stripes = sliceclear.Clear(b.stripes)
	b.stripesCompressedSize = 0
	b.stripesUncompressedSize = 0
	return section
}

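// flushSectionOrdered flushes any buffered records into a single stripe and
// returns that stripe as the section table, skipping the merge step. It is
// used with the [AppendOrdered] strategy and returns nil when there is
// nothing to flush.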
func (b *Builder) flushSectionOrdered() *table {
	b.flushRecords(zstd.SpeedDefault)

	if len(b.stripes) == 0 {
		return nil
	}

	section := b.stripes[0]
	b.stripes = sliceclear.Clear(b.stripes)
	b.stripesCompressedSize = 0
	b.stripesUncompressedSize = 0
	return section
}

// UncompressedSize returns the current uncompressed size of the logs section
// in bytes.
func (b *Builder) UncompressedSize() int {
	var size int

	size += b.recordsSize
	size += b.stripesUncompressedSize

	return size
}

// EstimatedSize returns the estimated size of the logs section in bytes.
func (b *Builder) EstimatedSize() int {
	var size int

	size += b.recordsSize
	size += b.stripesCompressedSize

	return size
}

// Flush flushes b to the provided writer.
//
// After a successful flush, b is reset and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	var section *table
	if b.opts.AppendStrategy == AppendOrdered {
		// Flush all buffered data at once.
		section = b.flushSectionOrdered()
	} else {
		// Flush any remaining buffered data.
		b.flushRecords(zstd.SpeedFastest)
		section = b.flushSection()
	}

	if section == nil {
		return 0, nil
	}

	// TODO(rfratto): handle an individual section having oversized metadata.
	// This can happen when the number of columns is very wide, due to a lot of
	// metadata columns.
	//
	// As the caller likely creates many smaller logs sections, the best solution
	// for this is to aggregate the lowest cardinality columns into a combined
	// column. This will reduce the number of columns in the section and thus the
	// metadata size.

	var logsEnc columnar.Encoder
	if err := b.encodeSection(&logsEnc, section); err != nil {
		return 0, fmt.Errorf("encoding section: %w", err)
	}

	// The first two columns of each row are *always* stream ID and timestamp.
	// TODO(ashwanth): Find a safer way to do this. Same as [CompareRows].
	logsEnc.SetSortInfo(sortInfo(b.opts.SortOrder))
	logsEnc.SetTenant(b.tenant)

	n, err = logsEnc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}

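// encodeSection encodes the columns of section into enc, in the fixed order
// stream ID, timestamp, metadata columns, and message.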
func (b *Builder) encodeSection(enc *columnar.Encoder, section *table) error {
	{
		errs := make([]error, 0, len(section.Metadatas)+3)
		errs = append(errs, encodeColumn(enc, ColumnTypeStreamID, section.StreamID))
		errs = append(errs, encodeColumn(enc, ColumnTypeTimestamp, section.Timestamp))
		for _, md := range section.Metadatas {
			errs = append(errs, encodeColumn(enc, ColumnTypeMetadata, md))
		}
		errs = append(errs, encodeColumn(enc, ColumnTypeMessage, section.Message))
		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	return nil
}

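// sortInfo returns the sort metadata for the given sort order. It relies on
// the stream ID and timestamp columns always being encoded at column indexes
// 0 and 1 respectively.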
func sortInfo(sort SortOrder) *datasetmd_v2.SortInfo {
	switch sort {
	case SortStreamASC:
		return &datasetmd_v2.SortInfo{
			ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{
				{ColumnIndex: 0, Direction: datasetmd_v2.SORT_DIRECTION_ASCENDING},  // StreamID ASC
				{ColumnIndex: 1, Direction: datasetmd_v2.SORT_DIRECTION_DESCENDING}, // Timestamp DESC
			},
		}
	case SortTimestampDESC:
		return &datasetmd_v2.SortInfo{
			ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{
				{ColumnIndex: 1, Direction: datasetmd_v2.SORT_DIRECTION_DESCENDING}, // Timestamp DESC
				{ColumnIndex: 0, Direction: datasetmd_v2.SORT_DIRECTION_ASCENDING},  // StreamID ASC
			},
		}
	default:
		panic("invalid sort order")
	}
}

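// encodeColumn copies the pages of column into a new column of enc with the
// given column type. Columns without pages are discarded.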
func encodeColumn(enc *columnar.Encoder, columnType ColumnType, column *tableColumn) error {
	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety. This will return an error if we
		// successfully committed.
		_ = columnEnc.Discard()
	}()
	if len(column.Pages) == 0 {
		// Column has no data; discard.
		return nil
	}

	// Our column is in memory, so we don't need a "real" context in the calls
	// below.
	for result := range column.ListPages(context.Background()) {
		page, err := result.Value()
		if err != nil {
			return fmt.Errorf("getting %s page: %w", columnType, err)
		}

		data, err := page.ReadPage(context.Background())
		if err != nil {
			return fmt.Errorf("reading %s page: %w", columnType, err)
		}

		memPage := &dataset.MemPage{
			Desc: *page.PageDesc(),
			Data: data,
		}
		if err := columnEnc.AppendPage(memPage); err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	return columnEnc.Commit()
}

// Reset resets all state, allowing b to be reused.
func (b *Builder) Reset() {
	b.metrics.recordCount.Set(0)

	b.tenant = ""

	b.records = sliceclear.Clear(b.records)
	b.recordsSize = 0

	b.stripes = sliceclear.Clear(b.stripes)
	b.stripeBuffer.Reset()
	b.stripesCompressedSize = 0
	b.stripesUncompressedSize = 0

	b.sectionBuffer.Reset()
}