chore(dataobj): Reintroduce sorting of the logs section (#15906)

pull/15922/head
Robert Fratto 12 months ago committed by GitHub
parent 5e4df21e67
commit 948f5c5f3e
22 changed files (lines changed per file):

 1. pkg/dataobj/dataobj.go (27)
 2. pkg/dataobj/dataobj_test.go (37)
 3. pkg/dataobj/internal/dataset/column_builder.go (12)
 4. pkg/dataobj/internal/dataset/dataset_iter.go (5)
 5. pkg/dataobj/internal/dataset/page.go (98)
 6. pkg/dataobj/internal/dataset/page_builder.go (12)
 7. pkg/dataobj/internal/dataset/page_compress_writer.go (10)
 8. pkg/dataobj/internal/dataset/page_test.go (51)
 9. pkg/dataobj/internal/encoding/decoder_metadata.go (9)
10. pkg/dataobj/internal/encoding/encoder.go (20)
11. pkg/dataobj/internal/sections/logs/logs.go (313)
12. pkg/dataobj/internal/sections/logs/logs_test.go (10)
13. pkg/dataobj/internal/sections/logs/table.go (312)
14. pkg/dataobj/internal/sections/logs/table_build.go (54)
15. pkg/dataobj/internal/sections/logs/table_merge.go (151)
16. pkg/dataobj/internal/sections/logs/table_test.go (81)
17. pkg/dataobj/internal/sections/streams/streams.go (3)
18. pkg/dataobj/internal/util/bufpool/bucket.go (72)
19. pkg/dataobj/internal/util/bufpool/bufpool.go (41)
20. pkg/dataobj/internal/util/bufpool/bufpool_test.go (36)
21. pkg/dataobj/internal/util/sliceclear/sliceclear.go (11)
22. pkg/dataobj/internal/util/sliceclear/sliceclear_test.go (28)

@@ -47,16 +47,29 @@ type BuilderConfig struct {
// TargetObjectSize configures a target size for data objects.
TargetObjectSize flagext.Bytes `yaml:"target_object_size"`
// TargetSectionSize configures the maximum size of data in a section. Sections
// which support this parameter will place overflow data into new sections of
// the same type.
TargetSectionSize flagext.Bytes `yaml:"target_section_size"`
// BufferSize configures the size of the buffer used to accumulate
// uncompressed logs in memory prior to sorting.
BufferSize flagext.Bytes `yaml:"buffer_size"`
}
// RegisterFlagsWithPrefix registers flags with the given prefix.
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
_ = cfg.TargetPageSize.Set("2MB")
_ = cfg.TargetObjectSize.Set("1GB")
_ = cfg.BufferSize.Set("16MB") // Page Size * 8
_ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8
f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.")
f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.")
f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.")
}
// Validate validates the BuilderConfig.
@@ -77,6 +90,14 @@ func (cfg *BuilderConfig) Validate() error {
errs = append(errs, errors.New("TargetObjectSize must be greater than 0"))
}
if cfg.BufferSize <= 0 {
errs = append(errs, errors.New("BufferSize must be greater than 0"))
}
if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize {
errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize"))
}
return errors.Join(errs...)
}
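Taken together, the defaults and validation rules above relate the limits to one another: BufferSize defaults to 8x the target page size, and TargetSectionSize to 1/8 of the target object size. A minimal sketch of wiring this up programmatically, assuming the exported github.com/grafana/loki/v3/pkg/dataobj package path and the flagext.Bytes Set method used by the flag registration above:

package main

import (
	"log"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

func main() {
	// Mirror the registered flag defaults, then check the relationships
	// between the limits.
	var cfg dataobj.BuilderConfig
	cfg.SHAPrefixSize = 2
	_ = cfg.TargetPageSize.Set("2MB")
	_ = cfg.TargetObjectSize.Set("1GB")
	_ = cfg.BufferSize.Set("16MB")         // Target page size * 8.
	_ = cfg.TargetSectionSize.Set("128MB") // Target object size / 8.

	if err := cfg.Validate(); err != nil {
		// Validate rejects a BufferSize <= 0 and a TargetSectionSize outside
		// of (0, TargetObjectSize].
		log.Fatal(err)
	}
}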
@@ -148,7 +169,11 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu
labelCache: labelCache,
streams: streams.New(metrics.streams, int(cfg.TargetPageSize)),
logs: logs.New(metrics.logs, int(cfg.TargetPageSize)),
logs: logs.New(metrics.logs, logs.Options{
PageSizeHint: int(cfg.TargetPageSize),
BufferSize: int(cfg.BufferSize),
SectionSize: int(cfg.TargetSectionSize),
}),
flushBuffer: flushBuffer,
encoder: encoder,

@@ -19,6 +19,16 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
var testBuilderConfig = BuilderConfig{
SHAPrefixSize: 2,
TargetPageSize: 2048,
TargetObjectSize: 4096,
TargetSectionSize: 4096,
BufferSize: 2048 * 8,
}
func Test(t *testing.T) {
bucket := objstore.NewInMemBucket()
@@ -67,16 +77,7 @@ func Test(t *testing.T) {
}
t.Run("Build", func(t *testing.T) {
// Create a tiny builder which flushes a lot of objects and pages to properly
// test the builder.
builderConfig := BuilderConfig{
SHAPrefixSize: 2,
TargetPageSize: 1_500_000,
TargetObjectSize: 10_000_000,
}
builder, err := NewBuilder(builderConfig, bucket, "fake")
builder, err := NewBuilder(testBuilderConfig, bucket, "fake")
require.NoError(t, err)
for _, entry := range streams {
@@ -94,10 +95,7 @@ func Test(t *testing.T) {
actual, err := result.Collect(reader.Streams(context.Background(), objects[0]))
require.NoError(t, err)
// TODO(rfratto): reenable once sorting is reintroduced.
_ = actual
// require.Equal(t, sortStreams(t, streams), actual)
require.Equal(t, sortStreams(t, streams), actual)
})
}
@@ -109,16 +107,7 @@ func Test_Builder_Append(t *testing.T) {
bucket := objstore.NewInMemBucket()
// Create a tiny builder which flushes a lot of objects and pages to properly
// test the builder.
builderConfig := BuilderConfig{
SHAPrefixSize: 2,
TargetPageSize: 2048,
TargetObjectSize: 4096,
}
builder, err := NewBuilder(builderConfig, bucket, "fake")
builder, err := NewBuilder(testBuilderConfig, bucket, "fake")
require.NoError(t, err)
for {

@@ -3,6 +3,8 @@ package dataset
import (
"fmt"
"github.com/klauspost/compress/zstd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)
@@ -21,6 +23,16 @@ type BuilderOptions struct {
// Compression is the compression algorithm to use for values.
Compression datasetmd.CompressionType
// CompressionOptions holds optional configuration for compression.
CompressionOptions CompressionOptions
}
// CompressionOptions customizes the compressor used when building pages.
type CompressionOptions struct {
// Zstd holds encoding options for Zstd compression. Only used for
// [datasetmd.COMPRESSION_TYPE_ZSTD].
Zstd []zstd.EOption
}
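A minimal sketch of how CompressionOptions threads through BuilderOptions, mirroring the way the logs section later uses a fast zstd level for short-lived stripes and the default level for final sections; the helper name and its placement are illustrative, not part of this change:

package logs

import (
	"github.com/klauspost/compress/zstd"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// exampleBuilderOptions returns two column configurations that differ only in
// their zstd encoder level.
func exampleBuilderOptions(pageSize int) (fast, better dataset.BuilderOptions) {
	fast = dataset.BuilderOptions{
		PageSizeHint: pageSize,
		Value:        datasetmd.VALUE_TYPE_STRING,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		CompressionOptions: dataset.CompressionOptions{
			// Cheap compression for intermediate data that is rewritten soon.
			Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedFastest)},
		},
	}

	better = fast
	// Spend more CPU on data that is kept around.
	better.CompressionOptions.Zstd = []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)}
	return fast, better
}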
// A ColumnBuilder builds a sequence of [Value] entries of a common type into a

@@ -33,7 +33,6 @@ func Iter(ctx context.Context, columns []Column) result.Seq[Row] {
type pullColumnIter struct {
Next func() (result.Result[Value], bool)
Stop func()
}
return result.Iter(func(yield func(Row) bool) error {
@@ -47,7 +46,9 @@ func Iter(ctx context.Context, columns []Column) result.Seq[Row] {
}
next, stop := result.Pull(lazyColumnIter(ctx, col.ColumnInfo(), pages))
pullColumns = append(pullColumns, pullColumnIter{Next: next, Stop: stop})
defer stop()
pullColumns = append(pullColumns, pullColumnIter{Next: next})
}
// Start emitting rows; each row is composed of the next value from all of

@@ -7,11 +7,13 @@ import (
"fmt"
"hash/crc32"
"io"
"sync"
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
)
// Helper types.
@@ -88,39 +90,99 @@ func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Rea
}
var (
bitmapReader = bytes.NewReader(p.Data[n : n+int(bitmapSize)])
compressedDataReader = bytes.NewReader(p.Data[n+int(bitmapSize):])
bitmapData = p.Data[n : n+int(bitmapSize)]
compressedValuesData = p.Data[n+int(bitmapSize):]
bitmapReader = bytes.NewReader(bitmapData)
compressedValuesReader = bytes.NewReader(compressedValuesData)
)
switch compression {
case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE:
return bitmapReader, io.NopCloser(compressedDataReader), nil
return bitmapReader, io.NopCloser(compressedValuesReader), nil
case datasetmd.COMPRESSION_TYPE_SNAPPY:
sr := snappy.NewReader(compressedDataReader)
return bitmapReader, io.NopCloser(sr), nil
sr := snappyPool.Get().(*snappy.Reader)
sr.Reset(compressedValuesReader)
return bitmapReader, &closerFunc{Reader: sr, onClose: func() error {
snappyPool.Put(sr)
return nil
}}, nil
case datasetmd.COMPRESSION_TYPE_ZSTD:
zr, err := zstd.NewReader(compressedDataReader)
if err != nil {
return nil, nil, fmt.Errorf("opening zstd reader: %w", err)
}
return bitmapReader, newZstdReader(zr), nil
zr := &fixedZstdReader{page: p, data: compressedValuesData}
return bitmapReader, zr, nil
}
panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", compression.String()))
}
// zstdReader implements [io.ReadCloser] for a [zstd.Decoder].
type zstdReader struct{ *zstd.Decoder }
var snappyPool = sync.Pool{
New: func() interface{} {
return snappy.NewReader(nil)
},
}
type closerFunc struct {
io.Reader
onClose func() error
}
func (c *closerFunc) Close() error { return c.onClose() }
// newZstdReader returns a new [io.ReadCloser] for a [zstd.Decoder].
func newZstdReader(dec *zstd.Decoder) io.ReadCloser {
return &zstdReader{Decoder: dec}
// globalZstdDecoder is a shared zstd decoder for [fixedZstdReader]. Concurrent
// uses of globalZstdDecoder are only safe when using [zstd.Decoder.DecodeAll].
var globalZstdDecoder = func() *zstd.Decoder {
d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1))
if err != nil {
panic(err)
}
return d
}()
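The comment above hinges on DecodeAll being stateless and safe for concurrent callers, unlike the decoder's streaming Read path. A standalone sketch of that usage pattern with the klauspost zstd package; the payload and goroutine count are illustrative:

package main

import (
	"bytes"
	"log"
	"sync"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Compress a small payload to decode.
	var compressed bytes.Buffer
	enc, err := zstd.NewWriter(&compressed)
	if err != nil {
		log.Fatal(err)
	}
	_, _ = enc.Write([]byte("hello, zstd"))
	_ = enc.Close()

	// One shared decoder for the whole program.
	dec, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1))
	if err != nil {
		log.Fatal(err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// DecodeAll may be called concurrently on the same decoder.
			out, err := dec.DecodeAll(compressed.Bytes(), nil)
			if err != nil {
				log.Println(err)
				return
			}
			_ = out // "hello, zstd"
		}()
	}
	wg.Wait()
}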
// fixedZstdReader is an [io.ReadCloser] that decompresses a zstd buffer in a
// single pass.
type fixedZstdReader struct {
page *MemPage
data []byte
uncompressedBuf *bytes.Buffer
closed bool
}
// Close implements [io.Closer].
func (r *zstdReader) Close() error {
r.Decoder.Close()
func (r *fixedZstdReader) Read(p []byte) (int, error) {
if r.closed {
return 0, io.ErrClosedPipe
}
if r.uncompressedBuf != nil {
return r.uncompressedBuf.Read(p)
}
// We decompress the entire buffer in a single pass. While a pooled zstd
// reader would require less memory and would allow us to stream values as we
// decompress, pooling zstd decoders is difficult to do properly, as it
// requires a finalizer to release resources, and the goroutines spawned by
// decoders prevent the finalizer from ever being called.
//
// To make efficient zstd decoding less error prone, we opt for this instead.
r.uncompressedBuf = bufpool.Get(r.page.Info.UncompressedSize)
r.uncompressedBuf.Reset()
buf, err := globalZstdDecoder.DecodeAll(r.data, r.uncompressedBuf.AvailableBuffer())
if err != nil {
return 0, fmt.Errorf("decoding zstd: %w", err)
}
_, _ = r.uncompressedBuf.Write(buf)
return r.uncompressedBuf.Read(p)
}
func (r *fixedZstdReader) Close() error {
if r.uncompressedBuf != nil {
bufpool.Put(r.uncompressedBuf)
r.uncompressedBuf = nil
}
r.closed = true
return nil
}

@@ -56,7 +56,7 @@ func newPageBuilder(opts BuilderOptions) (*pageBuilder, error) {
presenceBuffer = bytes.NewBuffer(nil)
valuesBuffer = bytes.NewBuffer(make([]byte, 0, opts.PageSizeHint))
valuesWriter = newCompressWriter(valuesBuffer, opts.Compression)
valuesWriter = newCompressWriter(valuesBuffer, opts.Compression, opts.CompressionOptions)
)
presenceEnc := newBitmapEncoder(presenceBuffer)
@@ -174,12 +174,18 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
return nil, fmt.Errorf("no data to flush")
}
// Before we can build the page we need to finish flushing our encoders and writers.
// Before we can build the page we need to finish flushing our encoders and
// writers.
//
// We must call [compressWriter.Close] to ensure that Zstd writers write a
// proper EOF marker, otherwise synchronous decoding can't be used.
// compressWriters can continue to be reset and reused after closing, so this
// is safe.
if err := b.presenceEnc.Flush(); err != nil {
return nil, fmt.Errorf("flushing presence encoder: %w", err)
} else if err := b.valuesEnc.Flush(); err != nil {
return nil, fmt.Errorf("flushing values encoder: %w", err)
} else if err := b.valuesWriter.Flush(); err != nil {
} else if err := b.valuesWriter.Close(); err != nil {
return nil, fmt.Errorf("flushing values writer: %w", err)
}
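A standalone sketch of the close-then-reuse cycle the comment above describes, using a bare zstd.Encoder rather than compressWriter (buffer names are illustrative): Close writes the frame's end-of-stream marker that synchronous decoding needs, and Reset points the same encoder at the next destination:

package main

import (
	"bytes"
	"log"

	"github.com/klauspost/compress/zstd"
)

func main() {
	var pageA, pageB bytes.Buffer

	zw, err := zstd.NewWriter(&pageA)
	if err != nil {
		log.Fatal(err)
	}

	_, _ = zw.Write([]byte("first page"))
	// Close, not just Flush, finishes the frame so one-shot decoders such as
	// DecodeAll can read it.
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	// The encoder is reusable after Close once it is reset onto a new writer.
	zw.Reset(&pageB)
	_, _ = zw.Write([]byte("second page"))
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}
}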

@@ -20,14 +20,16 @@ type compressWriter struct {
w io.WriteCloser // Compressing writer.
buf *bufio.Writer // Buffered writer in front of w to be able to call WriteByte.
rawBytes int // Number of uncompressed bytes written.
compression datasetmd.CompressionType // Compression type being used.
rawBytes int // Number of uncompressed bytes written.
opts CompressionOptions // Options to customize compression.
}
var _ streamio.Writer = (*compressWriter)(nil)
func newCompressWriter(w io.Writer, ty datasetmd.CompressionType) *compressWriter {
c := compressWriter{compression: ty}
func newCompressWriter(w io.Writer, ty datasetmd.CompressionType, opts CompressionOptions) *compressWriter {
c := compressWriter{compression: ty, opts: opts}
c.Reset(w)
return &c
}
@@ -85,7 +87,7 @@ func (c *compressWriter) Reset(w io.Writer) {
compressedWriter = snappy.NewBufferedWriter(w)
case datasetmd.COMPRESSION_TYPE_ZSTD:
zw, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
zw, err := zstd.NewWriter(w, c.opts.Zstd...)
if err != nil {
panic(fmt.Sprintf("compressWriter.Reset: creating zstd writer: %v", err))
}

@@ -1,6 +1,7 @@
package dataset
import (
"io"
"math/rand"
"testing"
"time"
@@ -10,7 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)
func Test_pageBuilder_WriteRead(t *testing.T) {
func Benchmark_pageBuilder_WriteRead(b *testing.B) {
in := []string{
"hello, world!",
"",
@@ -30,6 +31,54 @@ func Test_pageBuilder_WriteRead(t *testing.T) {
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
builder, err := newPageBuilder(opts)
require.NoError(b, err)
for _, s := range in {
require.True(b, builder.Append(StringValue(s)))
}
page, err := builder.Flush()
require.NoError(b, err)
require.Equal(b, len(in), page.Info.RowCount)
require.Equal(b, len(in)-2, page.Info.ValuesCount) // -2 for the empty strings
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, values, err := page.reader(datasetmd.COMPRESSION_TYPE_ZSTD)
if err != nil {
b.Fatal(err)
}
if _, err := io.Copy(io.Discard, values); err != nil {
b.Fatal(err)
} else if err := values.Close(); err != nil {
b.Fatal(err)
}
}
}
func Test_pageBuilder_WriteRead(t *testing.T) {
in := []string{
"hello, world!",
"",
"this is a test of the emergency broadcast system",
"this is only a test",
"if this were a real emergency, you would be instructed to panic",
"but it's not, so don't",
"",
"this concludes the test",
"thank you for your cooperation",
"goodbye",
}
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
b, err := newPageBuilder(opts)
require.NoError(t, err)

@@ -1,7 +1,6 @@
package encoding
import (
"bytes"
"encoding/binary"
"fmt"
"io"
@@ -12,6 +11,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
)
// decode* methods for metadata shared by Decoder implementations.
@@ -108,9 +108,10 @@ func decodeProto(r streamio.Reader, pb proto.Message) error {
return fmt.Errorf("read proto message size: %w", err)
}
buf := bytesBufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bytesBufferPool.Put(buf)
// We know exactly how big of a buffer we need here, so we can get a bucketed
// buffer from bufpool.
buf := bufpool.Get(int(size))
defer bufpool.Put(buf)
n, err := io.Copy(buf, io.LimitReader(r, int64(size)))
if err != nil {

@@ -12,6 +12,26 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
// TODO(rfratto): the memory footprint of [Encoder] can very slowly grow in
// memory as [bytesBufferPool] is filled with buffers with increasing capacity:
// each encoding pass has a different number of elements, shuffling which
// elements of the hierarchy get which pooled buffers.
//
// This means that elements that require more bytes will grow the capacity of
// the buffer and put the buffer back into the pool. Even if further encoding
// passes don't need that many bytes, the buffer is kept alive with its larger
// footprint. Given enough time, all buffers in the pool will have a large
// capacity.
//
// The bufpool package provides a solution to this (bucketing pools by
// capacity), but using bufpool properly requires knowing how many bytes are
// needed.
//
// Encoder can eventually be moved to the bufpool package by calculating a
// rolling maximum of encoding size used per element across usages of an
// Encoder instance. This would then allow larger buffers to be eventually
// reclaimed regardless of how often encoding is done.
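A purely hypothetical sketch of the rolling-maximum idea this TODO describes; none of these names exist in the codebase, and the window length is arbitrary:

// rollingMax tracks the bytes used by recent encoding passes so buffer
// requests can be sized from recent history instead of the all-time peak.
type rollingMax struct {
	window []int // Sizes observed over the last len(window) passes.
	next   int
}

func newRollingMax(n int) *rollingMax { return &rollingMax{window: make([]int, n)} }

// Observe records the size used by the latest encoding pass.
func (r *rollingMax) Observe(size int) {
	r.window[r.next] = size
	r.next = (r.next + 1) % len(r.window)
}

// Max returns the largest size in the window; a caller would pass this to
// bufpool.Get, so oversized buffers age out once large passes stop happening.
func (r *rollingMax) Max() int {
	var max int
	for _, v := range r.window {
		if v > max {
			max = v
		}
	}
	return max
}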
// Encoder encodes a data object. Data objects are hierarchical, split into
// distinct sections that contain their own hierarchy.
//

@@ -3,22 +3,20 @@
package logs
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"time"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)
// A Record is an individual log record within the logs section.
@@ -29,141 +27,147 @@ type Record struct {
Line string
}
// Options configures the behavior of the logs section.
type Options struct {
// PageSizeHint is the size of pages to use when encoding the logs section.
PageSizeHint int
// BufferSize is the size of the buffer to use when accumulating log records.
BufferSize int
// SectionSize is the maximum amount of data in a section when encoding the
// logs section. If the section size is exceeded, multiple sections will be
// created.
SectionSize int
}
// Logs accumulate a set of [Record]s within a data object.
type Logs struct {
metrics *Metrics
rows int
pageSize int
metrics *Metrics
opts Options
// Sorting the entire set of logs is very expensive, so we need to break it
// up into smaller pieces:
//
// 1. Records are accumulated in memory up to BufferSize; the current size is
// tracked by recordsSize.
//
// 2. Once the buffer is full, records are sorted and flushed to smaller
// [table]s called stripes.
//
// 3. Once the set of stripes reaches SectionSize, they are merged together
// into a final table that will be encoded as a single section.
//
// At the end of this process, there will be a set of sections that are
// encoded separately.
streamIDs *dataset.ColumnBuilder
timestamps *dataset.ColumnBuilder
records []Record // Buffered records to flush to a group.
recordsSize int
metadatas []*dataset.ColumnBuilder
metadataLookup map[string]int // map of metadata key to index in metadatas
stripes []*table // In-progress section; flushed with [mergeTables] into a single table.
stripeBuffer tableBuffer
stripesSize int // Estimated byte size of all elements in stripes.
messages *dataset.ColumnBuilder
sections []*table // Completed sections.
sectionBuffer tableBuffer
}
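From a caller's point of view, the staged buffering described above is invisible: records are appended one at a time, and flushing into stripes and sections happens automatically as the thresholds are crossed. A short sketch using the Options and methods introduced in this file; the thresholds and the example function are illustrative:

package logs_test

import (
	"time"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)

func Example_buffering() {
	// Small thresholds make the staging easy to follow: records buffer up to
	// BufferSize, get sorted into stripes, and stripes merge into a section
	// once their combined size reaches SectionSize.
	l := logs.New(nil, logs.Options{
		PageSizeHint: 1024,
		BufferSize:   256,
		SectionSize:  4096,
	})

	for i := 0; i < 1000; i++ {
		l.Append(logs.Record{
			StreamID:  int64(i % 4),
			Timestamp: time.Unix(int64(i), 0),
			Line:      "example line",
		})
	}

	// EncodeTo flushes any remaining records and stripes before writing one
	// logs section per accumulated table.
}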
// New creates a new Logs section. The pageSize argument specifies how large
// pages should be.
func New(metrics *Metrics, pageSize int) *Logs {
func New(metrics *Metrics, opts Options) *Logs {
if metrics == nil {
metrics = NewMetrics()
}
// We control the Value/Encoding tuple so creating column builders can't
// fail; if it does, we're left in an unrecoverable state where nothing can
// be encoded properly so we panic.
streamIDs, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
if err != nil {
panic(fmt.Sprintf("creating stream ID column: %v", err))
return &Logs{
metrics: metrics,
opts: opts,
}
}
timestamps, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
if err != nil {
panic(fmt.Sprintf("creating timestamp column: %v", err))
}
// Append adds a new entry to the set of Logs.
func (l *Logs) Append(entry Record) {
l.metrics.appendsTotal.Inc()
messages, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
})
if err != nil {
panic(fmt.Sprintf("creating message column: %v", err))
}
l.records = append(l.records, entry)
l.recordsSize += recordSize(entry)
return &Logs{
metrics: metrics,
pageSize: pageSize,
if l.recordsSize >= l.opts.BufferSize {
l.flushRecords()
}
streamIDs: streamIDs,
timestamps: timestamps,
l.metrics.recordCount.Inc()
}
metadataLookup: make(map[string]int),
func recordSize(record Record) int {
var size int
messages: messages,
size++ // One byte per stream ID (for uvarint).
size += 8 // Eight bytes for timestamp.
for _, metadata := range record.Metadata {
size += len(metadata.Value)
}
size += len(record.Line)
return size
}
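As a worked example of the estimate above: a record with a 10-byte line and two metadata entries whose values are 3 and 4 bytes long is counted as 1 (uvarint stream ID) + 8 (timestamp) + 3 + 4 + 10 = 26 bytes; metadata key names are not included in the estimate.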
// Append adds a new entry to the set of Logs.
func (l *Logs) Append(entry Record) {
l.metrics.appendsTotal.Inc()
func (l *Logs) flushRecords() {
if len(l.records) == 0 {
return
}
// Sort metadata to ensure consistent encoding. Metadata is sorted by key.
// While keys must be unique, we sort by value if two keys match; this
// ensures that the same value always gets encoded for duplicate keys.
slices.SortFunc(entry.Metadata, func(a, b push.LabelAdapter) int {
if res := cmp.Compare(a.Name, b.Name); res != 0 {
return res
}
return cmp.Compare(a.Value, b.Value)
})
// Our stripes are intermediate tables that don't need to have the best
// compression. To maintain high throughput on appends, we use the fastest
// compression for a stripe. Better compression is then used for sections.
compressionOpts := dataset.CompressionOptions{
Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedFastest)},
}
// We ignore the errors below; they only fail if given out-of-order data
// (where the row number is less than the previous row number), which can't
// ever happen here.
stripe := buildTable(&l.stripeBuffer, l.opts.PageSizeHint, compressionOpts, l.records)
l.stripes = append(l.stripes, stripe)
l.stripesSize += stripe.Size()
_ = l.streamIDs.Append(l.rows, dataset.Int64Value(entry.StreamID))
_ = l.timestamps.Append(l.rows, dataset.Int64Value(entry.Timestamp.UnixNano()))
_ = l.messages.Append(l.rows, dataset.StringValue(entry.Line))
l.records = sliceclear.Clear(l.records)
l.recordsSize = 0
for _, m := range entry.Metadata {
col := l.getMetadataColumn(m.Name)
_ = col.Append(l.rows, dataset.StringValue(m.Value))
if l.stripesSize >= l.opts.SectionSize {
l.flushSection()
}
l.rows++
l.metrics.recordCount.Inc()
}
// EstimatedSize returns the estimated size of the Logs section in bytes.
func (l *Logs) EstimatedSize() int {
var size int
func (l *Logs) flushSection() {
if len(l.stripes) == 0 {
return
}
size += l.streamIDs.EstimatedSize()
size += l.timestamps.EstimatedSize()
size += l.messages.EstimatedSize()
compressionOpts := dataset.CompressionOptions{
Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)},
}
for _, md := range l.metadatas {
size += md.EstimatedSize()
section, err := mergeTables(&l.sectionBuffer, l.opts.PageSizeHint, compressionOpts, l.stripes)
if err != nil {
// We control the input to mergeTables, so this should never happen.
panic(fmt.Sprintf("merging tables: %v", err))
}
return size
l.sections = append(l.sections, section)
l.stripes = sliceclear.Clear(l.stripes)
l.stripesSize = 0
}
func (l *Logs) getMetadataColumn(key string) *dataset.ColumnBuilder {
idx, ok := l.metadataLookup[key]
if !ok {
col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{
PageSizeHint: l.pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
})
if err != nil {
// We control the Value/Encoding tuple so this can't fail; if it does,
// we're left in an unrecoverable state where nothing can be encoded
// properly so we panic.
panic(fmt.Sprintf("creating metadata column: %v", err))
}
// EstimatedSize returns the estimated size of the Logs section in bytes.
func (l *Logs) EstimatedSize() int {
var size int
l.metadatas = append(l.metadatas, col)
l.metadataLookup[key] = len(l.metadatas) - 1
return col
size += l.recordsSize
size += l.stripesSize
for _, section := range l.sections {
size += section.Size()
}
return l.metadatas[idx]
return size
}
// EncodeTo encodes the set of logs to the provided encoder. Before encoding,
@@ -179,23 +183,28 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error {
defer l.Reset()
// TODO(rfratto): handle one section becoming too large. This can happen when
// the number of columns is very wide, due to a lot of metadata columns.
// There are two approaches to handle this:
//
// 1. Split streams into multiple sections.
// 2. Move some columns into an aggregated column which holds multiple label
// keys and values.
// Flush any remaining buffered data.
l.flushRecords()
l.flushSection()
dset, err := l.buildDataset()
if err != nil {
return fmt.Errorf("building dataset: %w", err)
}
cols, err := result.Collect(dset.ListColumns(context.Background())) // dset is in memory; "real" context not needed.
if err != nil {
return fmt.Errorf("listing columns: %w", err)
// TODO(rfratto): handle individual sections having oversized metadata. This
// can happen when the number of columns is very wide, due to a lot of
// metadata columns.
//
// As we're already splitting data into separate sections, the best solution
// for this is to aggregate the lowest cardinality columns into a combined
// column. This will reduce the number of columns in the section and thus the
// metadata size.
for _, section := range l.sections {
if err := l.encodeSection(enc, section); err != nil {
return fmt.Errorf("encoding section: %w", err)
}
}
return nil
}
func (l *Logs) encodeSection(enc *encoding.Encoder, section *table) error {
logsEnc, err := enc.OpenLogs()
if err != nil {
return fmt.Errorf("opening logs section: %w", err)
@@ -206,16 +215,14 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error {
_ = logsEnc.Discard()
}()
// Encode our columns. The slice order here *must* match the order in
// [Logs.buildDataset]!
{
errs := make([]error, 0, len(cols))
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_STREAM_ID, cols[0]))
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_TIMESTAMP, cols[1]))
for _, mdCol := range cols[2 : len(cols)-1] {
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_METADATA, mdCol))
errs := make([]error, 0, len(section.Metadatas)+3)
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_STREAM_ID, section.StreamID))
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_TIMESTAMP, section.Timestamp))
for _, md := range section.Metadatas {
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_METADATA, md))
}
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_MESSAGE, cols[len(cols)-1]))
errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_MESSAGE, section.Message))
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("encoding columns: %w", err)
}
@@ -224,51 +231,6 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error {
return logsEnc.Commit()
}
func (l *Logs) buildDataset() (dataset.Dataset, error) {
// Our columns are ordered as follows:
//
// 1. StreamID
// 2. Timestamp
// 3. Metadata columns
// 4. Message
//
// Do *not* change this order without updating [Logs.EncodeTo]!
//
// TODO(rfratto): find a clean way to decorate columns with additional
// metadata so we don't have to rely on order.
columns := make([]*dataset.MemColumn, 0, 3+len(l.metadatas))
// Flush never returns an error so we ignore it here to keep the code simple.
//
// TODO(rfratto): remove error return from Flush to clean up code.
streamID, _ := l.streamIDs.Flush()
timestamp, _ := l.timestamps.Flush()
columns = append(columns, streamID, timestamp)
for _, mdBuilder := range l.metadatas {
mdBuilder.Backfill(l.rows)
mdColumn, _ := mdBuilder.Flush()
columns = append(columns, mdColumn)
}
messages, _ := l.messages.Flush()
columns = append(columns, messages)
// TODO(rfratto): We need to be able to sort the columns first by StreamID
// and then by timestamp, but as it is now this is way too slow; sorting a
// 20MB dataset took several minutes due to the number of page loads
// happening across streams.
//
// Sorting can be made more efficient by:
//
// 1. Separating streams into separate datasets while appending
// 2. Sorting each stream separately
// 3. Combining sorted streams into a single dataset, which will already be
// sorted.
return dataset.FromMemory(columns), nil
}
func encodeColumn(enc *encoding.LogsEncoder, columnType logsmd.ColumnType, column dataset.Column) error {
columnEnc, err := enc.OpenColumn(columnType, column.ColumnInfo())
if err != nil {
@@ -307,12 +269,15 @@ func encodeColumn(enc *encoding.LogsEncoder, columnType logsmd.ColumnType, colum
// Reset resets all state, allowing Logs to be reused.
func (l *Logs) Reset() {
l.rows = 0
l.metrics.recordCount.Set(0)
l.streamIDs.Reset()
l.timestamps.Reset()
l.metadatas = l.metadatas[:0]
clear(l.metadataLookup)
l.messages.Reset()
l.records = sliceclear.Clear(l.records)
l.recordsSize = 0
l.stripes = sliceclear.Clear(l.stripes)
l.stripeBuffer.Reset()
l.stripesSize = 0
l.sections = sliceclear.Clear(l.sections)
l.sectionBuffer.Reset()
}

@@ -15,8 +15,6 @@ import (
)
func Test(t *testing.T) {
t.Skip("Disabled until sorting is reimplemented")
records := []logs.Record{
{
StreamID: 1,
@@ -44,7 +42,13 @@ func Test(t *testing.T) {
},
}
tracker := logs.New(nil, 1024)
opts := logs.Options{
PageSizeHint: 1024,
BufferSize: 256,
SectionSize: 4096,
}
tracker := logs.New(nil, opts)
for _, record := range records {
tracker.Append(record)
}

@@ -0,0 +1,312 @@
package logs
import (
"cmp"
"context"
"fmt"
"slices"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
// A table is a collection of columns that form a logs section.
type table struct {
StreamID *tableColumn
Timestamp *tableColumn
Metadatas []*tableColumn
Message *tableColumn
}
type tableColumn struct {
*dataset.MemColumn
Type logsmd.ColumnType
}
var _ dataset.Dataset = (*table)(nil)
// ListColumns implements [dataset.Dataset].
func (t *table) ListColumns(_ context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
if !yield(t.StreamID) {
return nil
}
if !yield(t.Timestamp) {
return nil
}
for _, metadata := range t.Metadatas {
if !yield(metadata) {
return nil
}
}
if !yield(t.Message) {
return nil
}
return nil
})
}
// ListPages implements [dataset.Dataset].
func (t *table) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, c := range columns {
pages, err := result.Collect(c.ListPages(ctx))
if err != nil {
return err
} else if !yield(dataset.Pages(pages)) {
return nil
}
}
return nil
})
}
// ReadPages implements [dataset.Dataset].
func (t *table) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, p := range pages {
data, err := p.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}
return nil
})
}
// Size returns the total size of the table in bytes.
func (t *table) Size() int {
var size int
size += t.StreamID.ColumnInfo().CompressedSize
size += t.Timestamp.ColumnInfo().CompressedSize
for _, metadata := range t.Metadatas {
size += metadata.ColumnInfo().CompressedSize
}
size += t.Message.ColumnInfo().CompressedSize
return size
}
// A tableBuffer holds a set of column builders used for constructing tables.
// The zero value is ready for use.
type tableBuffer struct {
streamID *dataset.ColumnBuilder
timestamp *dataset.ColumnBuilder
metadatas []*dataset.ColumnBuilder
metadataLookup map[string]int // map of metadata key to index in metadatas
usedMetadatas map[*dataset.ColumnBuilder]string // metadata with its name.
message *dataset.ColumnBuilder
}
// StreamID gets or creates a stream ID column for the buffer.
func (b *tableBuffer) StreamID(pageSize int) *dataset.ColumnBuilder {
if b.streamID != nil {
return b.streamID
}
col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
if err != nil {
// We control the Value/Encoding tuple so this can't fail; if it does,
// we're left in an unrecoverable state where nothing can be encoded
// properly so we panic.
panic(fmt.Sprintf("creating stream ID column: %v", err))
}
b.streamID = col
return col
}
// Timestamp gets or creates a timestamp column for the buffer.
func (b *tableBuffer) Timestamp(pageSize int) *dataset.ColumnBuilder {
if b.timestamp != nil {
return b.timestamp
}
col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
if err != nil {
// We control the Value/Encoding tuple so this can't fail; if it does,
// we're left in an unrecoverable state where nothing can be encoded
// properly so we panic.
panic(fmt.Sprintf("creating timestamp column: %v", err))
}
b.timestamp = col
return col
}
// Metadata gets or creates a metadata column for the buffer. Metadata columns
// that go unused are dropped by the next call to [tableBuffer.Reset].
func (b *tableBuffer) Metadata(key string, pageSize int, compressionOpts dataset.CompressionOptions) *dataset.ColumnBuilder {
if b.usedMetadatas == nil {
b.usedMetadatas = make(map[*dataset.ColumnBuilder]string)
}
index, ok := b.metadataLookup[key]
if ok {
builder := b.metadatas[index]
b.usedMetadatas[builder] = key
return builder
}
col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
CompressionOptions: compressionOpts,
})
if err != nil {
// We control the Value/Encoding tuple so this can't fail; if it does,
// we're left in an unrecoverable state where nothing can be encoded
// properly so we panic.
panic(fmt.Sprintf("creating metadata column: %v", err))
}
b.metadatas = append(b.metadatas, col)
if b.metadataLookup == nil {
b.metadataLookup = make(map[string]int)
}
b.metadataLookup[key] = len(b.metadatas) - 1
b.usedMetadatas[col] = key
return col
}
// Message gets or creates a message column for the buffer.
func (b *tableBuffer) Message(pageSize int, compressionOpts dataset.CompressionOptions) *dataset.ColumnBuilder {
if b.message != nil {
return b.message
}
col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
CompressionOptions: compressionOpts,
})
if err != nil {
// We control the Value/Encoding tuple so this can't fail; if it does,
// we're left in an unrecoverable state where nothing can be encoded
// properly so we panic.
panic(fmt.Sprintf("creating messages column: %v", err))
}
b.message = col
return col
}
// Reset resets the buffer to its initial state.
func (b *tableBuffer) Reset() {
if b.streamID != nil {
b.streamID.Reset()
}
if b.timestamp != nil {
b.timestamp.Reset()
}
if b.message != nil {
b.message.Reset()
}
for _, md := range b.metadatas {
md.Reset()
}
// We don't want to keep all metadata columns around forever, so we only
// retain the columns that were used in the last Flush.
var (
newMetadatas = make([]*dataset.ColumnBuilder, 0, len(b.metadatas))
newMetadataLookup = make(map[string]int, len(b.metadatas))
)
for _, md := range b.metadatas {
if b.usedMetadatas == nil {
break // Nothing was used.
}
key, used := b.usedMetadatas[md]
if !used {
continue
}
newMetadatas = append(newMetadatas, md)
newMetadataLookup[key] = len(newMetadatas) - 1
}
b.metadatas = newMetadatas
b.metadataLookup = newMetadataLookup
clear(b.usedMetadatas) // Reset the used cache for next time.
}
// Flush flushes the buffer into a table. Flush returns an error if the stream,
// timestamp, or messages column was never appended to.
//
// Only metadata columns that were appended to since the last flush are included in the table.
func (b *tableBuffer) Flush() (*table, error) {
defer b.Reset()
if b.streamID == nil {
return nil, fmt.Errorf("no stream column")
} else if b.timestamp == nil {
return nil, fmt.Errorf("no timestamp column")
} else if b.message == nil {
return nil, fmt.Errorf("no message column")
}
var (
// Flush never returns an error so we ignore it here to keep the code simple.
//
// TODO(rfratto): remove error return from Flush to clean up code.
streamID, _ = b.streamID.Flush()
timestamp, _ = b.timestamp.Flush()
messages, _ = b.message.Flush()
metadatas = make([]*tableColumn, 0, len(b.metadatas))
)
for _, metadataBuilder := range b.metadatas {
if b.usedMetadatas == nil {
continue
} else if _, ok := b.usedMetadatas[metadataBuilder]; !ok {
continue
}
// Each metadata column may have a different number of rows compared to
// other columns. Since adding NULLs isn't free, we don't call Backfill
// here.
metadata, _ := metadataBuilder.Flush()
metadatas = append(metadatas, &tableColumn{metadata, logsmd.COLUMN_TYPE_METADATA})
}
// Sort metadata columns by name for consistency.
slices.SortFunc(metadatas, func(a, b *tableColumn) int {
return cmp.Compare(a.ColumnInfo().Name, b.ColumnInfo().Name)
})
return &table{
StreamID: &tableColumn{streamID, logsmd.COLUMN_TYPE_STREAM_ID},
Timestamp: &tableColumn{timestamp, logsmd.COLUMN_TYPE_TIMESTAMP},
Metadatas: metadatas,
Message: &tableColumn{messages, logsmd.COLUMN_TYPE_MESSAGE},
}, nil
}

@@ -0,0 +1,54 @@
package logs
import (
"cmp"
"slices"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
)
// buildTable builds a table from the set of provided records. The records are
// sorted with [sortRecords] prior to building the table.
func buildTable(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, records []Record) *table {
sortRecords(records)
buf.Reset()
var (
streamIDBuilder = buf.StreamID(pageSize)
timestampBuilder = buf.Timestamp(pageSize)
messageBuilder = buf.Message(pageSize, compressionOpts)
)
for i, record := range records {
// Append only fails if given out-of-order data, where the provided row
// number is less than the previous row number. That can't happen here, so
// to keep the code readable we ignore the error values.
_ = streamIDBuilder.Append(i, dataset.Int64Value(record.StreamID))
_ = timestampBuilder.Append(i, dataset.Int64Value(record.Timestamp.UnixNano()))
_ = messageBuilder.Append(i, dataset.StringValue(record.Line))
for _, md := range record.Metadata {
metadataBuilder := buf.Metadata(md.Name, pageSize, compressionOpts)
_ = metadataBuilder.Append(i, dataset.StringValue(md.Value))
}
}
table, err := buf.Flush()
if err != nil {
// Unreachable; we always ensure every required column is created.
panic(err)
}
return table
}
// sortRecords sorts the set of records by stream ID and timestamp.
func sortRecords(records []Record) {
slices.SortFunc(records, func(a, b Record) int {
if res := cmp.Compare(a.StreamID, b.StreamID); res != 0 {
return res
}
return a.Timestamp.Compare(b.Timestamp)
})
}

@@ -0,0 +1,151 @@
package logs
import (
"cmp"
"context"
"fmt"
"math"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/util/loser"
)
// mergeTables merges the provided sorted tables into a new single sorted table
// using k-way merge.
func mergeTables(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, tables []*table) (*table, error) {
buf.Reset()
var (
streamIDBuilder = buf.StreamID(pageSize)
timestampBuilder = buf.Timestamp(pageSize)
messageBuilder = buf.Message(pageSize, compressionOpts)
)
var (
tableSequences = make([]*tableSequence, 0, len(tables))
)
for _, t := range tables {
dsetColumns, err := result.Collect(t.ListColumns(context.Background()))
if err != nil {
return nil, err
}
seq := dataset.Iter(context.Background(), dsetColumns)
next, stop := result.Pull(seq)
defer stop()
tableSequences = append(tableSequences, &tableSequence{
columns: dsetColumns,
pull: next, stop: stop,
})
}
maxValue := result.Value(dataset.Row{
Index: math.MaxInt,
Values: []dataset.Value{
dataset.Int64Value(math.MaxInt64),
dataset.Int64Value(math.MaxInt64),
},
})
var rows int
tree := loser.New(tableSequences, maxValue, tableSequenceValue, rowResultLess, tableSequenceStop)
for tree.Next() {
seq := tree.Winner()
row, err := tableSequenceValue(seq).Value()
if err != nil {
return nil, err
}
for i, column := range seq.columns {
// column is guaranteed to be a *tableColumn since we got it from *table.
column := column.(*tableColumn)
// dataset.Iter returns one value per column, in the same order as the
// columns slice.
value := row.Values[i]
switch column.Type {
case logsmd.COLUMN_TYPE_STREAM_ID:
_ = streamIDBuilder.Append(rows, value)
case logsmd.COLUMN_TYPE_TIMESTAMP:
_ = timestampBuilder.Append(rows, value)
case logsmd.COLUMN_TYPE_METADATA:
columnBuilder := buf.Metadata(column.Info.Name, pageSize, compressionOpts)
_ = columnBuilder.Append(rows, value)
case logsmd.COLUMN_TYPE_MESSAGE:
_ = messageBuilder.Append(rows, value)
default:
return nil, fmt.Errorf("unknown column type %s", column.Type)
}
}
rows++
}
return buf.Flush()
}
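For intuition, a standalone sketch of k-way merging already-sorted inputs keyed by (stream ID, timestamp). It selects the smallest head with a linear scan rather than the loser tree from pkg/util/loser, and the types are illustrative rather than the real dataset rows:

package main

import (
	"cmp"
	"fmt"
)

type rec struct {
	streamID  int64
	timestamp int64
}

func less(a, b rec) bool {
	if r := cmp.Compare(a.streamID, b.streamID); r != 0 {
		return r < 0
	}
	return a.timestamp < b.timestamp
}

// kMerge repeatedly takes the smallest head across the sorted inputs. The real
// implementation does the same over dataset row iterators, using a loser tree
// to avoid rescanning every head on each step.
func kMerge(inputs [][]rec) []rec {
	var out []rec
	heads := make([]int, len(inputs))
	for {
		best := -1
		for i, in := range inputs {
			if heads[i] >= len(in) {
				continue
			}
			if best == -1 || less(in[heads[i]], inputs[best][heads[best]]) {
				best = i
			}
		}
		if best == -1 {
			return out
		}
		out = append(out, inputs[best][heads[best]])
		heads[best]++
	}
}

func main() {
	a := []rec{{1, 1}, {2, 2}, {3, 3}}
	b := []rec{{1, 2}, {3, 1}}
	fmt.Println(kMerge([][]rec{a, b})) // [{1 1} {1 2} {2 2} {3 1} {3 3}]
}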
type tableSequence struct {
curValue result.Result[dataset.Row]
columns []dataset.Column
pull func() (result.Result[dataset.Row], bool)
stop func()
}
var _ loser.Sequence = (*tableSequence)(nil)
func (seq *tableSequence) Next() bool {
val, ok := seq.pull()
seq.curValue = val
return ok
}
func tableSequenceValue(seq *tableSequence) result.Result[dataset.Row] { return seq.curValue }
func tableSequenceStop(seq *tableSequence) { seq.stop() }
func rowResultLess(a, b result.Result[dataset.Row]) bool {
var (
aRow, aErr = a.Value()
bRow, bErr = b.Value()
)
// Put errors first so we return errors early.
if aErr != nil {
return true
} else if bErr != nil {
return false
}
return compareRows(aRow, bRow) < 0
}
// compareRows compares two rows by their first two columns. compareRows panics
// if a or b doesn't have at least two columns, if the first column isn't an
// int64-encoded stream ID, or if the second column isn't an int64-encoded
// timestamp.
func compareRows(a, b dataset.Row) int {
// The first two columns of each row are *always* stream ID and timestamp.
//
// TODO(rfratto): Can we find a safer way of doing this?
var (
aStreamID = a.Values[0].Int64()
bStreamID = b.Values[0].Int64()
aTimestamp = a.Values[1].Int64()
bTimestamp = b.Values[1].Int64()
)
if res := cmp.Compare(aStreamID, bStreamID); res != 0 {
return res
}
return cmp.Compare(aTimestamp, bTimestamp)
}

@@ -0,0 +1,81 @@
package logs
import (
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
func Test_table_metadataCleanup(t *testing.T) {
var buf tableBuffer
initBuffer(&buf)
_ = buf.Metadata("foo", 1024, dataset.CompressionOptions{})
_ = buf.Metadata("bar", 1024, dataset.CompressionOptions{})
table, err := buf.Flush()
require.NoError(t, err)
require.Equal(t, 2, len(table.Metadatas))
initBuffer(&buf)
_ = buf.Metadata("bar", 1024, dataset.CompressionOptions{})
table, err = buf.Flush()
require.NoError(t, err)
require.Equal(t, 1, len(table.Metadatas))
require.Equal(t, "bar", table.Metadatas[0].Info.Name)
}
func initBuffer(buf *tableBuffer) {
buf.StreamID(1024)
buf.Timestamp(1024)
buf.Message(1024, dataset.CompressionOptions{})
}
func Test_mergeTables(t *testing.T) {
var buf tableBuffer
var (
tableA = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 1, Timestamp: time.Unix(1, 0), Line: "hello"},
{StreamID: 2, Timestamp: time.Unix(2, 0), Line: "are"},
{StreamID: 3, Timestamp: time.Unix(3, 0), Line: "goodbye"},
})
tableB = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 1, Timestamp: time.Unix(2, 0), Line: "world"},
{StreamID: 3, Timestamp: time.Unix(1, 0), Line: "you"},
})
tableC = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 2, Timestamp: time.Unix(1, 0), Line: "how"},
{StreamID: 3, Timestamp: time.Unix(2, 0), Line: "doing?"},
})
)
mergedTable, err := mergeTables(&buf, 1024, dataset.CompressionOptions{}, []*table{tableA, tableB, tableC})
require.NoError(t, err)
mergedColumns, err := result.Collect(mergedTable.ListColumns(context.Background()))
require.NoError(t, err)
var actual []string
for result := range dataset.Iter(context.Background(), mergedColumns) {
row, err := result.Value()
require.NoError(t, err)
require.Len(t, row.Values, 3)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, row.Values[2].Type())
actual = append(actual, row.Values[2].String())
}
require.Equal(t, "hello world how are you doing? goodbye", strings.Join(actual, " "))
}

@@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)
// A Stream is an individual stream within a data object.
@@ -333,7 +334,7 @@ func encodeColumn(enc *encoding.StreamsEncoder, columnType streamsmd.ColumnType,
func (s *Streams) Reset() {
s.lastID.Store(0)
clear(s.lookup)
s.ordered = s.ordered[:0]
s.ordered = sliceclear.Clear(s.ordered)
s.currentLabelsSize = 0
s.globalMinTimestamp = time.Time{}
s.globalMaxTimestamp = time.Time{}

@@ -0,0 +1,72 @@
package bufpool
import (
"bytes"
"math"
"sync"
)
type bucket struct {
size uint64
pool sync.Pool
}
var buckets []*bucket
// Buckets are exponentially sized from 1KiB to 64GiB. The max boundary is
// picked arbitrarily.
const (
bucketMin uint64 = 1024
bucketMax uint64 = 1 << 36 /* 64 GiB */
)
func init() {
nextBucket := bucketMin
for {
// Capture the size so New refers to the correct size per bucket.
buckets = append(buckets, &bucket{
size: nextBucket,
pool: sync.Pool{
New: func() any {
// We don't preallocate the buffer here; this will help a bucket pool
// to be filled with buffers of varying sizes within that bucket.
//
// If we *did* preallocate the buffer, then any call to
// [bytes.Buffer.Grow] beyond the bucket size would immediately cause
// it to double in size, placing it in the next bucket.
return bytes.NewBuffer(nil)
},
},
})
// Exponentially grow the bucket size up to bucketMax.
nextBucket *= 2
if nextBucket > bucketMax {
break
}
}
// Catch-all for buffers bigger than bucketMax.
buckets = append(buckets, &bucket{
size: math.MaxUint64,
pool: sync.Pool{
New: func() any {
return bytes.NewBuffer(nil)
},
},
})
}
// findBucket returns the first bucket that is large enough to hold size.
func findBucket(size uint64) *bucket {
for _, b := range buckets {
if b.size >= size {
return b
}
}
// We shouldn't be able to reach this point; the final bucket is sized for
// anything, but if we do reach this we'll return the last bucket anyway.
return buckets[len(buckets)-1]
}

@@ -0,0 +1,41 @@
// Package bufpool offers a pool of [*bytes.Buffer] objects that are placed
// into exponentially sized buckets.
//
// Bucketing prevents the memory cost of a pool from permanently increasing
// when a large buffer is placed into the pool.
package bufpool
import (
"bytes"
)
// Get returns a buffer from the pool for the given size. Returned buffers are
// reset and ready for writes.
//
// The capacity of the returned buffer is guaranteed to be at least size.
func Get(size int) *bytes.Buffer {
if size < 0 {
size = 0
}
b := findBucket(uint64(size))
buf := b.pool.Get().(*bytes.Buffer)
buf.Reset()
buf.Grow(size)
return buf
}
// Put returns a buffer to the pool. The buffer is placed into an appropriate
// bucket based on its current capacity.
func Put(buf *bytes.Buffer) {
if buf == nil {
return
}
b := findBucket(uint64(buf.Cap()))
if b == nil {
return
}
b.pool.Put(buf)
}
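A short usage sketch of the Get/Put round trip; bufpool is an internal package, so this would live inside the module, and the printed check is only there to show the capacity guarantee:

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool"
)

func main() {
	// Roughly 1.5 MB lands in the 2 MiB bucket; the buffer arrives reset and
	// with at least the requested capacity.
	buf := bufpool.Get(1_500_000)
	defer bufpool.Put(buf) // Returned to the bucket matching its capacity.

	buf.WriteString("payload")
	fmt.Println(buf.Cap() >= 1_500_000) // true
}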

@@ -0,0 +1,36 @@
package bufpool
import (
"fmt"
"math"
"testing"
"github.com/stretchr/testify/require"
)
func Test_findBucket(t *testing.T) {
tt := []struct {
size uint64
expect uint64
}{
{size: 0, expect: 1024},
{size: 512, expect: 1024},
{size: 1024, expect: 1024},
{size: 1025, expect: 2048},
{size: (1 << 36), expect: (1 << 36)},
{size: (1 << 37), expect: math.MaxUint64},
}
for _, tc := range tt {
t.Run(fmt.Sprintf("size=%d", tc.size), func(t *testing.T) {
got := findBucket(tc.size).size
require.Equal(t, tc.expect, got)
})
}
}
func Test(t *testing.T) {
buf := Get(1_500_000)
require.NotNil(t, buf)
require.Less(t, buf.Cap(), 2<<20, "buffer should not have grown to next bucket size")
}

@@ -0,0 +1,11 @@
// Package sliceclear provides a way to clear and truncate the length of a
// slice.
package sliceclear
// Clear zeroes out all values in s and returns s[:0]. Clear allows memory of
// previous elements in the slice to be reclained by the garbage collector
// while still allowing the underlying slice memory to be reused.
func Clear[Slice ~[]E, E any](s Slice) Slice {
clear(s)
return s[:0]
}

@@ -0,0 +1,28 @@
package sliceclear_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)
func Test(t *testing.T) {
s := make([]*int, 0, 10)
for i := 0; i < 10; i++ {
s = append(s, new(int))
}
s = sliceclear.Clear(s)
require.Equal(t, 10, cap(s))
require.Equal(t, 0, len(s))
// Reexpand s to its full capacity and ensure that all elements have been
// zeroed out.
full := s[:cap(s)]
require.Equal(t, 10, len(full))
for i := 0; i < 10; i++ {
require.Nil(t, full[i], "element %d was not zeroed; this can cause memory leaks", i)
}
}