diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 2d495f9f34..288d3a1423 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -845,6 +845,11 @@ dataobj:
   # CLI flag: -dataobj-consumer.buffer-size
   [buffer_size: <int> | default = 16MiB]
 
+  # The maximum number of stripes to merge into a section at once. Must be
+  # greater than 1.
+  # CLI flag: -dataobj-consumer.section-stripe-merge-limit
+  [section_stripe_merge_limit: <int> | default = 2]
+
   uploader:
     # The size of the SHA prefix to use for generating object storage keys for
     # data objects.
diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go
index 3a76bfff9c..cfb1641f41 100644
--- a/pkg/dataobj/builder.go
+++ b/pkg/dataobj/builder.go
@@ -52,6 +52,12 @@ type BuilderConfig struct {
 	// BufferSize configures the size of the buffer used to accumulate
 	// uncompressed logs in memory prior to sorting.
 	BufferSize flagext.Bytes `yaml:"buffer_size"`
+
+	// SectionStripeMergeLimit configures the number of stripes to merge at once
+	// when flushing stripes into a section. SectionStripeMergeLimit must be
+	// larger than 1. Lower values trade lower memory overhead for more time
+	// spent merging.
+	SectionStripeMergeLimit int `yaml:"section_stripe_merge_limit"`
 }
 
 // RegisterFlagsWithPrefix registers flags with the given prefix.
@@ -65,6 +71,7 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet
 	f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.")
 	f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.")
 	f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.")
+	f.IntVar(&cfg.SectionStripeMergeLimit, prefix+"section-stripe-merge-limit", 2, "The maximum number of stripes to merge into a section at once. Must be greater than 1.")
 }
 
 // Validate validates the BuilderConfig.
@@ -89,6 +96,10 @@ func (cfg *BuilderConfig) Validate() error {
 		errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize"))
 	}
 
+	if cfg.SectionStripeMergeLimit < 2 {
+		errs = append(errs, errors.New("SectionStripeMergeLimit must be greater than 1"))
+	}
+
 	return errors.Join(errs...)
} @@ -152,9 +163,10 @@ func NewBuilder(cfg BuilderConfig) (*Builder, error) { streams: streams.New(metrics.streams, int(cfg.TargetPageSize)), logs: logs.New(metrics.logs, logs.Options{ - PageSizeHint: int(cfg.TargetPageSize), - BufferSize: int(cfg.BufferSize), - SectionSize: int(cfg.TargetSectionSize), + PageSizeHint: int(cfg.TargetPageSize), + BufferSize: int(cfg.BufferSize), + SectionSize: int(cfg.TargetSectionSize), + StripeMergeLimit: cfg.SectionStripeMergeLimit, }), }, nil } diff --git a/pkg/dataobj/builder_test.go b/pkg/dataobj/builder_test.go index 20e9d0db9b..49530db7eb 100644 --- a/pkg/dataobj/builder_test.go +++ b/pkg/dataobj/builder_test.go @@ -21,6 +21,8 @@ var testBuilderConfig = BuilderConfig{ TargetSectionSize: 4096, BufferSize: 2048 * 8, + + SectionStripeMergeLimit: 2, } func TestBuilder(t *testing.T) { diff --git a/pkg/dataobj/consumer/partition_processor_test.go b/pkg/dataobj/consumer/partition_processor_test.go index 011abac684..f2c2926df7 100644 --- a/pkg/dataobj/consumer/partition_processor_test.go +++ b/pkg/dataobj/consumer/partition_processor_test.go @@ -97,6 +97,8 @@ var testBuilderConfig = dataobj.BuilderConfig{ TargetSectionSize: 4096, BufferSize: 2048 * 8, + + SectionStripeMergeLimit: 2, } // TestIdleFlush tests the idle flush behavior of the partition processor diff --git a/pkg/dataobj/internal/dataset/column_reader.go b/pkg/dataobj/internal/dataset/column_reader.go index 43d3bacc7b..5bb5ebec36 100644 --- a/pkg/dataobj/internal/dataset/column_reader.go +++ b/pkg/dataobj/internal/dataset/column_reader.go @@ -188,6 +188,7 @@ func (cr *columnReader) Reset(column Column) { cr.ranges = sliceclear.Clear(cr.ranges) if cr.reader != nil { + // Resetting takes the place of calling Close here. cr.reader.Reset(nil, column.ColumnInfo().Type, column.ColumnInfo().Compression) } diff --git a/pkg/dataobj/internal/dataset/page.go b/pkg/dataobj/internal/dataset/page.go index 004c16128b..6bc26a20ef 100644 --- a/pkg/dataobj/internal/dataset/page.go +++ b/pkg/dataobj/internal/dataset/page.go @@ -120,6 +120,7 @@ func (p *MemPage) reader(compression datasetmd.CompressionType) (presence io.Rea // directly. zr = zstdPool.New().(*zstdWrapper) } + return bitmapReader, &closerFunc{Reader: zr, onClose: func() error { _ = zr.Reset(nil) // Allow releasing the buffer. zstdPool.Put(zr) @@ -168,7 +169,9 @@ var zstdPool = sync.Pool{ // See doc comment on [zstdWrapper] for why we're doing this. zw := &zstdWrapper{zr} - runtime.AddCleanup(zw, func(zr *zstd.Decoder) { zr.Close() }, zr) + runtime.AddCleanup(zw, func(zr *zstd.Decoder) { + zr.Close() + }, zr) return zw }, diff --git a/pkg/dataobj/internal/dataset/page_reader.go b/pkg/dataobj/internal/dataset/page_reader.go index 05b5168323..2e5f698968 100644 --- a/pkg/dataobj/internal/dataset/page_reader.go +++ b/pkg/dataobj/internal/dataset/page_reader.go @@ -92,6 +92,11 @@ func (pr *pageReader) read(v []Value) (n int, err error) { if err != nil && !errors.Is(err, io.EOF) { return n, err } else if count == 0 && errors.Is(err, io.EOF) { + // If we've hit EOF, we can immediately close the inner reader to release + // any resources back, rather than waiting for the next call to + // [pageReader.init] to do it. + _ = pr.Close() + return n, io.EOF } else if count == 0 { return 0, nil @@ -166,6 +171,13 @@ func reuseValuesBuffer(dst []Value, src []Value) []Value { } func (pr *pageReader) init(ctx context.Context) error { + // Close any existing reader from a previous pageReader init. 
Even though + // this also happens in [pageReader.Close], we want to do it here as well in + // case we seeked backwards in a file. + if err := pr.Close(); err != nil { + return fmt.Errorf("closing previous page: %w", err) + } + data, err := pr.page.ReadPage(ctx) if err != nil { return err @@ -181,11 +193,6 @@ func (pr *pageReader) init(ctx context.Context) error { return fmt.Errorf("opening page for reading: %w", err) } - // Close any existing reader from a previous pageReader init. - if err := pr.Close(); err != nil { - return fmt.Errorf("closing previous page: %w", err) - } - if pr.presenceDec == nil { pr.presenceDec = newBitmapDecoder(bufio.NewReader(presenceReader)) } else { @@ -274,6 +281,10 @@ func (pr *pageReader) Reset(page Page, value datasetmd.ValueType, compression da pr.pageRow = 0 pr.nextRow = 0 + + // Close the underlying reader if one is open so resources get released + // sooner. + _ = pr.Close() } // Close closes the pageReader. Closed pageReaders can be reused by calling diff --git a/pkg/dataobj/internal/dataset/reader.go b/pkg/dataobj/internal/dataset/reader.go index 02cce2d507..57d233bd7f 100644 --- a/pkg/dataobj/internal/dataset/reader.go +++ b/pkg/dataobj/internal/dataset/reader.go @@ -292,7 +292,9 @@ func (r *Reader) Reset(opts ReaderOptions) { r.opts = opts // There's not much work Reset can do without a context, since it needs to - // retrieve page info. We'll defer this work to an init function. + // retrieve page info. We'll defer this work to an init function. This also + // unfortunately means that we might not reset page readers until the first + // call to Read. if r.origColumnLookup == nil { r.origColumnLookup = make(map[Column]int, len(opts.Columns)) } diff --git a/pkg/dataobj/internal/dataset/reader_basic.go b/pkg/dataobj/internal/dataset/reader_basic.go index a390aaa640..c641ec354f 100644 --- a/pkg/dataobj/internal/dataset/reader_basic.go +++ b/pkg/dataobj/internal/dataset/reader_basic.go @@ -326,7 +326,8 @@ func (pr *basicReader) Reset(columns []Column) { clear(pr.columnLookup) } - // Reset existing readers. + // Reset existing readers, which takes the place of otherwise closing + // existing ones. pr.columns = columns for i := 0; i < len(pr.readers) && i < len(columns); i++ { pr.readers[i].Reset(columns[i]) @@ -339,14 +340,23 @@ func (pr *basicReader) Reset(columns []Column) { pr.columnLookup[columns[i]] = i } - // Clear out remaining readers. This needs to clear beyond the final length - // of the pr.readers slice (up to its full capacity) so elements beyond the - // length can be garbage collected. + // Close and clear out remaining readers. This needs to clear beyond the + // final length of the pr.readers slice (up to its full capacity) so elements + // beyond the length can be garbage collected. pr.readers = pr.readers[:len(columns)] - clear(pr.readers[len(columns):cap(pr.readers)]) + closeAndClear(pr.readers[len(columns):cap(pr.readers)]) pr.nextRow = 0 } +func closeAndClear(r []*columnReader) { + for _, c := range r { + if c != nil { + c.Close() + } + } + clear(r) +} + // Close closes the basicReader. Closed basicReaders can be reused by calling // [basicReader.Reset]. func (pr *basicReader) Close() error { diff --git a/pkg/dataobj/internal/sections/logs/iter.go b/pkg/dataobj/internal/sections/logs/iter.go index d8ba2a7195..ed054622a1 100644 --- a/pkg/dataobj/internal/sections/logs/iter.go +++ b/pkg/dataobj/internal/sections/logs/iter.go @@ -69,6 +69,7 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd. 
 		Dataset: dset,
 		Columns: columns,
 	})
+	defer r.Close()
 
 	var rows [1]dataset.Row
 	for {
diff --git a/pkg/dataobj/internal/sections/logs/logs.go b/pkg/dataobj/internal/sections/logs/logs.go
index b95e576515..8184f37d37 100644
--- a/pkg/dataobj/internal/sections/logs/logs.go
+++ b/pkg/dataobj/internal/sections/logs/logs.go
@@ -39,6 +39,14 @@ type Options struct {
 	// section. If the section size is exceeded, multiple sections will be
 	// created.
 	SectionSize int
+
+	// StripeMergeLimit is the maximum number of stripes to merge at once when
+	// flushing stripes into a section. StripeMergeLimit must be larger than 1.
+	//
+	// Lower values of StripeMergeLimit reduce the memory overhead of merging but
+	// increase time spent merging. Higher values of StripeMergeLimit increase
+	// memory overhead but reduce time spent merging.
+	StripeMergeLimit int
 }
 
 // Logs accumulate a set of [Record]s within a data object.
@@ -145,12 +153,11 @@ func (l *Logs) flushSection() {
 		Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)},
 	}
 
-	section, err := mergeTables(&l.sectionBuffer, l.opts.PageSizeHint, compressionOpts, l.stripes)
+	section, err := mergeTablesIncremental(&l.sectionBuffer, l.opts.PageSizeHint, compressionOpts, l.stripes, l.opts.StripeMergeLimit)
 	if err != nil {
 		// We control the input to mergeTables, so this should never happen.
 		panic(fmt.Sprintf("merging tables: %v", err))
 	}
-
 	l.sections = append(l.sections, section)
 	l.stripes = sliceclear.Clear(l.stripes)
diff --git a/pkg/dataobj/internal/sections/logs/logs_test.go b/pkg/dataobj/internal/sections/logs/logs_test.go
index 46b568a53b..7ec631aa4b 100644
--- a/pkg/dataobj/internal/sections/logs/logs_test.go
+++ b/pkg/dataobj/internal/sections/logs/logs_test.go
@@ -36,9 +36,10 @@ func Test(t *testing.T) {
 	}
 
 	opts := logs.Options{
-		PageSizeHint: 1024,
-		BufferSize:   256,
-		SectionSize:  4096,
+		PageSizeHint:     1024,
+		BufferSize:       256,
+		SectionSize:      4096,
+		StripeMergeLimit: 2,
 	}
 
 	tracker := logs.New(nil, opts)
diff --git a/pkg/dataobj/internal/sections/logs/table_merge.go b/pkg/dataobj/internal/sections/logs/table_merge.go
index 9fa16402db..00f41f39b8 100644
--- a/pkg/dataobj/internal/sections/logs/table_merge.go
+++ b/pkg/dataobj/internal/sections/logs/table_merge.go
@@ -14,6 +14,42 @@ import (
 	"github.com/grafana/loki/v3/pkg/util/loser"
 )
 
+// mergeTablesIncremental incrementally merges the provided sorted tables into
+// a single table. Incremental merging limits memory overhead, as only
+// maxMergeSize tables are open at a time.
+//
+// mergeTablesIncremental panics if maxMergeSize is less than 2.
+func mergeTablesIncremental(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, tables []*table, maxMergeSize int) (*table, error) {
+	if maxMergeSize < 2 {
+		panic("mergeTablesIncremental: merge size must be at least 2, got " + fmt.Sprint(maxMergeSize))
+	}
+
+	// Even if there's only one table, we still pass to mergeTables to ensure
+	// it's compressed with compressionOpts.
+	if len(tables) == 1 {
+		return mergeTables(buf, pageSize, compressionOpts, tables)
+	}
+
+	in := tables
+
+	for len(in) > 1 {
+		var out []*table
+
+		for i := 0; i < len(in); i += maxMergeSize {
+			set := in[i:min(i+maxMergeSize, len(in))]
+			merged, err := mergeTables(buf, pageSize, compressionOpts, set)
+			if err != nil {
+				return nil, err
+			}
+			out = append(out, merged)
+		}
+
+		in = out
+	}
+
+	return in[0], nil
+}
+
 // mergeTables merges the provided sorted tables into a new single sorted table
 // using k-way merge.
func mergeTables(buf *tableBuffer, pageSize int, compressionOpts dataset.CompressionOptions, tables []*table) (*table, error) { @@ -58,6 +94,8 @@ func mergeTables(buf *tableBuffer, pageSize int, compressionOpts dataset.Compres var rows int tree := loser.New(tableSequences, maxValue, tableSequenceValue, rowResultLess, tableSequenceStop) + defer tree.Close() + for tree.Next() { seq := tree.Winner() diff --git a/pkg/dataobj/internal/sections/streams/iter.go b/pkg/dataobj/internal/sections/streams/iter.go index b518705927..1f4417bd31 100644 --- a/pkg/dataobj/internal/sections/streams/iter.go +++ b/pkg/dataobj/internal/sections/streams/iter.go @@ -67,6 +67,7 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file Dataset: dset, Columns: columns, }) + defer r.Close() var rows [1]dataset.Row for { diff --git a/pkg/dataobj/logs_reader.go b/pkg/dataobj/logs_reader.go index 764bc4b89b..87d4a72844 100644 --- a/pkg/dataobj/logs_reader.go +++ b/pkg/dataobj/logs_reader.go @@ -235,6 +235,15 @@ func (r *LogsReader) Reset(obj *Object, sectionIndex int) { // call to Read. } +// Close closes the LogsReader and releases any resources it holds. Closed +// LogsReaders can be reused by calling [LogsReader.Reset]. +func (r *LogsReader) Close() error { + if r.reader != nil { + return r.reader.Close() + } + return nil +} + func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc []*logsmd.ColumnDesc) dataset.Predicate { var res dataset.Predicate diff --git a/pkg/dataobj/logs_reader_test.go b/pkg/dataobj/logs_reader_test.go index c37114eabc..b9a70d713b 100644 --- a/pkg/dataobj/logs_reader_test.go +++ b/pkg/dataobj/logs_reader_test.go @@ -54,9 +54,10 @@ func TestLogsReader(t *testing.T) { // Build with many pages but one section. obj := buildLogsObject(t, logs.Options{ - PageSizeHint: 1, - BufferSize: 1, - SectionSize: 1024, + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + StripeMergeLimit: 2, }) md, err := obj.Metadata(context.Background()) require.NoError(t, err) @@ -78,9 +79,10 @@ func TestLogsReader_MatchStreams(t *testing.T) { // Build with many pages but one section. obj := buildLogsObject(t, logs.Options{ - PageSizeHint: 1, - BufferSize: 1, - SectionSize: 1024, + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + StripeMergeLimit: 2, }) md, err := obj.Metadata(context.Background()) require.NoError(t, err) @@ -102,9 +104,10 @@ func TestLogsReader_AddMetadataMatcher(t *testing.T) { // Build with many pages but one section. obj := buildLogsObject(t, logs.Options{ - PageSizeHint: 1, - BufferSize: 1, - SectionSize: 1024, + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + StripeMergeLimit: 2, }) md, err := obj.Metadata(context.Background()) require.NoError(t, err) @@ -126,9 +129,10 @@ func TestLogsReader_AddMetadataFilter(t *testing.T) { // Build with many pages but one section. 
obj := buildLogsObject(t, logs.Options{ - PageSizeHint: 1, - BufferSize: 1, - SectionSize: 1024, + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + StripeMergeLimit: 2, }) md, err := obj.Metadata(context.Background()) require.NoError(t, err) @@ -195,6 +199,8 @@ func BenchmarkLogsReader(b *testing.B) { TargetSectionSize: 4 * 1024 * 1024, BufferSize: 16 * 1024 * 1024, TargetPageSize: 2 * 1024 * 1024, + + SectionStripeMergeLimit: 2, } builder, err := dataobj.NewBuilder(opts) diff --git a/pkg/dataobj/metastore/object.go b/pkg/dataobj/metastore/object.go index 788c3bef3a..bf7d2c8ad6 100644 --- a/pkg/dataobj/metastore/object.go +++ b/pkg/dataobj/metastore/object.go @@ -302,9 +302,12 @@ func forEachStream(ctx context.Context, object *dataobj.Object, predicate dataob return err } + var reader dataobj.StreamsReader + defer reader.Close() + streams := make([]dataobj.Stream, 1024) for i := 0; i < md.StreamsSections; i++ { - reader := dataobj.NewStreamsReader(object, i) + reader.Reset(object, i) if predicate != nil { err := reader.SetPredicate(predicate) if err != nil { diff --git a/pkg/dataobj/metastore/object_test.go b/pkg/dataobj/metastore/object_test.go index 607becf108..728ceaf433 100644 --- a/pkg/dataobj/metastore/object_test.go +++ b/pkg/dataobj/metastore/object_test.go @@ -237,10 +237,11 @@ func newTestDataBuilder(t *testing.T, tenantID string) *testDataBuilder { bucket := objstore.NewInMemBucket() builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{ - TargetPageSize: 1024 * 1024, // 1MB - TargetObjectSize: 10 * 1024 * 1024, // 10MB - TargetSectionSize: 1024 * 1024, // 1MB - BufferSize: 1024 * 1024, // 1MB + TargetPageSize: 1024 * 1024, // 1MB + TargetObjectSize: 10 * 1024 * 1024, // 10MB + TargetSectionSize: 1024 * 1024, // 1MB + BufferSize: 1024 * 1024, // 1MB + SectionStripeMergeLimit: 2, }) require.NoError(t, err) diff --git a/pkg/dataobj/metastore/updater.go b/pkg/dataobj/metastore/updater.go index c115261b0e..750baff415 100644 --- a/pkg/dataobj/metastore/updater.go +++ b/pkg/dataobj/metastore/updater.go @@ -25,6 +25,8 @@ var metastoreBuilderCfg = dataobj.BuilderConfig{ TargetPageSize: 4 * 1024 * 1024, BufferSize: 32 * 1024 * 1024, // 8x page size TargetSectionSize: 4 * 1024 * 1024, // object size / 8 + + SectionStripeMergeLimit: 2, } type Updater struct { @@ -160,10 +162,13 @@ func (m *Updater) readFromExisting(ctx context.Context, object *dataobj.Object) return errors.Wrap(err, "resolving object metadata") } + var streamsReader dataobj.StreamsReader + defer streamsReader.Close() + // Read streams from existing metastore object and write them to the builder for the new object streams := make([]dataobj.Stream, 100) for i := 0; i < si.StreamsSections; i++ { - streamsReader := dataobj.NewStreamsReader(object, i) + streamsReader.Reset(object, i) for n, err := streamsReader.Read(ctx, streams); n > 0; n, err = streamsReader.Read(ctx, streams) { if err != nil && err != io.EOF { return errors.Wrap(err, "reading streams") diff --git a/pkg/dataobj/querier/metadata.go b/pkg/dataobj/querier/metadata.go index 5332cebb68..50ee62cfad 100644 --- a/pkg/dataobj/querier/metadata.go +++ b/pkg/dataobj/querier/metadata.go @@ -185,6 +185,7 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func } defer func() { for _, reader := range readers { + _ = reader.Close() streamReaderPool.Put(reader) } }() diff --git a/pkg/dataobj/querier/store.go b/pkg/dataobj/querier/store.go index d50208a559..b5e7d4e54d 100644 --- a/pkg/dataobj/querier/store.go +++ 
b/pkg/dataobj/querier/store.go @@ -442,8 +442,10 @@ func shardObjects( } func (s *shardedObject) reset() { + _ = s.streamReader.Close() streamReaderPool.Put(s.streamReader) for i, reader := range s.logReaders { + _ = reader.Close() logReaderPool.Put(reader) s.logReaders[i] = nil } diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go index 68cf492e50..3cc127ae6f 100644 --- a/pkg/dataobj/querier/store_test.go +++ b/pkg/dataobj/querier/store_test.go @@ -470,6 +470,8 @@ func newTestDataBuilder(t *testing.T, tenantID string) *testDataBuilder { TargetObjectSize: 10 * 1024 * 1024, // 10MB TargetSectionSize: 1024 * 1024, // 1MB BufferSize: 1024 * 1024, // 1MB + + SectionStripeMergeLimit: 2, }) require.NoError(t, err) diff --git a/pkg/dataobj/streams_reader.go b/pkg/dataobj/streams_reader.go index 6659103dfb..fc321b3597 100644 --- a/pkg/dataobj/streams_reader.go +++ b/pkg/dataobj/streams_reader.go @@ -197,6 +197,15 @@ func (r *StreamsReader) Reset(obj *Object, sectionIndex int) { // call to Read. } +// Close closes the StreamsReader and releases any resources it holds. Closed +// StreamsReaders can be reused by calling [StreamsReader.Reset]. +func (r *StreamsReader) Close() error { + if r.reader != nil { + return r.reader.Close() + } + return nil +} + func translateStreamsPredicate(p StreamsPredicate, columns []dataset.Column, columnDesc []*streamsmd.ColumnDesc) dataset.Predicate { if p == nil { return nil diff --git a/pkg/logql/bench/store_dataobj.go b/pkg/logql/bench/store_dataobj.go index 8906ea4609..cc420c6904 100644 --- a/pkg/logql/bench/store_dataobj.go +++ b/pkg/logql/bench/store_dataobj.go @@ -59,6 +59,8 @@ func NewDataObjStore(dir, tenantID string) (*DataObjStore, error) { TargetObjectSize: 128 * 1024 * 1024, // 128MB TargetSectionSize: 16 * 1024 * 1024, // 16MB BufferSize: 16 * 1024 * 1024, // 16MB + + SectionStripeMergeLimit: 2, }) if err != nil { return nil, fmt.Errorf("failed to create builder: %w", err)
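For anyone wiring the new setting up outside the consumer, here is a minimal sketch of constructing a `dataobj.BuilderConfig` with `SectionStripeMergeLimit` set, validating it, and handing it to `dataobj.NewBuilder`. The size values mirror the test config in `pkg/dataobj/metastore/object_test.go`; the `main` wrapper and error handling are illustrative only.

```go
package main

import (
	"log"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

func main() {
	cfg := dataobj.BuilderConfig{
		TargetPageSize:    1024 * 1024,      // 1MB
		TargetObjectSize:  10 * 1024 * 1024, // 10MB
		TargetSectionSize: 1024 * 1024,      // 1MB
		BufferSize:        1024 * 1024,      // 1MB

		// Must be at least 2; Validate rejects lower values.
		SectionStripeMergeLimit: 2,
	}
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid builder config: %v", err)
	}

	builder, err := dataobj.NewBuilder(cfg)
	if err != nil {
		log.Fatalf("creating builder: %v", err)
	}
	_ = builder // append records and flush as usual
}
```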
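The loop in `mergeTablesIncremental` is a generic bounded-fan-in reduction: merge at most `maxMergeSize` sorted inputs per step and repeat until one output remains, which finishes in roughly ⌈log_k(n)⌉ passes for k = maxMergeSize over n stripes. Below is a self-contained sketch of the same pattern over sorted `int` slices; the helper names are hypothetical, and a plain pairwise merge stands in for the package's loser-tree merge.

```go
package main

import "fmt"

// mergeSorted merges already-sorted slices; the real code uses a k-way loser
// tree, but sequential two-way merges are enough to show the shape.
func mergeSorted(sets [][]int) []int {
	out := []int{}
	for _, s := range sets {
		out = merge2(out, s)
	}
	return out
}

func merge2(a, b []int) []int {
	out := make([]int, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if a[i] <= b[j] {
			out = append(out, a[i])
			i++
		} else {
			out = append(out, b[j])
			j++
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

// mergeIncremental mirrors mergeTablesIncremental: only maxMergeSize inputs
// are merged per step, bounding how many are "open" at once.
func mergeIncremental(in [][]int, maxMergeSize int) []int {
	for len(in) > 1 {
		var out [][]int
		for i := 0; i < len(in); i += maxMergeSize {
			out = append(out, mergeSorted(in[i:min(i+maxMergeSize, len(in))]))
		}
		in = out // each pass shrinks the input by a factor of maxMergeSize
	}
	return in[0]
}

func main() {
	sets := [][]int{{1, 4}, {2, 5}, {3, 6}, {0, 7}}
	fmt.Println(mergeIncremental(sets, 2)) // [0 1 2 3 4 5 6 7]
}
```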
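The `page.go` hunk only reflows the `runtime.AddCleanup` call, but the pattern it belongs to deserves a note: pooled `zstd.Decoder`s need `Close` to stop their background goroutines, and attaching a cleanup to a small wrapper ensures that decoders silently dropped by the `sync.Pool` still get closed. A rough standalone sketch, assuming the klauspost `zstd` API and Go 1.24's `runtime.AddCleanup`, and not the package's actual `zstdWrapper`:

```go
package main

import (
	"runtime"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// decoderWrapper stands in for the package's zstdWrapper: the pool hands out
// the wrapper, and a cleanup attached to it closes the inner decoder once the
// wrapper is garbage collected (for example, when the pool discards it).
type decoderWrapper struct {
	*zstd.Decoder
}

var decoderPool = sync.Pool{
	New: func() any {
		zr, err := zstd.NewReader(nil)
		if err != nil {
			panic(err)
		}

		w := &decoderWrapper{zr}
		// The cleanup argument is the inner decoder, not the wrapper, so the
		// cleanup does not keep the wrapper itself alive.
		runtime.AddCleanup(w, func(d *zstd.Decoder) { d.Close() }, zr)
		return w
	},
}

func main() {
	w := decoderPool.Get().(*decoderWrapper)
	// ... use w.Decoder to decompress ...
	_ = w.Reset(nil) // drop any buffered input before returning it
	decoderPool.Put(w)
}
```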
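The new `Close` methods on `StreamsReader` and `LogsReader` pair with the reuse pattern adopted in `metastore/object.go` and `metastore/updater.go`: declare one reader value, `Reset` it for each section, and defer a single `Close`. A condensed sketch of that calling convention follows; the `iterStreams` helper and its loop body are illustrative, not code from the repository.

```go
package example

import (
	"context"
	"errors"
	"io"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// iterStreams reuses one StreamsReader across sections via Reset; the single
// deferred Close releases whatever the last section left open.
func iterStreams(ctx context.Context, object *dataobj.Object, sections int) error {
	var reader dataobj.StreamsReader
	defer reader.Close()

	buf := make([]dataobj.Stream, 1024)
	for i := 0; i < sections; i++ {
		reader.Reset(object, i)

		for n, err := reader.Read(ctx, buf); n > 0; n, err = reader.Read(ctx, buf) {
			if err != nil && !errors.Is(err, io.EOF) {
				return err
			}
			for _, stream := range buf[:n] {
				_ = stream // process the stream here
			}
		}
	}
	return nil
}
```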
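Finally, `closeAndClear` in `reader_basic.go` relies on a subtle property of slices: after `pr.readers = pr.readers[:len(columns)]`, pointers stored between the new length and the capacity are still held by the backing array, so they must be closed and zeroed before they can be collected. A tiny standalone illustration with a hypothetical `closer` type:

```go
package main

import "fmt"

type closer struct{ name string }

func (c *closer) Close() { fmt.Println("closed", c.name) }

func shrink(s []*closer, n int) []*closer {
	s = s[:n]
	// Elements between len(s) and cap(s) are invisible to range and indexing
	// but still referenced by the backing array; close them and nil them out
	// so they can be garbage collected.
	for _, c := range s[len(s):cap(s)] {
		if c != nil {
			c.Close()
		}
	}
	clear(s[len(s):cap(s)])
	return s
}

func main() {
	s := []*closer{{"a"}, {"b"}, {"c"}}
	s = shrink(s, 1)
	fmt.Println(len(s), cap(s)) // 1 3
}
```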