From 948f5c5f3e64c6fba714d0bd90be558ac3e7eb04 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Thu, 23 Jan 2025 11:14:53 -0500 Subject: [PATCH] chore(dataobj): Reintroduce sorting of the logs section (#15906) --- pkg/dataobj/dataobj.go | 27 +- pkg/dataobj/dataobj_test.go | 37 +-- .../internal/dataset/column_builder.go | 12 + pkg/dataobj/internal/dataset/dataset_iter.go | 5 +- pkg/dataobj/internal/dataset/page.go | 98 +++++- pkg/dataobj/internal/dataset/page_builder.go | 12 +- .../internal/dataset/page_compress_writer.go | 10 +- pkg/dataobj/internal/dataset/page_test.go | 51 ++- .../internal/encoding/decoder_metadata.go | 9 +- pkg/dataobj/internal/encoding/encoder.go | 20 ++ pkg/dataobj/internal/sections/logs/logs.go | 313 ++++++++---------- .../internal/sections/logs/logs_test.go | 10 +- pkg/dataobj/internal/sections/logs/table.go | 312 +++++++++++++++++ .../internal/sections/logs/table_build.go | 54 +++ .../internal/sections/logs/table_merge.go | 151 +++++++++ .../internal/sections/logs/table_test.go | 81 +++++ .../internal/sections/streams/streams.go | 3 +- pkg/dataobj/internal/util/bufpool/bucket.go | 72 ++++ pkg/dataobj/internal/util/bufpool/bufpool.go | 41 +++ .../internal/util/bufpool/bufpool_test.go | 36 ++ .../internal/util/sliceclear/sliceclear.go | 11 + .../util/sliceclear/sliceclear_test.go | 28 ++ 22 files changed, 1158 insertions(+), 235 deletions(-) create mode 100644 pkg/dataobj/internal/sections/logs/table.go create mode 100644 pkg/dataobj/internal/sections/logs/table_build.go create mode 100644 pkg/dataobj/internal/sections/logs/table_merge.go create mode 100644 pkg/dataobj/internal/sections/logs/table_test.go create mode 100644 pkg/dataobj/internal/util/bufpool/bucket.go create mode 100644 pkg/dataobj/internal/util/bufpool/bufpool.go create mode 100644 pkg/dataobj/internal/util/bufpool/bufpool_test.go create mode 100644 pkg/dataobj/internal/util/sliceclear/sliceclear.go create mode 100644 pkg/dataobj/internal/util/sliceclear/sliceclear_test.go diff --git a/pkg/dataobj/dataobj.go b/pkg/dataobj/dataobj.go index 9964fde324..7b374ccc94 100644 --- a/pkg/dataobj/dataobj.go +++ b/pkg/dataobj/dataobj.go @@ -47,16 +47,29 @@ type BuilderConfig struct { // TargetObjectSize configures a target size for data objects. TargetObjectSize flagext.Bytes `yaml:"target_object_size"` + + // TargetSectionSize configures the maximum size of data in a section. Sections + // which support this parameter will place overflow data into new sections of + // the same type. + TargetSectionSize flagext.Bytes `yaml:"target_section_size"` + + // BufferSize configures the size of the buffer used to accumulate + // uncompressed logs in memory prior to sorting. + BufferSize flagext.Bytes `yaml:"buffer_size"` } // RegisterFlagsWithPrefix registers flags with the given prefix. 
func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { _ = cfg.TargetPageSize.Set("2MB") _ = cfg.TargetObjectSize.Set("1GB") + _ = cfg.BufferSize.Set("16MB") // Page Size * 8 + _ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8 f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.") f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.") f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") + f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") + f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") } // Validate validates the BuilderConfig. @@ -77,6 +90,14 @@ func (cfg *BuilderConfig) Validate() error { errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) } + if cfg.BufferSize <= 0 { + errs = append(errs, errors.New("BufferSize must be greater than 0")) + } + + if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { + errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) + } + return errors.Join(errs...) } @@ -148,7 +169,11 @@ func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Bu labelCache: labelCache, streams: streams.New(metrics.streams, int(cfg.TargetPageSize)), - logs: logs.New(metrics.logs, int(cfg.TargetPageSize)), + logs: logs.New(metrics.logs, logs.Options{ + PageSizeHint: int(cfg.TargetPageSize), + BufferSize: int(cfg.BufferSize), + SectionSize: int(cfg.TargetSectionSize), + }), flushBuffer: flushBuffer, encoder: encoder, diff --git a/pkg/dataobj/dataobj_test.go b/pkg/dataobj/dataobj_test.go index e367eca081..6c75d722ae 100644 --- a/pkg/dataobj/dataobj_test.go +++ b/pkg/dataobj/dataobj_test.go @@ -19,6 +19,16 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" ) +var testBuilderConfig = BuilderConfig{ + SHAPrefixSize: 2, + + TargetPageSize: 2048, + TargetObjectSize: 4096, + TargetSectionSize: 4096, + + BufferSize: 2048 * 8, +} + func Test(t *testing.T) { bucket := objstore.NewInMemBucket() @@ -67,16 +77,7 @@ func Test(t *testing.T) { } t.Run("Build", func(t *testing.T) { - // Create a tiny builder which flushes a lot of objects and pages to properly - // test the builder. - builderConfig := BuilderConfig{ - SHAPrefixSize: 2, - - TargetPageSize: 1_500_000, - TargetObjectSize: 10_000_000, - } - - builder, err := NewBuilder(builderConfig, bucket, "fake") + builder, err := NewBuilder(testBuilderConfig, bucket, "fake") require.NoError(t, err) for _, entry := range streams { @@ -94,10 +95,7 @@ func Test(t *testing.T) { actual, err := result.Collect(reader.Streams(context.Background(), objects[0])) require.NoError(t, err) - - // TODO(rfratto): reenable once sorting is reintroduced. - _ = actual - // require.Equal(t, sortStreams(t, streams), actual) + require.Equal(t, sortStreams(t, streams), actual) }) } @@ -109,16 +107,7 @@ func Test_Builder_Append(t *testing.T) { bucket := objstore.NewInMemBucket() - // Create a tiny builder which flushes a lot of objects and pages to properly - // test the builder. 
- builderConfig := BuilderConfig{ - SHAPrefixSize: 2, - - TargetPageSize: 2048, - TargetObjectSize: 4096, - } - - builder, err := NewBuilder(builderConfig, bucket, "fake") + builder, err := NewBuilder(testBuilderConfig, bucket, "fake") require.NoError(t, err) for { diff --git a/pkg/dataobj/internal/dataset/column_builder.go b/pkg/dataobj/internal/dataset/column_builder.go index ca67fbdf15..0b6833e0ab 100644 --- a/pkg/dataobj/internal/dataset/column_builder.go +++ b/pkg/dataobj/internal/dataset/column_builder.go @@ -3,6 +3,8 @@ package dataset import ( "fmt" + "github.com/klauspost/compress/zstd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" ) @@ -21,6 +23,16 @@ type BuilderOptions struct { // Compression is the compression algorithm to use for values. Compression datasetmd.CompressionType + + // CompressionOptions holds optional configuration for compression. + CompressionOptions CompressionOptions +} + +// CompressionOptions customizes the compressor used when building pages. +type CompressionOptions struct { + // Zstd holds encoding options for Zstd compression. Only used for + // [datasetmd.COMPRESSION_TYPE_ZSTD]. + Zstd []zstd.EOption } // A ColumnBuilder builds a sequence of [Value] entries of a common type into a diff --git a/pkg/dataobj/internal/dataset/dataset_iter.go b/pkg/dataobj/internal/dataset/dataset_iter.go index d223b87d1a..34fda49a23 100644 --- a/pkg/dataobj/internal/dataset/dataset_iter.go +++ b/pkg/dataobj/internal/dataset/dataset_iter.go @@ -33,7 +33,6 @@ func Iter(ctx context.Context, columns []Column) result.Seq[Row] { type pullColumnIter struct { Next func() (result.Result[Value], bool) - Stop func() } return result.Iter(func(yield func(Row) bool) error { @@ -47,7 +46,9 @@ func Iter(ctx context.Context, columns []Column) result.Seq[Row] { } next, stop := result.Pull(lazyColumnIter(ctx, col.ColumnInfo(), pages)) - pullColumns = append(pullColumns, pullColumnIter{Next: next, Stop: stop}) + defer stop() + + pullColumns = append(pullColumns, pullColumnIter{Next: next}) } // Start emitting rows; each row is composed of the next value from all of diff --git a/pkg/dataobj/internal/dataset/page.go b/pkg/dataobj/internal/dataset/page.go index 9e8846d88e..c0b2653bbc 100644 --- a/pkg/dataobj/internal/dataset/page.go +++ b/pkg/dataobj/internal/dataset/page.go @@ -7,11 +7,13 @@ import ( "fmt" "hash/crc32" "io" + "sync" "github.com/golang/snappy" "github.com/klauspost/compress/zstd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" ) // Helper types. 
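For context, a minimal sketch (not part of the patch itself) of how the new dataset.CompressionOptions field added to BuilderOptions above is expected to flow into a column builder. The column name and option values are arbitrary, and the snippet assumes the dataset, datasetmd, and zstd packages already imported in this patch:

    opts := dataset.BuilderOptions{
        PageSizeHint: 2 << 20, // ~2 MiB pages.
        Value:        datasetmd.VALUE_TYPE_STRING,
        Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
        Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,

        // Per-algorithm tuning; the Zstd options are only consulted when
        // Compression is COMPRESSION_TYPE_ZSTD.
        CompressionOptions: dataset.CompressionOptions{
            Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedFastest)},
        },
    }

    col, err := dataset.NewColumnBuilder("example_column", opts)
    if err != nil {
        // Per the comments in this patch, this only fails for an invalid
        // Value/Encoding combination.
        panic(err)
    }
    _ = col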
@@ -88,39 +90,99 @@ func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Rea } var ( - bitmapReader = bytes.NewReader(p.Data[n : n+int(bitmapSize)]) - compressedDataReader = bytes.NewReader(p.Data[n+int(bitmapSize):]) + bitmapData = p.Data[n : n+int(bitmapSize)] + compressedValuesData = p.Data[n+int(bitmapSize):] + + bitmapReader = bytes.NewReader(bitmapData) + compressedValuesReader = bytes.NewReader(compressedValuesData) ) switch compression { case datasetmd.COMPRESSION_TYPE_UNSPECIFIED, datasetmd.COMPRESSION_TYPE_NONE: - return bitmapReader, io.NopCloser(compressedDataReader), nil + return bitmapReader, io.NopCloser(compressedValuesReader), nil case datasetmd.COMPRESSION_TYPE_SNAPPY: - sr := snappy.NewReader(compressedDataReader) - return bitmapReader, io.NopCloser(sr), nil + sr := snappyPool.Get().(*snappy.Reader) + sr.Reset(compressedValuesReader) + return bitmapReader, &closerFunc{Reader: sr, onClose: func() error { + snappyPool.Put(sr) + return nil + }}, nil case datasetmd.COMPRESSION_TYPE_ZSTD: - zr, err := zstd.NewReader(compressedDataReader) - if err != nil { - return nil, nil, fmt.Errorf("opening zstd reader: %w", err) - } - return bitmapReader, newZstdReader(zr), nil + zr := &fixedZstdReader{page: p, data: compressedValuesData} + return bitmapReader, zr, nil } panic(fmt.Sprintf("dataset.MemPage.reader: unknown compression type %q", compression.String())) } -// zstdReader implements [io.ReadCloser] for a [zstd.Decoder]. -type zstdReader struct{ *zstd.Decoder } +var snappyPool = sync.Pool{ + New: func() interface{} { + return snappy.NewReader(nil) + }, +} + +type closerFunc struct { + io.Reader + onClose func() error +} + +func (c *closerFunc) Close() error { return c.onClose() } -// newZstdReader returns a new [io.ReadCloser] for a [zstd.Decoder]. -func newZstdReader(dec *zstd.Decoder) io.ReadCloser { - return &zstdReader{Decoder: dec} +// globalZstdDecoder is a shared zstd decoder for [fixedZstdReader]. Concurrent +// uses of globalZstdDecoder are only safe when using [zstd.Decoder.DecodeAll]. +var globalZstdDecoder = func() *zstd.Decoder { + d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(1)) + if err != nil { + panic(err) + } + return d +}() + +// fixedZstdReader is an [io.ReadCloser] that decompresses a zstd buffer in a +// single pass. +type fixedZstdReader struct { + page *MemPage + data []byte + + uncompressedBuf *bytes.Buffer + closed bool } -// Close implements [io.Closer]. -func (r *zstdReader) Close() error { - r.Decoder.Close() +func (r *fixedZstdReader) Read(p []byte) (int, error) { + if r.closed { + return 0, io.ErrClosedPipe + } + + if r.uncompressedBuf != nil { + return r.uncompressedBuf.Read(p) + } + + // We decompress the entire buffer in a single pass. While a pooled zstd + // reader would require less memory and would allow us to stream values as we + // decompress, pooling zstd decoders is difficult to do properly, as it + // requires a finalizer to release resources, and the goroutines spawned by + // decoders prevent the finalizer from ever being called. + // + // To make efficient zstd decoding less error prone, we opt for this instead. 
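+	//
+	// The decoded bytes land in a pooled buffer sized from Info.UncompressedSize;
+	// this Read call and any subsequent ones are then served from that buffer
+	// until Close returns it to bufpool.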
+ r.uncompressedBuf = bufpool.Get(r.page.Info.UncompressedSize) + r.uncompressedBuf.Reset() + + buf, err := globalZstdDecoder.DecodeAll(r.data, r.uncompressedBuf.AvailableBuffer()) + if err != nil { + return 0, fmt.Errorf("decoding zstd: %w", err) + } + _, _ = r.uncompressedBuf.Write(buf) + + return r.uncompressedBuf.Read(p) +} + +func (r *fixedZstdReader) Close() error { + if r.uncompressedBuf != nil { + bufpool.Put(r.uncompressedBuf) + r.uncompressedBuf = nil + } + r.closed = true return nil } diff --git a/pkg/dataobj/internal/dataset/page_builder.go b/pkg/dataobj/internal/dataset/page_builder.go index 4222f3cae2..3f16e9e6b2 100644 --- a/pkg/dataobj/internal/dataset/page_builder.go +++ b/pkg/dataobj/internal/dataset/page_builder.go @@ -56,7 +56,7 @@ func newPageBuilder(opts BuilderOptions) (*pageBuilder, error) { presenceBuffer = bytes.NewBuffer(nil) valuesBuffer = bytes.NewBuffer(make([]byte, 0, opts.PageSizeHint)) - valuesWriter = newCompressWriter(valuesBuffer, opts.Compression) + valuesWriter = newCompressWriter(valuesBuffer, opts.Compression, opts.CompressionOptions) ) presenceEnc := newBitmapEncoder(presenceBuffer) @@ -174,12 +174,18 @@ func (b *pageBuilder) Flush() (*MemPage, error) { return nil, fmt.Errorf("no data to flush") } - // Before we can build the page we need to finish flushing our encoders and writers. + // Before we can build the page we need to finish flushing our encoders and + // writers. + // + // We must call [compressWriter.Close] to ensure that Zstd writers write a + // proper EOF marker, otherwise synchronous decoding can't be used. + // compressWriters can continue to reset and reused after closing, so this is + // safe. if err := b.presenceEnc.Flush(); err != nil { return nil, fmt.Errorf("flushing presence encoder: %w", err) } else if err := b.valuesEnc.Flush(); err != nil { return nil, fmt.Errorf("flushing values encoder: %w", err) - } else if err := b.valuesWriter.Flush(); err != nil { + } else if err := b.valuesWriter.Close(); err != nil { return nil, fmt.Errorf("flushing values writer: %w", err) } diff --git a/pkg/dataobj/internal/dataset/page_compress_writer.go b/pkg/dataobj/internal/dataset/page_compress_writer.go index 3fad4a0edf..a096ceb443 100644 --- a/pkg/dataobj/internal/dataset/page_compress_writer.go +++ b/pkg/dataobj/internal/dataset/page_compress_writer.go @@ -20,14 +20,16 @@ type compressWriter struct { w io.WriteCloser // Compressing writer. buf *bufio.Writer // Buffered writer in front of w to be able to call WriteByte. + rawBytes int // Number of uncompressed bytes written. + compression datasetmd.CompressionType // Compression type being used. - rawBytes int // Number of uncompressed bytes written. + opts CompressionOptions // Options to customize compression. } var _ streamio.Writer = (*compressWriter)(nil) -func newCompressWriter(w io.Writer, ty datasetmd.CompressionType) *compressWriter { - c := compressWriter{compression: ty} +func newCompressWriter(w io.Writer, ty datasetmd.CompressionType, opts CompressionOptions) *compressWriter { + c := compressWriter{compression: ty, opts: opts} c.Reset(w) return &c } @@ -85,7 +87,7 @@ func (c *compressWriter) Reset(w io.Writer) { compressedWriter = snappy.NewBufferedWriter(w) case datasetmd.COMPRESSION_TYPE_ZSTD: - zw, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedBestCompression)) + zw, err := zstd.NewWriter(w, c.opts.Zstd...) 
if err != nil { panic(fmt.Sprintf("compressWriter.Reset: creating zstd writer: %v", err)) } diff --git a/pkg/dataobj/internal/dataset/page_test.go b/pkg/dataobj/internal/dataset/page_test.go index b041285433..869ace87bd 100644 --- a/pkg/dataobj/internal/dataset/page_test.go +++ b/pkg/dataobj/internal/dataset/page_test.go @@ -1,6 +1,7 @@ package dataset import ( + "io" "math/rand" "testing" "time" @@ -10,7 +11,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" ) -func Test_pageBuilder_WriteRead(t *testing.T) { +func Benchmark_pageBuilder_WriteRead(b *testing.B) { in := []string{ "hello, world!", "", @@ -30,6 +31,54 @@ func Test_pageBuilder_WriteRead(t *testing.T) { Compression: datasetmd.COMPRESSION_TYPE_ZSTD, Encoding: datasetmd.ENCODING_TYPE_PLAIN, } + builder, err := newPageBuilder(opts) + require.NoError(b, err) + + for _, s := range in { + require.True(b, builder.Append(StringValue(s))) + } + + page, err := builder.Flush() + require.NoError(b, err) + require.Equal(b, len(in), page.Info.RowCount) + require.Equal(b, len(in)-2, page.Info.ValuesCount) // -2 for the empty strings + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, values, err := page.reader(datasetmd.COMPRESSION_TYPE_ZSTD) + if err != nil { + b.Fatal() + } + + if _, err := io.Copy(io.Discard, values); err != nil { + b.Fatal(err) + } else if err := values.Close(); err != nil { + b.Fatal(err) + } + } +} + +func Test_pageBuilder_WriteRead(t *testing.T) { + in := []string{ + "hello, world!", + "", + "this is a test of the emergency broadcast system", + "this is only a test", + "if this were a real emergency, you would be instructed to panic", + "but it's not, so don't", + "", + "this concludes the test", + "thank you for your cooperation", + "goodbye", + } + + opts := BuilderOptions{ + PageSizeHint: 1024, + Value: datasetmd.VALUE_TYPE_STRING, + Compression: datasetmd.COMPRESSION_TYPE_SNAPPY, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + } b, err := newPageBuilder(opts) require.NoError(t, err) diff --git a/pkg/dataobj/internal/encoding/decoder_metadata.go b/pkg/dataobj/internal/encoding/decoder_metadata.go index b409111080..192ea4a5be 100644 --- a/pkg/dataobj/internal/encoding/decoder_metadata.go +++ b/pkg/dataobj/internal/encoding/decoder_metadata.go @@ -1,7 +1,6 @@ package encoding import ( - "bytes" "encoding/binary" "fmt" "io" @@ -12,6 +11,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" ) // decode* methods for metadata shared by Decoder implementations. @@ -108,9 +108,10 @@ func decodeProto(r streamio.Reader, pb proto.Message) error { return fmt.Errorf("read proto message size: %w", err) } - buf := bytesBufferPool.Get().(*bytes.Buffer) - buf.Reset() - defer bytesBufferPool.Put(buf) + // We know exactly how big of a buffer we need here, so we can get a bucketed + // buffer from bufpool. 
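+	// The deferred bufpool.Put below returns the buffer to its size-class bucket
+	// once decodeProto returns, keeping the pool's memory footprint bounded.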
+ buf := bufpool.Get(int(size)) + defer bufpool.Put(buf) n, err := io.Copy(buf, io.LimitReader(r, int64(size))) if err != nil { diff --git a/pkg/dataobj/internal/encoding/encoder.go b/pkg/dataobj/internal/encoding/encoder.go index 681ec2ddd5..a022d1795a 100644 --- a/pkg/dataobj/internal/encoding/encoder.go +++ b/pkg/dataobj/internal/encoding/encoder.go @@ -12,6 +12,26 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" ) +// TODO(rfratto): the memory footprint of [Encoder] can very slowly grow in +// memory as [bytesBufferPool] is filled with buffers with increasing capacity: +// each encoding pass has a different number of elements, shuffling which +// elements of the hierarchy get which pooled buffers. +// +// This means that elements that require more bytes will grow the capacity of +// the buffer and put the buffer back into the pool. Even if further encoding +// passes don't need that many bytes, the buffer is kept alive with its larger +// footprint. Given enough time, all buffers in the pool will have a large +// capacity. +// +// The bufpool package provides a solution to this (bucketing pools by +// capacity), but using bufpool properly requires knowing how many bytes are +// needed. +// +// Encoder can eventually be moved to the bufpool package by calculating a +// rolling maximum of encoding size used per element across usages of an +// Encoder instance. This would then allow larger buffers to be eventually +// reclaimed regardless of how often encoding is done. + // Encoder encodes a data object. Data objects are hierarchical, split into // distinct sections that contain their own hierarchy. // diff --git a/pkg/dataobj/internal/sections/logs/logs.go b/pkg/dataobj/internal/sections/logs/logs.go index e23ecce7cf..cf08e6455e 100644 --- a/pkg/dataobj/internal/sections/logs/logs.go +++ b/pkg/dataobj/internal/sections/logs/logs.go @@ -3,22 +3,20 @@ package logs import ( - "cmp" "context" "errors" "fmt" - "slices" "time" + "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" - "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" - "github.com/grafana/loki/v3/pkg/dataobj/internal/result" + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" ) // A Record is an individual log record within the logs section. @@ -29,141 +27,147 @@ type Record struct { Line string } +// Options configures the behavior of the logs section. +type Options struct { + // PageSizeHint is the size of pages to use when encoding the logs section. + PageSizeHint int + + // BufferSize is the size of the buffer to use when accumulating log records. + BufferSize int + + // SectionSizeHint is the size of the section to use when encoding the logs + // section. If the section size is exceeded, multiple sections will be + // created. + SectionSize int +} + // Logs accumulate a set of [Record]s within a data object. type Logs struct { - metrics *Metrics - rows int - pageSize int + metrics *Metrics + opts Options + + // Sorting the entire set of logs is very expensive, so we need to break it + // up into smaller pieces: + // + // 1. Records are accumulated in memory up to BufferSize; the current size is + // tracked by recordsSize. + // + // 2. 
Once the buffer is full, records are sorted and flushed to smaller + // [table]s called stripes. + // + // 3. Once the set of stripes reaches SectionSize, they are merged together + // into a final table that will be encoded as a single section. + // + // At the end of this process, there will be a set of sections that are + // encoded separately. - streamIDs *dataset.ColumnBuilder - timestamps *dataset.ColumnBuilder + records []Record // Buffered records to flush to a group. + recordsSize int - metadatas []*dataset.ColumnBuilder - metadataLookup map[string]int // map of metadata key to index in metadatas + stripes []*table // In-progress section; flushed with [mergeTables] into a single table. + stripeBuffer tableBuffer + stripesSize int // Estimated byte size of all elements in stripes. - messages *dataset.ColumnBuilder + sections []*table // Completed sections. + sectionBuffer tableBuffer } // Nwe creates a new Logs section. The pageSize argument specifies how large // pages should be. -func New(metrics *Metrics, pageSize int) *Logs { +func New(metrics *Metrics, opts Options) *Logs { if metrics == nil { metrics = NewMetrics() } - // We control the Value/Encoding tuple so creating column builders can't - // fail; if it does, we're left in an unrecoverable state where nothing can - // be encoded properly so we panic. - streamIDs, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ - PageSizeHint: pageSize, - Value: datasetmd.VALUE_TYPE_INT64, - Encoding: datasetmd.ENCODING_TYPE_DELTA, - Compression: datasetmd.COMPRESSION_TYPE_NONE, - }) - if err != nil { - panic(fmt.Sprintf("creating stream ID column: %v", err)) + return &Logs{ + metrics: metrics, + opts: opts, } +} - timestamps, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ - PageSizeHint: pageSize, - Value: datasetmd.VALUE_TYPE_INT64, - Encoding: datasetmd.ENCODING_TYPE_DELTA, - Compression: datasetmd.COMPRESSION_TYPE_NONE, - }) - if err != nil { - panic(fmt.Sprintf("creating timestamp column: %v", err)) - } +// Append adds a new entry to the set of Logs. +func (l *Logs) Append(entry Record) { + l.metrics.appendsTotal.Inc() - messages, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ - PageSizeHint: pageSize, - Value: datasetmd.VALUE_TYPE_STRING, - Encoding: datasetmd.ENCODING_TYPE_PLAIN, - Compression: datasetmd.COMPRESSION_TYPE_ZSTD, - }) - if err != nil { - panic(fmt.Sprintf("creating message column: %v", err)) - } + l.records = append(l.records, entry) + l.recordsSize += recordSize(entry) - return &Logs{ - metrics: metrics, - pageSize: pageSize, + if l.recordsSize >= l.opts.BufferSize { + l.flushRecords() + } - streamIDs: streamIDs, - timestamps: timestamps, + l.metrics.recordCount.Inc() +} - metadataLookup: make(map[string]int), +func recordSize(record Record) int { + var size int - messages: messages, + size++ // One byte per stream ID (for uvarint). + size += 8 // Eight bytes for timestamp. + for _, metadata := range record.Metadata { + size += len(metadata.Value) } + size += len(record.Line) + + return size } -// Append adds a new entry to the set of Logs. -func (l *Logs) Append(entry Record) { - l.metrics.appendsTotal.Inc() +func (l *Logs) flushRecords() { + if len(l.records) == 0 { + return + } - // Sort metadata to ensure consistent encoding. Metadata is sorted by key. - // While keys must be unique, we sort by value if two keys match; this - // ensures that the same value always gets encoded for duplicate keys. 
- slices.SortFunc(entry.Metadata, func(a, b push.LabelAdapter) int { - if res := cmp.Compare(a.Name, b.Name); res != 0 { - return res - } - return cmp.Compare(a.Value, b.Value) - }) + // Our stripes are intermediate tables that don't need to have the best + // compression. To maintain high throughput on appends, we use the fastest + // compression for a stripe. Better compression is then used for sections. + compressionOpts := dataset.CompressionOptions{ + Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedFastest)}, + } - // We ignore the errors below; they only fail if given out-of-order data - // (where the row number is less than the previous row number), which can't - // ever happen here. + stripe := buildTable(&l.stripeBuffer, l.opts.PageSizeHint, compressionOpts, l.records) + l.stripes = append(l.stripes, stripe) + l.stripesSize += stripe.Size() - _ = l.streamIDs.Append(l.rows, dataset.Int64Value(entry.StreamID)) - _ = l.timestamps.Append(l.rows, dataset.Int64Value(entry.Timestamp.UnixNano())) - _ = l.messages.Append(l.rows, dataset.StringValue(entry.Line)) + l.records = sliceclear.Clear(l.records) + l.recordsSize = 0 - for _, m := range entry.Metadata { - col := l.getMetadataColumn(m.Name) - _ = col.Append(l.rows, dataset.StringValue(m.Value)) + if l.stripesSize >= l.opts.SectionSize { + l.flushSection() } - - l.rows++ - l.metrics.recordCount.Inc() } -// EstimatedSize returns the estimated size of the Logs section in bytes. -func (l *Logs) EstimatedSize() int { - var size int +func (l *Logs) flushSection() { + if len(l.stripes) == 0 { + return + } - size += l.streamIDs.EstimatedSize() - size += l.timestamps.EstimatedSize() - size += l.messages.EstimatedSize() + compressionOpts := dataset.CompressionOptions{ + Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)}, + } - for _, md := range l.metadatas { - size += md.EstimatedSize() + section, err := mergeTables(&l.sectionBuffer, l.opts.PageSizeHint, compressionOpts, l.stripes) + if err != nil { + // We control the input to mergeTables, so this should never happen. + panic(fmt.Sprintf("merging tables: %v", err)) } - return size + l.sections = append(l.sections, section) + + l.stripes = sliceclear.Clear(l.stripes) + l.stripesSize = 0 } -func (l *Logs) getMetadataColumn(key string) *dataset.ColumnBuilder { - idx, ok := l.metadataLookup[key] - if !ok { - col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{ - PageSizeHint: l.pageSize, - Value: datasetmd.VALUE_TYPE_STRING, - Encoding: datasetmd.ENCODING_TYPE_PLAIN, - Compression: datasetmd.COMPRESSION_TYPE_ZSTD, - }) - if err != nil { - // We control the Value/Encoding tuple so this can't fail; if it does, - // we're left in an unrecoverable state where nothing can be encoded - // properly so we panic. - panic(fmt.Sprintf("creating metadata column: %v", err)) - } +// EstimatedSize returns the estimated size of the Logs section in bytes. +func (l *Logs) EstimatedSize() int { + var size int - l.metadatas = append(l.metadatas, col) - l.metadataLookup[key] = len(l.metadatas) - 1 - return col + size += l.recordsSize + size += l.stripesSize + for _, section := range l.sections { + size += section.Size() } - return l.metadatas[idx] + + return size } // EncodeTo encodes the set of logs to the provided encoder. Before encoding, @@ -179,23 +183,28 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error { defer l.Reset() - // TODO(rfratto): handle one section becoming too large. This can happen when - // the number of columns is very wide, due to a lot of metadata columns. 
- // There are two approaches to handle this: - // - // 1. Split streams into multiple sections. - // 2. Move some columns into an aggregated column which holds multiple label - // keys and values. + // Flush any remaining buffered data. + l.flushRecords() + l.flushSection() - dset, err := l.buildDataset() - if err != nil { - return fmt.Errorf("building dataset: %w", err) - } - cols, err := result.Collect(dset.ListColumns(context.Background())) // dset is in memory; "real" context not needed. - if err != nil { - return fmt.Errorf("listing columns: %w", err) + // TODO(rfratto): handle individual sections having oversized metadata. This + // can happen when the number of columns is very wide, due to a lot of + // metadata columns. + // + // As we're already splitting data into separate sections, the best solution + // for this is to aggregate the lowest cardinality columns into a combined + // column. This will reduce the number of columns in the section and thus the + // metadata size. + for _, section := range l.sections { + if err := l.encodeSection(enc, section); err != nil { + return fmt.Errorf("encoding section: %w", err) + } } + return nil +} + +func (l *Logs) encodeSection(enc *encoding.Encoder, section *table) error { logsEnc, err := enc.OpenLogs() if err != nil { return fmt.Errorf("opening logs section: %w", err) @@ -206,16 +215,14 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error { _ = logsEnc.Discard() }() - // Encode our columns. The slice order here *must* match the order in - // [Logs.buildDataset]! { - errs := make([]error, 0, len(cols)) - errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_STREAM_ID, cols[0])) - errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_TIMESTAMP, cols[1])) - for _, mdCol := range cols[2 : len(cols)-1] { - errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_METADATA, mdCol)) + errs := make([]error, 0, len(section.Metadatas)+3) + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_STREAM_ID, section.StreamID)) + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_TIMESTAMP, section.Timestamp)) + for _, md := range section.Metadatas { + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_METADATA, md)) } - errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_MESSAGE, cols[len(cols)-1])) + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_MESSAGE, section.Message)) if err := errors.Join(errs...); err != nil { return fmt.Errorf("encoding columns: %w", err) } @@ -224,51 +231,6 @@ func (l *Logs) EncodeTo(enc *encoding.Encoder) error { return logsEnc.Commit() } -func (l *Logs) buildDataset() (dataset.Dataset, error) { - // Our columns are ordered as follows: - // - // 1. StreamID - // 2. Timestamp - // 3. Metadata columns - // 4. Message - // - // Do *not* change this order without updating [Logs.EncodeTo]! - // - // TODO(rfratto): find a clean way to decorate columns with additional - // metadata so we don't have to rely on order. - columns := make([]*dataset.MemColumn, 0, 3+len(l.metadatas)) - - // Flush never returns an error so we ignore it here to keep the code simple. - // - // TODO(rfratto): remove error return from Flush to clean up code. 
- streamID, _ := l.streamIDs.Flush() - timestamp, _ := l.timestamps.Flush() - columns = append(columns, streamID, timestamp) - - for _, mdBuilder := range l.metadatas { - mdBuilder.Backfill(l.rows) - - mdColumn, _ := mdBuilder.Flush() - columns = append(columns, mdColumn) - } - - messages, _ := l.messages.Flush() - columns = append(columns, messages) - - // TODO(rfratto): We need to be able to sort the columns first by StreamID - // and then by timestamp, but as it is now this is way too slow; sorting a - // 20MB dataset took several minutes due to the number of page loads - // happening across streams. - // - // Sorting can be made more efficient by: - // - // 1. Separating streams into separate datasets while appending - // 2. Sorting each stream separately - // 3. Combining sorted streams into a single dataset, which will already be - // sorted. - return dataset.FromMemory(columns), nil -} - func encodeColumn(enc *encoding.LogsEncoder, columnType logsmd.ColumnType, column dataset.Column) error { columnEnc, err := enc.OpenColumn(columnType, column.ColumnInfo()) if err != nil { @@ -307,12 +269,15 @@ func encodeColumn(enc *encoding.LogsEncoder, columnType logsmd.ColumnType, colum // Reset resets all state, allowing Logs to be reused. func (l *Logs) Reset() { - l.rows = 0 l.metrics.recordCount.Set(0) - l.streamIDs.Reset() - l.timestamps.Reset() - l.metadatas = l.metadatas[:0] - clear(l.metadataLookup) - l.messages.Reset() + l.records = sliceclear.Clear(l.records) + l.recordsSize = 0 + + l.stripes = sliceclear.Clear(l.stripes) + l.stripeBuffer.Reset() + l.stripesSize = 0 + + l.sections = sliceclear.Clear(l.sections) + l.sectionBuffer.Reset() } diff --git a/pkg/dataobj/internal/sections/logs/logs_test.go b/pkg/dataobj/internal/sections/logs/logs_test.go index 6caa84ca6d..4f14329a77 100644 --- a/pkg/dataobj/internal/sections/logs/logs_test.go +++ b/pkg/dataobj/internal/sections/logs/logs_test.go @@ -15,8 +15,6 @@ import ( ) func Test(t *testing.T) { - t.Skip("Disabled until sorting is reimplemented") - records := []logs.Record{ { StreamID: 1, @@ -44,7 +42,13 @@ func Test(t *testing.T) { }, } - tracker := logs.New(nil, 1024) + opts := logs.Options{ + PageSizeHint: 1024, + BufferSize: 256, + SectionSize: 4096, + } + + tracker := logs.New(nil, opts) for _, record := range records { tracker.Append(record) } diff --git a/pkg/dataobj/internal/sections/logs/table.go b/pkg/dataobj/internal/sections/logs/table.go new file mode 100644 index 0000000000..27508fd511 --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/table.go @@ -0,0 +1,312 @@ +package logs + +import ( + "cmp" + "context" + "fmt" + "slices" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// A table is a collection of columns that form a logs section. +type table struct { + StreamID *tableColumn + Timestamp *tableColumn + Metadatas []*tableColumn + Message *tableColumn +} + +type tableColumn struct { + *dataset.MemColumn + + Type logsmd.ColumnType +} + +var _ dataset.Dataset = (*table)(nil) + +// ListColumns implements [dataset.Dataset]. 
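+//
+// Columns are yielded in a fixed order: stream ID, timestamp, metadata columns
+// (if any), then the message column. Callers such as mergeTables rely on the
+// stream ID and timestamp occupying the first two positions.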
+func (t *table) ListColumns(_ context.Context) result.Seq[dataset.Column] { + return result.Iter(func(yield func(dataset.Column) bool) error { + if !yield(t.StreamID) { + return nil + } + if !yield(t.Timestamp) { + return nil + } + for _, metadata := range t.Metadatas { + if !yield(metadata) { + return nil + } + } + if !yield(t.Message) { + return nil + } + + return nil + }) +} + +// ListPages implements [dataset.Dataset]. +func (t *table) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] { + return result.Iter(func(yield func(dataset.Pages) bool) error { + for _, c := range columns { + pages, err := result.Collect(c.ListPages(ctx)) + if err != nil { + return err + } else if !yield(dataset.Pages(pages)) { + return nil + } + } + + return nil + }) +} + +// ReadPages implements [dataset.Dataset]. +func (t *table) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] { + return result.Iter(func(yield func(dataset.PageData) bool) error { + for _, p := range pages { + data, err := p.ReadPage(ctx) + if err != nil { + return err + } else if !yield(data) { + return nil + } + } + + return nil + }) + +} + +// Size returns the total size of the table in bytes. +func (t *table) Size() int { + var size int + + size += t.StreamID.ColumnInfo().CompressedSize + size += t.Timestamp.ColumnInfo().CompressedSize + for _, metadata := range t.Metadatas { + size += metadata.ColumnInfo().CompressedSize + } + size += t.Message.ColumnInfo().CompressedSize + + return size +} + +// A tableBuffer holds a set of column builders used for constructing tables. +// The zero value is ready for use. +type tableBuffer struct { + streamID *dataset.ColumnBuilder + timestamp *dataset.ColumnBuilder + + metadatas []*dataset.ColumnBuilder + metadataLookup map[string]int // map of metadata key to index in metadatas + usedMetadatas map[*dataset.ColumnBuilder]string // metadata with its name. + + message *dataset.ColumnBuilder +} + +// StreamID gets or creates a stream ID column for the buffer. +func (b *tableBuffer) StreamID(pageSize int) *dataset.ColumnBuilder { + if b.streamID != nil { + return b.streamID + } + + col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + if err != nil { + // We control the Value/Encoding tuple so this can't fail; if it does, + // we're left in an unrecoverable state where nothing can be encoded + // properly so we panic. + panic(fmt.Sprintf("creating stream ID column: %v", err)) + } + + b.streamID = col + return col +} + +// Timestamp gets or creates a timestamp column for the buffer. +func (b *tableBuffer) Timestamp(pageSize int) *dataset.ColumnBuilder { + if b.timestamp != nil { + return b.timestamp + } + + col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + if err != nil { + // We control the Value/Encoding tuple so this can't fail; if it does, + // we're left in an unrecoverable state where nothing can be encoded + // properly so we panic. + panic(fmt.Sprintf("creating timestamp column: %v", err)) + } + + b.timestamp = col + return col +} + +// Metadata gets or creates a metadata column for the buffer. To remove created +// metadata columns, call [tableBuffer.CleanupMetadatas]. 
+func (b *tableBuffer) Metadata(key string, pageSize int, compressionOpts dataset.CompressionOptions) *dataset.ColumnBuilder { + if b.usedMetadatas == nil { + b.usedMetadatas = make(map[*dataset.ColumnBuilder]string) + } + + index, ok := b.metadataLookup[key] + if ok { + builder := b.metadatas[index] + b.usedMetadatas[builder] = key + return builder + } + + col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + CompressionOptions: compressionOpts, + }) + if err != nil { + // We control the Value/Encoding tuple so this can't fail; if it does, + // we're left in an unrecoverable state where nothing can be encoded + // properly so we panic. + panic(fmt.Sprintf("creating metadata column: %v", err)) + } + + b.metadatas = append(b.metadatas, col) + + if b.metadataLookup == nil { + b.metadataLookup = make(map[string]int) + } + b.metadataLookup[key] = len(b.metadatas) - 1 + b.usedMetadatas[col] = key + return col +} + +// Message gets or creates a message column for the buffer. +func (b *tableBuffer) Message(pageSize int, compressionOpts dataset.CompressionOptions) *dataset.ColumnBuilder { + if b.message != nil { + return b.message + } + + col, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + CompressionOptions: compressionOpts, + }) + if err != nil { + // We control the Value/Encoding tuple so this can't fail; if it does, + // we're left in an unrecoverable state where nothing can be encoded + // properly so we panic. + panic(fmt.Sprintf("creating messages column: %v", err)) + } + + b.message = col + return col +} + +// Reset resets the buffer to its initial state. +func (b *tableBuffer) Reset() { + if b.streamID != nil { + b.streamID.Reset() + } + if b.timestamp != nil { + b.timestamp.Reset() + } + if b.message != nil { + b.message.Reset() + } + for _, md := range b.metadatas { + md.Reset() + } + + // We don't want to keep all metadata columns around forever, so we only + // retain the columns that were used in the last Flush. + var ( + newMetadatas = make([]*dataset.ColumnBuilder, 0, len(b.metadatas)) + newMetadataLookup = make(map[string]int, len(b.metadatas)) + ) + for _, md := range b.metadatas { + if b.usedMetadatas == nil { + break // Nothing was used. + } + + key, used := b.usedMetadatas[md] + if !used { + continue + } + + newMetadatas = append(newMetadatas, md) + newMetadataLookup[key] = len(newMetadatas) - 1 + } + b.metadatas = newMetadatas + b.metadataLookup = newMetadataLookup + clear(b.usedMetadatas) // Reset the used cache for next time. +} + +// Flush flushes the buffer into a table. Flush returns an error if the stream, +// timestamp, or messages column was never appended to. +// +// Only metadata columns that were appended to since the last flush are included in the table. +func (b *tableBuffer) Flush() (*table, error) { + defer b.Reset() + + if b.streamID == nil { + return nil, fmt.Errorf("no stream column") + } else if b.timestamp == nil { + return nil, fmt.Errorf("no timestamp column") + } else if b.message == nil { + return nil, fmt.Errorf("no message column") + } + + var ( + // Flush never returns an error so we ignore it here to keep the code simple. + // + // TODO(rfratto): remove error return from Flush to clean up code. 
+ + streamID, _ = b.streamID.Flush() + timestamp, _ = b.timestamp.Flush() + messages, _ = b.message.Flush() + + metadatas = make([]*tableColumn, 0, len(b.metadatas)) + ) + + for _, metadataBuilder := range b.metadatas { + if b.usedMetadatas == nil { + continue + } else if _, ok := b.usedMetadatas[metadataBuilder]; !ok { + continue + } + + // Each metadata column may have a different number of rows compared to + // other columns. Since adding NULLs isn't free, we don't call Backfill + // here. + metadata, _ := metadataBuilder.Flush() + metadatas = append(metadatas, &tableColumn{metadata, logsmd.COLUMN_TYPE_METADATA}) + } + + // Sort metadata columns by name for consistency. + slices.SortFunc(metadatas, func(a, b *tableColumn) int { + return cmp.Compare(a.ColumnInfo().Name, b.ColumnInfo().Name) + }) + + return &table{ + StreamID: &tableColumn{streamID, logsmd.COLUMN_TYPE_STREAM_ID}, + Timestamp: &tableColumn{timestamp, logsmd.COLUMN_TYPE_TIMESTAMP}, + Metadatas: metadatas, + Message: &tableColumn{messages, logsmd.COLUMN_TYPE_MESSAGE}, + }, nil +} diff --git a/pkg/dataobj/internal/sections/logs/table_build.go b/pkg/dataobj/internal/sections/logs/table_build.go new file mode 100644 index 0000000000..edf74bbfd7 --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/table_build.go @@ -0,0 +1,54 @@ +package logs + +import ( + "cmp" + "slices" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" +) + +// buildTable builds a table from the set of provided records. The records are +// sorted with [sortRecords] prior to building the table. +func buildTable(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, records []Record) *table { + sortRecords(records) + + buf.Reset() + + var ( + streamIDBuilder = buf.StreamID(pageSize) + timestampBuilder = buf.Timestamp(pageSize) + messageBuilder = buf.Message(pageSize, compressionOpts) + ) + + for i, record := range records { + // Append only fails if given out-of-order data, where the provided row + // number is less than the previous row number. That can't happen here, so + // to keep the code readable we ignore the error values. + + _ = streamIDBuilder.Append(i, dataset.Int64Value(record.StreamID)) + _ = timestampBuilder.Append(i, dataset.Int64Value(record.Timestamp.UnixNano())) + _ = messageBuilder.Append(i, dataset.StringValue(record.Line)) + + for _, md := range record.Metadata { + metadataBuilder := buf.Metadata(md.Name, pageSize, compressionOpts) + _ = metadataBuilder.Append(i, dataset.StringValue(md.Value)) + } + } + + table, err := buf.Flush() + if err != nil { + // Unreachable; we always ensure every required column is created. + panic(err) + } + return table +} + +// sortRecords sorts the set of records by stream ID and timestamp. 
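+// Records that share both stream ID and timestamp keep no particular relative
+// order, as slices.SortFunc is not a stable sort.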
+func sortRecords(records []Record) { + slices.SortFunc(records, func(a, b Record) int { + if res := cmp.Compare(a.StreamID, b.StreamID); res != 0 { + return res + } + return a.Timestamp.Compare(b.Timestamp) + }) +} diff --git a/pkg/dataobj/internal/sections/logs/table_merge.go b/pkg/dataobj/internal/sections/logs/table_merge.go new file mode 100644 index 0000000000..e54c896550 --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/table_merge.go @@ -0,0 +1,151 @@ +package logs + +import ( + "cmp" + "context" + "fmt" + "math" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" + "github.com/grafana/loki/v3/pkg/util/loser" +) + +// mergeTables merges the provided sorted tables into a new single sorted table +// using k-way merge. +func mergeTables(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, tables []*table) (*table, error) { + buf.Reset() + + var ( + streamIDBuilder = buf.StreamID(pageSize) + timestampBuilder = buf.Timestamp(pageSize) + messageBuilder = buf.Message(pageSize, compressionOpts) + ) + + var ( + tableSequences = make([]*tableSequence, 0, len(tables)) + ) + for _, t := range tables { + dsetColumns, err := result.Collect(t.ListColumns(context.Background())) + if err != nil { + return nil, err + } + + seq := dataset.Iter(context.Background(), dsetColumns) + next, stop := result.Pull(seq) + defer stop() + + tableSequences = append(tableSequences, &tableSequence{ + columns: dsetColumns, + + pull: next, stop: stop, + }) + } + + maxValue := result.Value(dataset.Row{ + Index: math.MaxInt, + Values: []dataset.Value{ + dataset.Int64Value(math.MaxInt64), + dataset.Int64Value(math.MaxInt64), + }, + }) + + var rows int + + tree := loser.New(tableSequences, maxValue, tableSequenceValue, rowResultLess, tableSequenceStop) + for tree.Next() { + seq := tree.Winner() + + row, err := tableSequenceValue(seq).Value() + if err != nil { + return nil, err + } + + for i, column := range seq.columns { + // column is guaranteed to be a *tableColumn since we got it from *table. + column := column.(*tableColumn) + + // dataset.Iter returns values in the same order as the number of + // columns. + value := row.Values[i] + + switch column.Type { + case logsmd.COLUMN_TYPE_STREAM_ID: + _ = streamIDBuilder.Append(rows, value) + case logsmd.COLUMN_TYPE_TIMESTAMP: + _ = timestampBuilder.Append(rows, value) + case logsmd.COLUMN_TYPE_METADATA: + columnBuilder := buf.Metadata(column.Info.Name, pageSize, compressionOpts) + _ = columnBuilder.Append(rows, value) + case logsmd.COLUMN_TYPE_MESSAGE: + _ = messageBuilder.Append(rows, value) + default: + return nil, fmt.Errorf("unknown column type %s", column.Type) + } + } + + rows++ + } + + return buf.Flush() +} + +type tableSequence struct { + curValue result.Result[dataset.Row] + + columns []dataset.Column + + pull func() (result.Result[dataset.Row], bool) + stop func() +} + +var _ loser.Sequence = (*tableSequence)(nil) + +func (seq *tableSequence) Next() bool { + val, ok := seq.pull() + seq.curValue = val + return ok +} + +func tableSequenceValue(seq *tableSequence) result.Result[dataset.Row] { return seq.curValue } + +func tableSequenceStop(seq *tableSequence) { seq.stop() } + +func rowResultLess(a, b result.Result[dataset.Row]) bool { + var ( + aRow, aErr = a.Value() + bRow, bErr = b.Value() + ) + + // Put errors first so we return errors early. 
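+	// If both results are errors, a is treated as smaller; which error wins is
+	// irrelevant, since mergeTables stops at the first error it pulls from the
+	// merge tree.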
+ if aErr != nil { + return true + } else if bErr != nil { + return false + } + + return compareRows(aRow, bRow) < 0 +} + +// compareRows compares two rows by their first two columns. compareRows panics +// if a or b doesn't have at least two columns, if the first column isn't a +// int64-encoded stream ID, or if the second column isn't an int64-encoded +// timestamp. +func compareRows(a, b dataset.Row) int { + // The first two columns of each row are *always* stream ID and timestamp. + // + // TODO(rfratto): Can we find a safer way of doing this? + var ( + aStreamID = a.Values[0].Int64() + bStreamID = b.Values[0].Int64() + + aTimestamp = a.Values[1].Int64() + bTimestamp = b.Values[1].Int64() + ) + + if res := cmp.Compare(aStreamID, bStreamID); res != 0 { + return res + } + return cmp.Compare(aTimestamp, bTimestamp) +} diff --git a/pkg/dataobj/internal/sections/logs/table_test.go b/pkg/dataobj/internal/sections/logs/table_test.go new file mode 100644 index 0000000000..91f25c141d --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/table_test.go @@ -0,0 +1,81 @@ +package logs + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +func Test_table_metadataCleanup(t *testing.T) { + var buf tableBuffer + initBuffer(&buf) + + _ = buf.Metadata("foo", 1024, dataset.CompressionOptions{}) + _ = buf.Metadata("bar", 1024, dataset.CompressionOptions{}) + + table, err := buf.Flush() + require.NoError(t, err) + require.Equal(t, 2, len(table.Metadatas)) + + initBuffer(&buf) + _ = buf.Metadata("bar", 1024, dataset.CompressionOptions{}) + + table, err = buf.Flush() + require.NoError(t, err) + require.Equal(t, 1, len(table.Metadatas)) + require.Equal(t, "bar", table.Metadatas[0].Info.Name) +} + +func initBuffer(buf *tableBuffer) { + buf.StreamID(1024) + buf.Timestamp(1024) + buf.Message(1024, dataset.CompressionOptions{}) +} + +func Test_mergeTables(t *testing.T) { + var buf tableBuffer + + var ( + tableA = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{ + {StreamID: 1, Timestamp: time.Unix(1, 0), Line: "hello"}, + {StreamID: 2, Timestamp: time.Unix(2, 0), Line: "are"}, + {StreamID: 3, Timestamp: time.Unix(3, 0), Line: "goodbye"}, + }) + + tableB = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{ + {StreamID: 1, Timestamp: time.Unix(2, 0), Line: "world"}, + {StreamID: 3, Timestamp: time.Unix(1, 0), Line: "you"}, + }) + + tableC = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{ + {StreamID: 2, Timestamp: time.Unix(1, 0), Line: "how"}, + {StreamID: 3, Timestamp: time.Unix(2, 0), Line: "doing?"}, + }) + ) + + mergedTable, err := mergeTables(&buf, 1024, dataset.CompressionOptions{}, []*table{tableA, tableB, tableC}) + require.NoError(t, err) + + mergedColumns, err := result.Collect(mergedTable.ListColumns(context.Background())) + require.NoError(t, err) + + var actual []string + + for result := range dataset.Iter(context.Background(), mergedColumns) { + row, err := result.Value() + require.NoError(t, err) + require.Len(t, row.Values, 3) + require.Equal(t, datasetmd.VALUE_TYPE_STRING, row.Values[2].Type()) + + actual = append(actual, row.Values[2].String()) + } + + require.Equal(t, "hello world how are you doing? 
goodbye", strings.Join(actual, " ")) +} diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go index 3fc583342b..f9f4aeabd8 100644 --- a/pkg/dataobj/internal/sections/streams/streams.go +++ b/pkg/dataobj/internal/sections/streams/streams.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" ) // A Stream is an individual stream within a data object. @@ -333,7 +334,7 @@ func encodeColumn(enc *encoding.StreamsEncoder, columnType streamsmd.ColumnType, func (s *Streams) Reset() { s.lastID.Store(0) clear(s.lookup) - s.ordered = s.ordered[:0] + s.ordered = sliceclear.Clear(s.ordered) s.currentLabelsSize = 0 s.globalMinTimestamp = time.Time{} s.globalMaxTimestamp = time.Time{} diff --git a/pkg/dataobj/internal/util/bufpool/bucket.go b/pkg/dataobj/internal/util/bufpool/bucket.go new file mode 100644 index 0000000000..c64ec44b9e --- /dev/null +++ b/pkg/dataobj/internal/util/bufpool/bucket.go @@ -0,0 +1,72 @@ +package bufpool + +import ( + "bytes" + "math" + "sync" +) + +type bucket struct { + size uint64 + pool sync.Pool +} + +var buckets []*bucket + +// Bucket sizes are exponentially sized from 1KiB to 64GiB. The max boundary is +// picked arbitrarily. +const ( + bucketMin uint64 = 1024 + bucketMax uint64 = 1 << 36 /* 64 GiB */ +) + +func init() { + nextBucket := bucketMin + + for { + // Capture the size so New refers to the correct size per bucket. + buckets = append(buckets, &bucket{ + size: nextBucket, + pool: sync.Pool{ + New: func() any { + // We don't preallocate the buffer here; this will help a bucket pool + // to be filled with buffers of varying sizes within that bucket. + // + // If we *did* preallocate the buffer, then any call to + // [bytes.Buffer.Grow] beyond the bucket size would immediately cause + // it to double in size, placing it in the next bucket. + return bytes.NewBuffer(nil) + }, + }, + }) + + // Exponentially grow the bucket size up to bucketMax. + nextBucket *= 2 + if nextBucket > bucketMax { + break + } + } + + // Catch-all for buffers bigger than bucketMax. + buckets = append(buckets, &bucket{ + size: math.MaxUint64, + pool: sync.Pool{ + New: func() any { + return bytes.NewBuffer(nil) + }, + }, + }) +} + +// findBucket returns the first bucket that is large enough to hold size. +func findBucket(size uint64) *bucket { + for _, b := range buckets { + if b.size >= size { + return b + } + } + + // We shouldn't be able to reach this point; the final bucket is sized for + // anything, but if we do reach this we'll return the last bucket anyway. + return buckets[len(buckets)-1] +} diff --git a/pkg/dataobj/internal/util/bufpool/bufpool.go b/pkg/dataobj/internal/util/bufpool/bufpool.go new file mode 100644 index 0000000000..7d048767ad --- /dev/null +++ b/pkg/dataobj/internal/util/bufpool/bufpool.go @@ -0,0 +1,41 @@ +// Package bufpool offers a pool of [*bytes.Buffer] objects that are placed +// into exponentially sized buckets. +// +// Bucketing prevents the memory cost of a pool from permanently increasing +// when a large buffer is placed into the pool. +package bufpool + +import ( + "bytes" +) + +// Get returns a buffer from the pool for the given size. Returned buffers are +// reset and ready for writes. 
+// +// The capacity of the returned buffer is guaranteed to be at least size. +func Get(size int) *bytes.Buffer { + if size < 0 { + size = 0 + } + + b := findBucket(uint64(size)) + + buf := b.pool.Get().(*bytes.Buffer) + buf.Reset() + buf.Grow(size) + return buf +} + +// Put returns a buffer to the pool. The buffer is placed into an appropriate +// bucket based on its current capacity. +func Put(buf *bytes.Buffer) { + if buf == nil { + return + } + + b := findBucket(uint64(buf.Cap())) + if b == nil { + return + } + b.pool.Put(buf) +} diff --git a/pkg/dataobj/internal/util/bufpool/bufpool_test.go b/pkg/dataobj/internal/util/bufpool/bufpool_test.go new file mode 100644 index 0000000000..9727bab02b --- /dev/null +++ b/pkg/dataobj/internal/util/bufpool/bufpool_test.go @@ -0,0 +1,36 @@ +package bufpool + +import ( + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_findBucket(t *testing.T) { + tt := []struct { + size uint64 + expect uint64 + }{ + {size: 0, expect: 1024}, + {size: 512, expect: 1024}, + {size: 1024, expect: 1024}, + {size: 1025, expect: 2048}, + {size: (1 << 36), expect: (1 << 36)}, + {size: (1 << 37), expect: math.MaxUint64}, + } + + for _, tc := range tt { + t.Run(fmt.Sprintf("size=%d", tc.size), func(t *testing.T) { + got := findBucket(tc.size).size + require.Equal(t, tc.expect, got) + }) + } +} + +func Test(t *testing.T) { + buf := Get(1_500_000) + require.NotNil(t, buf) + require.Less(t, buf.Cap(), 2<<20, "buffer should not have grown to next bucket size") +} diff --git a/pkg/dataobj/internal/util/sliceclear/sliceclear.go b/pkg/dataobj/internal/util/sliceclear/sliceclear.go new file mode 100644 index 0000000000..6caaf52cbc --- /dev/null +++ b/pkg/dataobj/internal/util/sliceclear/sliceclear.go @@ -0,0 +1,11 @@ +// Package sliceclear provides a way to clear and truncate the length of a +// slice. +package sliceclear + +// Clear zeroes out all values in s and returns s[:0]. Clear allows memory of +// previous elements in the slice to be reclained by the garbage collector +// while still allowing the underlying slice memory to be reused. +func Clear[Slice ~[]E, E any](s Slice) Slice { + clear(s) + return s[:0] +} diff --git a/pkg/dataobj/internal/util/sliceclear/sliceclear_test.go b/pkg/dataobj/internal/util/sliceclear/sliceclear_test.go new file mode 100644 index 0000000000..fd7b7545b8 --- /dev/null +++ b/pkg/dataobj/internal/util/sliceclear/sliceclear_test.go @@ -0,0 +1,28 @@ +package sliceclear_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" +) + +func Test(t *testing.T) { + s := make([]*int, 0, 10) + for i := 0; i < 10; i++ { + s = append(s, new(int)) + } + + s = sliceclear.Clear(s) + require.Equal(t, 10, cap(s)) + require.Equal(t, 0, len(s)) + + // Reexpand s to its full capacity and ensure that all elements have been + // zeroed out. + full := s[:cap(s)] + require.Equal(t, 10, len(full)) + for i := 0; i < 10; i++ { + require.Nil(t, full[i], "element %d was not zeroed; this can cause memory leaks", i) + } +}
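As an end-to-end illustration of the buffer → stripe → section flow described in logs.go above, here is a rough usage sketch; it is not part of the patch, the sizes and record contents are arbitrary, and it assumes the logs package from this patch plus the standard library time package:

    l := logs.New(nil, logs.Options{ // nil metrics falls back to logs.NewMetrics().
        // Page size hint passed through to the dataset column builders.
        PageSizeHint: 2 << 20,
        // Records are buffered up to this size, then sorted and flushed into a
        // stripe (a small intermediate table).
        BufferSize: 16 << 20,
        // Stripes are merged into a single section table once their combined
        // size reaches this threshold.
        SectionSize: 128 << 20,
    })

    l.Append(logs.Record{
        StreamID:  1,
        Timestamp: time.Unix(10, 0),
        Line:      "hello, world!",
    })

    // EncodeTo flushes any remaining records and stripes, then writes one logs
    // section per accumulated table:
    //
    //   err := l.EncodeTo(enc) // enc is an *encoding.Encoder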