From 3fbebd3bc4343b2911466fa8b321fe229bef4d5a Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 13 May 2025 14:26:44 -0400 Subject: [PATCH] chore(dataobj): migrate to section-relative offsets (#17704) --- pkg/dataobj/explorer/inspect.go | 8 +- pkg/dataobj/internal/encoding/dataset_logs.go | 11 +- .../internal/encoding/dataset_streams.go | 8 +- pkg/dataobj/internal/encoding/decoder.go | 30 +- .../internal/encoding/decoder_metadata.go | 4 +- .../internal/encoding/decoder_range.go | 123 ++- pkg/dataobj/internal/encoding/encoder.go | 32 +- pkg/dataobj/internal/encoding/encoder_logs.go | 17 +- .../internal/encoding/encoder_streams.go | 20 +- .../internal/encoding/encoding_test.go | 6 +- pkg/dataobj/internal/encoding/metrics.go | 36 +- .../metadata/datasetmd/datasetmd.pb.go | 12 +- .../metadata/datasetmd/datasetmd.proto | 12 +- .../internal/metadata/filemd/filemd.pb.go | 708 +++++++++++++++++- .../internal/metadata/filemd/filemd.proto | 63 +- pkg/dataobj/internal/sections/logs/iter.go | 10 +- pkg/dataobj/internal/sections/streams/iter.go | 10 +- pkg/dataobj/logs_reader.go | 6 +- pkg/dataobj/querier/store_test.go | 163 ++-- pkg/dataobj/streams_reader.go | 6 +- pkg/dataobj/tools/inspect.go | 8 +- 21 files changed, 1076 insertions(+), 217 deletions(-) diff --git a/pkg/dataobj/explorer/inspect.go b/pkg/dataobj/explorer/inspect.go index 66fa1bef80..7d7166cf9f 100644 --- a/pkg/dataobj/explorer/inspect.go +++ b/pkg/dataobj/explorer/inspect.go @@ -181,8 +181,8 @@ func inspectLogsSection(ctx context.Context, reader encoding.Decoder, section *f Type: section.Type.String(), } - dec := reader.LogsDecoder() - cols, err := dec.Columns(ctx, section) + dec := reader.LogsDecoder(section) + cols, err := dec.Columns(ctx) if err != nil { return meta, err } @@ -255,8 +255,8 @@ func inspectStreamsSection(ctx context.Context, reader encoding.Decoder, section Type: section.Type.String(), } - dec := reader.StreamsDecoder() - cols, err := dec.Columns(ctx, section) + dec := 
reader.StreamsDecoder(section) + cols, err := dec.Columns(ctx) if err != nil { return meta, err } diff --git a/pkg/dataobj/internal/encoding/dataset_logs.go b/pkg/dataobj/internal/encoding/dataset_logs.go index ce04cf7e1e..ee4e12c29a 100644 --- a/pkg/dataobj/internal/encoding/dataset_logs.go +++ b/pkg/dataobj/internal/encoding/dataset_logs.go @@ -5,25 +5,22 @@ import ( "fmt" "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" - "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) -// LogsDataset implements returns a [dataset.Dataset] from a [LogsDecoder] for -// the given section. -func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset { - return &logsDataset{dec: dec, sec: sec} +// LogsDataset returns a [dataset.Dataset] from a [LogsDecoder]. +func LogsDataset(dec LogsDecoder) dataset.Dataset { + return &logsDataset{dec: dec} } type logsDataset struct { dec LogsDecoder - sec *filemd.SectionInfo } func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] { return result.Iter(func(yield func(dataset.Column) bool) error { - columns, err := ds.dec.Columns(ctx, ds.sec) + columns, err := ds.dec.Columns(ctx) if err != nil { return err } diff --git a/pkg/dataobj/internal/encoding/dataset_streams.go b/pkg/dataobj/internal/encoding/dataset_streams.go index d7592d3cf2..9748bfd141 100644 --- a/pkg/dataobj/internal/encoding/dataset_streams.go +++ b/pkg/dataobj/internal/encoding/dataset_streams.go @@ -11,9 +11,9 @@ import ( ) // StreamsDataset implements returns a [dataset.Dataset] from a -// [StreamsDecoder] for the given section. -func StreamsDataset(dec StreamsDecoder, sec *filemd.SectionInfo) dataset.Dataset { - return &streamsDataset{dec: dec, sec: sec} +// [StreamsDecoder].
+func StreamsDataset(dec StreamsDecoder) dataset.Dataset { + return &streamsDataset{dec: dec} } type streamsDataset struct { @@ -23,7 +23,7 @@ type streamsDataset struct { func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] { return result.Iter(func(yield func(dataset.Column) bool) error { - columns, err := ds.dec.Columns(ctx, ds.sec) + columns, err := ds.dec.Columns(ctx) if err != nil { return err } diff --git a/pkg/dataobj/internal/encoding/decoder.go b/pkg/dataobj/internal/encoding/decoder.go index b0916d8225..c1f4a72fd1 100644 --- a/pkg/dataobj/internal/encoding/decoder.go +++ b/pkg/dataobj/internal/encoding/decoder.go @@ -19,17 +19,27 @@ type ( // Sections returns the list of sections within a data object. Sections(ctx context.Context) ([]*filemd.SectionInfo, error) - // StreamsDecoder returns a decoder for streams sections. - StreamsDecoder() StreamsDecoder + // StreamsDecoder returns a decoder for a streams section. The section is + // not checked for type until the decoder is used. + // + // Sections where [filemd.SectionLayout] are defined are prevented from + // reading outside of their layout. + StreamsDecoder(section *filemd.SectionInfo) StreamsDecoder - // LogsDecoder returns a decoder for logs sections. - LogsDecoder() LogsDecoder + // LogsDecoder returns a decoder for a logs section. The section is not + // checked for type until the decoder is used. + // + // Sections where [filemd.SectionLayout] are defined are prevented from + // reading outside of their layout. + LogsDecoder(section *filemd.SectionInfo) LogsDecoder } - // StreamsDecoder supports decoding data within a streams section. + // StreamsDecoder supports decoding data of a streams section. StreamsDecoder interface { - // Columns describes the set of columns in the provided section. - Columns(ctx context.Context, section *filemd.SectionInfo) ([]*streamsmd.ColumnDesc, error) + // Columns describes the set of columns in the section.
Columns returns an + // error if the section associated with the StreamsDecoder is not a valid + // streams section. + Columns(ctx context.Context) ([]*streamsmd.ColumnDesc, error) // Pages retrieves the set of pages for the provided columns. The order of // page lists emitted by the sequence matches the order of columns @@ -45,8 +55,10 @@ type ( // LogsDecoder supports decoding data within a logs section. LogsDecoder interface { - // Columns describes the set of columns in the provided section. - Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) + // Columns describes the set of columns in the provided section. Columns + // returns an error if the section associated with the LogsDecoder is not a + // valid logs section. + Columns(ctx context.Context) ([]*logsmd.ColumnDesc, error) // Pages retrieves the set of pages for the provided columns. The order of // page lists emitted by the sequence matches the order of columns diff --git a/pkg/dataobj/internal/encoding/decoder_metadata.go b/pkg/dataobj/internal/encoding/decoder_metadata.go index 192ea4a5be..b57ab02bac 100644 --- a/pkg/dataobj/internal/encoding/decoder_metadata.go +++ b/pkg/dataobj/internal/encoding/decoder_metadata.go @@ -78,9 +78,9 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata, func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) { gotVersion, err := streamio.ReadUvarint(r) if err != nil { - return nil, fmt.Errorf("read streams section format version: %w", err) + return nil, fmt.Errorf("read logs section format version: %w", err) } else if gotVersion != streamsFormatVersion { - return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion) + return nil, fmt.Errorf("unexpected logs section format version: got=%d want=%d", gotVersion, streamsFormatVersion) } var md logsmd.Metadata diff --git a/pkg/dataobj/internal/encoding/decoder_range.go 
b/pkg/dataobj/internal/encoding/decoder_range.go index 22b9083868..4778749ac0 100644 --- a/pkg/dataobj/internal/encoding/decoder_range.go +++ b/pkg/dataobj/internal/encoding/decoder_range.go @@ -91,24 +91,34 @@ func (rd *rangeDecoder) tailer(ctx context.Context) (tailer, error) { }, nil } -func (rd *rangeDecoder) StreamsDecoder() StreamsDecoder { - return &rangeStreamsDecoder{rr: rd.r} +func (rd *rangeDecoder) StreamsDecoder(section *filemd.SectionInfo) StreamsDecoder { + return &rangeStreamsDecoder{rr: rd.r, sec: section} } -func (rd *rangeDecoder) LogsDecoder() LogsDecoder { - return &rangeLogsDecoder{rr: rd.r} +func (rd *rangeDecoder) LogsDecoder(section *filemd.SectionInfo) LogsDecoder { + return &rangeLogsDecoder{rr: rd.r, sec: section} } type rangeStreamsDecoder struct { - rr rangeReader + // TODO(rfratto): restrict sections from reading outside of their regions. + + rr rangeReader // Reader for absolute ranges within the file. + sec *filemd.SectionInfo } -func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.SectionInfo) ([]*streamsmd.ColumnDesc, error) { - if got, want := section.Type, filemd.SECTION_TYPE_STREAMS; got != want { +func (rd *rangeStreamsDecoder) Columns(ctx context.Context) ([]*streamsmd.ColumnDesc, error) { + if got, want := rd.sec.Type, filemd.SECTION_TYPE_STREAMS; got != want { return nil, fmt.Errorf("unexpected section type: got=%s want=%s", got, want) } - rc, err := rd.rr.ReadRange(ctx, int64(section.MetadataOffset), int64(section.MetadataSize)) + metadataRegion, err := findMetadataRegion(rd.sec) + if err != nil { + return nil, err + } else if metadataRegion == nil { + return nil, fmt.Errorf("section is missing metadata") + } + + rc, err := rd.rr.ReadRange(ctx, int64(metadataRegion.Offset), int64(metadataRegion.Length)) if err != nil { return nil, fmt.Errorf("reading streams section metadata: %w", err) } @@ -124,8 +134,42 @@ func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.Sect return 
md.Columns, nil } +// findMetadataRegion returns the region where a section's metadata is stored. +// If section specifies the new [filemd.SectionLayout] field, then the region +// from the layout is returned. Otherwise, it returns the deprecated +// MetadataOffset and MetadataSize fields. +// +// findMetadataRegion returns an error if both the layout and metadata fields +// are set. +// +// findMetadataRegion returns nil for sections without metadata. +func findMetadataRegion(section *filemd.SectionInfo) (*filemd.Region, error) { + // Falls back to deprecated fields if the layout is not set. + var ( + deprecatedOffset = section.MetadataOffset //nolint:staticcheck // Ignore deprecation warning + deprecatedSize = section.MetadataSize //nolint:staticcheck // Ignore deprecation warning + ) + + if section.Layout != nil { + if deprecatedOffset != 0 || deprecatedSize != 0 { + return nil, fmt.Errorf("invalid section: both layout and deprecated metadata fields are set") + } + return section.Layout.Metadata, nil + } + + return &filemd.Region{ + Offset: deprecatedOffset, + Length: deprecatedSize, + }, nil +} + func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] { return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error { + baseOffset, err := findDataOffset(rd.sec) + if err != nil { + return err + } + results := make([][]*streamsmd.PageDesc, len(columns)) columnInfo := func(c *streamsmd.ColumnDesc) (uint64, uint64) { @@ -142,7 +186,7 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset ) - rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) + rc, err := rd.rr.ReadRange(ctx, int64(baseOffset+windowOffset), int64(windowSize)) if err != nil { return fmt.Errorf("reading column data: %w", err) } @@ -181,6 +225,25 @@ func (rd
*rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C }) } +// findDataOffset returns the base byte offset from where all reads of a +// section start. +// +// Older versions of data objects use absolute offsets for page data. Newer +// versions (where [filemd.SectionLayout] is provided) use offsets relative to +// the start of a section's data region. +// +// If a section specifies a layout but has no data region, then the section has +// no data for reading, and findDataOffset returns an error. +func findDataOffset(section *filemd.SectionInfo) (uint64, error) { + if section.Layout != nil { + if section.Layout.Data == nil { + return 0, fmt.Errorf("section has no data") + } + return section.Layout.Data.Offset, nil + } + return 0, nil +} + // readAndClose reads exactly size bytes from rc and then closes it. func readAndClose(rc io.ReadCloser, size uint64) ([]byte, error) { defer rc.Close() @@ -194,6 +257,11 @@ func readAndClose(rc io.ReadCloser, size uint64) ([]byte, error) { func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] { return result.Iter(func(yield func(dataset.PageData) bool) error { + baseOffset, err := findDataOffset(rd.sec) + if err != nil { + return err + } + results := make([]dataset.PageData, len(pages)) pageInfo := func(p *streamsmd.PageDesc) (uint64, uint64) { @@ -212,7 +280,7 @@ func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset ) - rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) + rc, err := rd.rr.ReadRange(ctx, int64(baseOffset+windowOffset), int64(windowSize)) if err != nil { return fmt.Errorf("reading page data: %w", err) } @@ -245,16 +313,27 @@ func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd } type rangeLogsDecoder struct { - rr rangeReader + // TODO(rfratto): restrict 
sections from reading outside of their regions. + + rr rangeReader // Reader for absolute ranges within the file. + sec *filemd.SectionInfo } -func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) { - if got, want := section.Type, filemd.SECTION_TYPE_LOGS; got != want { +func (rd *rangeLogsDecoder) Columns(ctx context.Context) ([]*logsmd.ColumnDesc, error) { + if got, want := rd.sec.Type, filemd.SECTION_TYPE_LOGS; got != want { return nil, fmt.Errorf("unexpected section type: got=%s want=%s", got, want) } - rc, err := rd.rr.ReadRange(ctx, int64(section.MetadataOffset), int64(section.MetadataSize)) + + metadataRegion, err := findMetadataRegion(rd.sec) if err != nil { - return nil, fmt.Errorf("reading streams section metadata: %w", err) + return nil, err + } else if metadataRegion == nil { + return nil, fmt.Errorf("section is missing metadata") + } + + rc, err := rd.rr.ReadRange(ctx, int64(metadataRegion.Offset), int64(metadataRegion.Length)) + if err != nil { + return nil, fmt.Errorf("reading logs section metadata: %w", err) } defer rc.Close() @@ -270,6 +349,11 @@ func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.Section func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] { return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error { + baseOffset, err := findDataOffset(rd.sec) + if err != nil { + return err + } + results := make([][]*logsmd.PageDesc, len(columns)) columnInfo := func(c *logsmd.ColumnDesc) (uint64, uint64) { @@ -286,7 +370,7 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset ) - rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) + rc, err := rd.rr.ReadRange(ctx, int64(baseOffset+windowOffset), int64(windowSize)) if err != nil { return 
fmt.Errorf("reading column data: %w", err) } @@ -327,6 +411,11 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] { return result.Iter(func(yield func(dataset.PageData) bool) error { + baseOffset, err := findDataOffset(rd.sec) + if err != nil { + return err + } + results := make([]dataset.PageData, len(pages)) pageInfo := func(p *logsmd.PageDesc) (uint64, uint64) { @@ -345,7 +434,7 @@ func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageD windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset ) - rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) + rc, err := rd.rr.ReadRange(ctx, int64(baseOffset+windowOffset), int64(windowSize)) if err != nil { return fmt.Errorf("reading page data: %w", err) } diff --git a/pkg/dataobj/internal/encoding/encoder.go b/pkg/dataobj/internal/encoding/encoder.go index 3ed57f5b3e..6f4aa75a04 100644 --- a/pkg/dataobj/internal/encoding/encoder.go +++ b/pkg/dataobj/internal/encoding/encoder.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "fmt" - "math" "github.com/gogo/protobuf/proto" @@ -75,15 +74,10 @@ func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) { // closed. We temporarily set these fields to the maximum values so they're // accounted for in the MetadataSize estimate. enc.curSection = &filemd.SectionInfo{ - Type: filemd.SECTION_TYPE_STREAMS, - MetadataOffset: math.MaxUint32, - MetadataSize: math.MaxUint32, + Type: filemd.SECTION_TYPE_STREAMS, } - return newStreamsEncoder( - enc, - enc.startOffset+enc.data.Len(), - ), nil + return newStreamsEncoder(enc), nil } // OpenLogs opens a [LogsEncoder]. 
OpenLogs fails if there is another open @@ -94,15 +88,10 @@ func (enc *Encoder) OpenLogs() (*LogsEncoder, error) { } enc.curSection = &filemd.SectionInfo{ - Type: filemd.SECTION_TYPE_LOGS, - MetadataOffset: math.MaxUint32, - MetadataSize: math.MaxUint32, + Type: filemd.SECTION_TYPE_LOGS, } - return newLogsEncoder( - enc, - enc.startOffset+enc.data.Len(), - ), nil + return newLogsEncoder(enc), nil } // MetadataSize returns an estimate of the current size of the metadata for the @@ -186,8 +175,17 @@ func (enc *Encoder) append(data, metadata []byte) error { return nil } - enc.curSection.MetadataOffset = uint64(enc.startOffset + enc.data.Len() + len(data)) - enc.curSection.MetadataSize = uint64(len(metadata)) + enc.curSection.Layout = &filemd.SectionLayout{ + Data: &filemd.Region{ + Offset: uint64(enc.startOffset + enc.data.Len()), + Length: uint64(len(data)), + }, + + Metadata: &filemd.Region{ + Offset: uint64(enc.startOffset + enc.data.Len() + len(data)), + Length: uint64(len(metadata)), + }, + } // bytes.Buffer.Write never fails. enc.data.Grow(len(data) + len(metadata)) diff --git a/pkg/dataobj/internal/encoding/encoder_logs.go b/pkg/dataobj/internal/encoding/encoder_logs.go index 701fe92432..695dcc6619 100644 --- a/pkg/dataobj/internal/encoding/encoder_logs.go +++ b/pkg/dataobj/internal/encoding/encoder_logs.go @@ -17,21 +17,19 @@ import ( type LogsEncoder struct { parent *Encoder - startOffset int // Byte offset in the file where the column starts. - closed bool // true if LogsEncoder has been closed. + closed bool // true if LogsEncoder has been closed. data *bytes.Buffer columns []*logsmd.ColumnDesc // closed columns. curColumn *logsmd.ColumnDesc // curColumn is the currently open column. 
} -func newLogsEncoder(parent *Encoder, offset int) *LogsEncoder { +func newLogsEncoder(parent *Encoder) *LogsEncoder { buf := bytesBufferPool.Get().(*bytes.Buffer) buf.Reset() return &LogsEncoder{ - parent: parent, - startOffset: offset, + parent: parent, data: buf, } @@ -66,10 +64,7 @@ func (enc *LogsEncoder) OpenColumn(columnType logsmd.ColumnType, info *dataset.C }, } - return newLogsColumnEncoder( - enc, - enc.startOffset+enc.data.Len(), - ), nil + return newLogsColumnEncoder(enc, enc.data.Len()), nil } // MetadataSize returns an estimate of the current size of the metadata for the @@ -149,7 +144,7 @@ func (enc *LogsEncoder) append(data, metadata []byte) error { return nil } - enc.curColumn.Info.MetadataOffset = uint64(enc.startOffset + enc.data.Len() + len(data)) + enc.curColumn.Info.MetadataOffset = uint64(enc.data.Len() + len(data)) enc.curColumn.Info.MetadataSize = uint64(len(metadata)) // bytes.Buffer.Write never fails. @@ -167,7 +162,7 @@ func (enc *LogsEncoder) append(data, metadata []byte) error { type LogsColumnEncoder struct { parent *LogsEncoder - startOffset int // Byte offset in the file where the column starts. + startOffset int // Byte offset in the section where the column starts. closed bool // true if LogsColumnEncoder has been closed. data *bytes.Buffer // All page data. diff --git a/pkg/dataobj/internal/encoding/encoder_streams.go b/pkg/dataobj/internal/encoding/encoder_streams.go index fe41c4349b..b0ecbaa195 100644 --- a/pkg/dataobj/internal/encoding/encoder_streams.go +++ b/pkg/dataobj/internal/encoding/encoder_streams.go @@ -17,23 +17,20 @@ import ( type StreamsEncoder struct { parent *Encoder - startOffset int // Byte offset in the file where the column starts. - closed bool // true if StreamsEncoder has been closed. + closed bool // true if StreamsEncoder has been closed. data *bytes.Buffer columns []*streamsmd.ColumnDesc // closed columns. curColumn *streamsmd.ColumnDesc // curColumn is the currently open column. 
} -func newStreamsEncoder(parent *Encoder, offset int) *StreamsEncoder { +func newStreamsEncoder(parent *Encoder) *StreamsEncoder { buf := bytesBufferPool.Get().(*bytes.Buffer) buf.Reset() return &StreamsEncoder{ - parent: parent, - startOffset: offset, - - data: buf, + parent: parent, + data: buf, } } @@ -66,10 +63,7 @@ func (enc *StreamsEncoder) OpenColumn(columnType streamsmd.ColumnType, info *dat }, } - return newStreamsColumnEncoder( - enc, - enc.startOffset+enc.data.Len(), - ), nil + return newStreamsColumnEncoder(enc, enc.data.Len()), nil } // MetadataSize returns an estimate of the current size of the metadata for the @@ -149,7 +143,7 @@ func (enc *StreamsEncoder) append(data, metadata []byte) error { return nil } - enc.curColumn.Info.MetadataOffset = uint64(enc.startOffset + enc.data.Len() + len(data)) + enc.curColumn.Info.MetadataOffset = uint64(enc.data.Len() + len(data)) enc.curColumn.Info.MetadataSize = uint64(len(metadata)) // bytes.Buffer.Write never fails. @@ -167,7 +161,7 @@ func (enc *StreamsEncoder) append(data, metadata []byte) error { type StreamsColumnEncoder struct { parent *StreamsEncoder - startOffset int // Byte offset in the file where the column starts. + startOffset int // Byte offset in the section where the column starts. closed bool // true if StreamsColumnEncoder has been closed. data *bytes.Buffer // All page data. 
diff --git a/pkg/dataobj/internal/encoding/encoding_test.go b/pkg/dataobj/internal/encoding/encoding_test.go index d75a8333f3..63042b10d7 100644 --- a/pkg/dataobj/internal/encoding/encoding_test.go +++ b/pkg/dataobj/internal/encoding/encoding_test.go @@ -97,7 +97,7 @@ func TestStreams(t *testing.T) { require.Len(t, sections, 1) require.Equal(t, filemd.SECTION_TYPE_STREAMS, sections[0].Type) - dset := encoding.StreamsDataset(dec.StreamsDecoder(), sections[0]) + dset := encoding.StreamsDataset(dec.StreamsDecoder(sections[0])) columns, err := result.Collect(dset.ListColumns(context.Background())) require.NoError(t, err) @@ -182,8 +182,8 @@ func TestLogs(t *testing.T) { require.Len(t, sections, 1) require.Equal(t, filemd.SECTION_TYPE_LOGS, sections[0].Type) - logsDec := dec.LogsDecoder() - columns, err := logsDec.Columns(context.TODO(), sections[0]) + logsDec := dec.LogsDecoder(sections[0]) + columns, err := logsDec.Columns(context.TODO()) require.NoError(t, err) require.Len(t, columns, 2) diff --git a/pkg/dataobj/internal/encoding/metrics.go b/pkg/dataobj/internal/encoding/metrics.go index e0506578d7..babdceab7e 100644 --- a/pkg/dataobj/internal/encoding/metrics.go +++ b/pkg/dataobj/internal/encoding/metrics.go @@ -241,22 +241,17 @@ func (m *Metrics) Observe(ctx context.Context, dec Decoder) error { m.sectionsCount.Observe(float64(len(sections))) m.fileMetadataSize.Observe(float64(proto.Size(&filemd.Metadata{Sections: sections}))) for _, section := range sections { - m.sectionMetadataSize.WithLabelValues(section.Type.String()).Observe(float64(section.MetadataSize)) + m.sectionMetadataSize.WithLabelValues(section.Type.String()).Observe(float64(calculateMetadataSize(section))) } - var ( - streamsDecoder = dec.StreamsDecoder() - logsDecoder = dec.LogsDecoder() - ) - var errs []error for _, section := range sections { switch section.Type { case filemd.SECTION_TYPE_STREAMS: - errs = append(errs, m.observeStreamsSection(ctx, section, streamsDecoder)) + errs = append(errs, 
m.observeStreamsSection(ctx, dec.StreamsDecoder(section))) case filemd.SECTION_TYPE_LOGS: - errs = append(errs, m.observeLogsSection(ctx, section, logsDecoder)) + errs = append(errs, m.observeLogsSection(ctx, dec.LogsDecoder(section))) default: errs = append(errs, fmt.Errorf("unknown section type %q", section.Type.String())) } @@ -265,10 +260,23 @@ func (m *Metrics) Observe(ctx context.Context, dec Decoder) error { return errors.Join(errs...) } -func (m *Metrics) observeStreamsSection(ctx context.Context, section *filemd.SectionInfo, dec StreamsDecoder) error { - sectionType := section.Type.String() +// calculateMetadataSize returns the size of metadata in a section, accounting +// for whether it's using the deprecated fields or the new layout. +func calculateMetadataSize(section *filemd.SectionInfo) uint64 { + if section.GetLayout() != nil { + // This will return zero if GetMetadata returns nil, which is correct as it + // defines the section as having no metadata. + return section.GetLayout().GetMetadata().GetLength() + } + + // Fallback to the deprecated field. + return section.MetadataSize //nolint:staticcheck // MetadataSize is deprecated but still used as a fallback. 
+} + +func (m *Metrics) observeStreamsSection(ctx context.Context, dec StreamsDecoder) error { + sectionType := filemd.SECTION_TYPE_STREAMS.String() - columns, err := dec.Columns(ctx, section) + columns, err := dec.Columns(ctx) if err != nil { return err } @@ -321,10 +329,10 @@ func (m *Metrics) observeStreamsSection(ctx context.Context, section *filemd.Sec return nil } -func (m *Metrics) observeLogsSection(ctx context.Context, section *filemd.SectionInfo, dec LogsDecoder) error { - sectionType := section.Type.String() +func (m *Metrics) observeLogsSection(ctx context.Context, dec LogsDecoder) error { + sectionType := filemd.SECTION_TYPE_LOGS.String() - columns, err := dec.Columns(ctx, section) + columns, err := dec.Columns(ctx) if err != nil { return err } diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go index ab0438c6c2..573446881f 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go @@ -141,7 +141,11 @@ type ColumnInfo struct { // Total compressed size of all pages in the column. Compressed size may // match uncompressed size if no compression is used. CompressedSize uint64 `protobuf:"varint,6,opt,name=compressed_size,json=compressedSize,proto3" json:"compressed_size,omitempty"` - // Byte offset from the start of the data object to the column's metadata. + // Byte offset relative to the start of the section to the column's metadata. + // + // In older versions of dataobjs, this was an absolute offset from the start + // of the data object. For backwards compatibility, interpret this offset as + // absolute if filemd.SectionLayout is unset for a section. MetadataOffset uint64 `protobuf:"varint,7,opt,name=metadata_offset,json=metadataOffset,proto3" json:"metadata_offset,omitempty"` // Size of the column's metadata in bytes. 
MetadataSize uint64 `protobuf:"varint,8,opt,name=metadata_size,json=metadataSize,proto3" json:"metadata_size,omitempty"` @@ -341,7 +345,11 @@ type PageInfo struct { RowsCount uint64 `protobuf:"varint,4,opt,name=rows_count,json=rowsCount,proto3" json:"rows_count,omitempty"` // Encoding type used for the page. Encoding EncodingType `protobuf:"varint,5,opt,name=encoding,proto3,enum=dataobj.metadata.dataset.v1.EncodingType" json:"encoding,omitempty"` - // Byte offset from the start of the data object to the page's data. + // Byte offset relative to the start of the section to the page's data + // + // In older versions of dataobjs, this was an absolute offset from the start + // of the data object. For backwards compatibility, interpret this offset as + // absolute if filemd.SectionLayout is unset for a section. DataOffset uint64 `protobuf:"varint,6,opt,name=data_offset,json=dataOffset,proto3" json:"data_offset,omitempty"` // Size of the page's data in bytes. DataSize uint64 `protobuf:"varint,7,opt,name=data_size,json=dataSize,proto3" json:"data_size,omitempty"` diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto index a2cca57a24..6ce202cda7 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto @@ -26,7 +26,11 @@ message ColumnInfo { // match uncompressed size if no compression is used. uint64 compressed_size = 6; - // Byte offset from the start of the data object to the column's metadata. + // Byte offset relative to the start of the section to the column's metadata. + // + // In older versions of dataobjs, this was an absolute offset from the start + // of the data object. For backwards compatibility, interpret this offset as + // absolute if filemd.SectionLayout is unset for a section. uint64 metadata_offset = 7; // Size of the column's metadata in bytes. 
@@ -115,7 +119,11 @@ message PageInfo { // Encoding type used for the page. EncodingType encoding = 5; - // Byte offset from the start of the data object to the page's data. + // Byte offset relative to the start of the section to the page's data + // + // In older versions of dataobjs, this was an absolute offset from the start + // of the data object. For backwards compatibility, interpret this offset as + // absolute if filemd.SectionLayout is unset for a section. uint64 data_offset = 6; // Size of the page's data in bytes. diff --git a/pkg/dataobj/internal/metadata/filemd/filemd.pb.go b/pkg/dataobj/internal/metadata/filemd/filemd.pb.go index cc1dc41c7f..2039f39594 100644 --- a/pkg/dataobj/internal/metadata/filemd/filemd.pb.go +++ b/pkg/dataobj/internal/metadata/filemd/filemd.pb.go @@ -101,14 +101,39 @@ func (m *Metadata) GetSections() []*SectionInfo { return nil } -// SectionInfo describes a section within the data object. +// SectionInfo describes a section within the data object. Each section is an +// independent unit of the data object. type SectionInfo struct { // Type of the section within the data object. Type SectionType `protobuf:"varint,1,opt,name=type,proto3,enum=dataobj.metadata.file.v1.SectionType" json:"type,omitempty"` // Byte offset of the section's metadata from the start of the data object. - MetadataOffset uint64 `protobuf:"varint,2,opt,name=metadata_offset,json=metadataOffset,proto3" json:"metadata_offset,omitempty"` + // + // Deprecated: Use layout to describe the location of regions of a section. + MetadataOffset uint64 `protobuf:"varint,2,opt,name=metadata_offset,json=metadataOffset,proto3" json:"metadata_offset,omitempty"` // Deprecated: Do not use. // Size of the section's metadata in bytes. - MetadataSize uint64 `protobuf:"varint,3,opt,name=metadata_size,json=metadataSize,proto3" json:"metadata_size,omitempty"` + // + // Deprecated: Use layout to describe the location of regions of a section. 
+ MetadataSize uint64 `protobuf:"varint,3,opt,name=metadata_size,json=metadataSize,proto3" json:"metadata_size,omitempty"` // Deprecated: Do not use. + // The physical layout of the section within the data object. Setting + // layout is mutually exclusive with specifying the metadata_offset and + // metadata_size fields. + // + // For backwards compatibility with older versions of data objects where + // layout isn't provided, implementations must assume that: + // + // - A section has data, but its offset and length are unknown. + // + // - Range reads of section data are done relative to the start of the + // dataobj. + // + // If the SectionLayout is specified for a section, range reads are instead + // relative to the start of the data region. If the data region is undefined, + // then the section has no data. + // + // Setting the layout is mutually exclusive with specifying the + // metadata_offset and metadata_size fields, and readers must reject data + // objects that set both. + Layout *SectionLayout `protobuf:"bytes,4,opt,name=layout,proto3" json:"layout,omitempty"` } func (m *SectionInfo) Reset() { *m = SectionInfo{} } @@ -150,6 +175,7 @@ func (m *SectionInfo) GetType() SectionType { return SECTION_TYPE_UNSPECIFIED } +// Deprecated: Do not use. func (m *SectionInfo) GetMetadataOffset() uint64 { if m != nil { return m.MetadataOffset @@ -157,6 +183,7 @@ func (m *SectionInfo) GetMetadataOffset() uint64 { return 0 } +// Deprecated: Do not use. func (m *SectionInfo) GetMetadataSize() uint64 { if m != nil { return m.MetadataSize @@ -164,10 +191,140 @@ func (m *SectionInfo) GetMetadataSize() uint64 { return 0 } +func (m *SectionInfo) GetLayout() *SectionLayout { + if m != nil { + return m.Layout + } + return nil +} + +// SectionLayout describes the physical placement of the regions that form a +// complete section: its data and its metadata.
+// +// The metadata of a section is intended to be lightweight and is typically +// used to aid reading the section's data in smaller chunks. +// +// There are no guarantees about the placement or ordering of a section's +// regions; they may be contiguous, disjoint, or interleaved with regions from +// other sections. +// +// Implementations can use region information to ensure that a section does not +// access bytes outside of its layout. +type SectionLayout struct { + // The region covering the data of a section. If the data region is + // undefined, implementations must assume that the section has no data. + Data *Region `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + // The region covering the metadata of a section. If the metadata region is + // undefined, implementations must assume that the section has no metadata. + Metadata *Region `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (m *SectionLayout) Reset() { *m = SectionLayout{} } +func (*SectionLayout) ProtoMessage() {} +func (*SectionLayout) Descriptor() ([]byte, []int) { + return fileDescriptor_be80f52d1e05bad9, []int{2} +} +func (m *SectionLayout) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SectionLayout) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SectionLayout.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SectionLayout) XXX_Merge(src proto.Message) { + xxx_messageInfo_SectionLayout.Merge(m, src) +} +func (m *SectionLayout) XXX_Size() int { + return m.Size() +} +func (m *SectionLayout) XXX_DiscardUnknown() { + xxx_messageInfo_SectionLayout.DiscardUnknown(m) +} + +var xxx_messageInfo_SectionLayout proto.InternalMessageInfo + +func (m *SectionLayout) GetData() *Region { + if m != nil { + return m.Data + } + return nil +} + +func (m 
*SectionLayout) GetMetadata() *Region { + if m != nil { + return m.Metadata + } + return nil +} + +// Region describes a contiguous range of bytes within a data object. +type Region struct { + // Byte offset of the region from the start of the data object. + Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + // Length of the region in bytes. + Length uint64 `protobuf:"varint,2,opt,name=length,proto3" json:"length,omitempty"` +} + +func (m *Region) Reset() { *m = Region{} } +func (*Region) ProtoMessage() {} +func (*Region) Descriptor() ([]byte, []int) { + return fileDescriptor_be80f52d1e05bad9, []int{3} +} +func (m *Region) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Region) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Region.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Region) XXX_Merge(src proto.Message) { + xxx_messageInfo_Region.Merge(m, src) +} +func (m *Region) XXX_Size() int { + return m.Size() +} +func (m *Region) XXX_DiscardUnknown() { + xxx_messageInfo_Region.DiscardUnknown(m) +} + +var xxx_messageInfo_Region proto.InternalMessageInfo + +func (m *Region) GetOffset() uint64 { + if m != nil { + return m.Offset + } + return 0 +} + +func (m *Region) GetLength() uint64 { + if m != nil { + return m.Length + } + return 0 +} + func init() { proto.RegisterEnum("dataobj.metadata.file.v1.SectionType", SectionType_name, SectionType_value) proto.RegisterType((*Metadata)(nil), "dataobj.metadata.file.v1.Metadata") proto.RegisterType((*SectionInfo)(nil), "dataobj.metadata.file.v1.SectionInfo") + proto.RegisterType((*SectionLayout)(nil), "dataobj.metadata.file.v1.SectionLayout") + proto.RegisterType((*Region)(nil), "dataobj.metadata.file.v1.Region") } func init() { @@ -175,30 +332,36 @@ func init() { } var 
fileDescriptor_be80f52d1e05bad9 = []byte{ - // 356 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2c, 0xc8, 0x4e, 0xd7, - 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, - 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0xea, 0xa7, 0x65, 0xe6, 0xa4, 0xe6, 0xa6, 0x40, 0x29, - 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x09, 0xa8, 0x72, 0x3d, 0x98, 0x2a, 0x3d, 0x90, 0xb4, - 0x5e, 0x99, 0xa1, 0x92, 0x2f, 0x17, 0x87, 0x2f, 0x54, 0x4c, 0xc8, 0x91, 0x8b, 0xa3, 0x38, 0x35, - 0xb9, 0x24, 0x33, 0x3f, 0xaf, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x55, 0x0f, 0x97, - 0x46, 0xbd, 0x60, 0x88, 0x4a, 0xcf, 0xbc, 0xb4, 0xfc, 0x20, 0xb8, 0x36, 0xa5, 0x69, 0x8c, 0x5c, - 0xdc, 0x48, 0x32, 0x42, 0x96, 0x5c, 0x2c, 0x25, 0x95, 0x05, 0xa9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, - 0x7c, 0x44, 0x18, 0x17, 0x52, 0x59, 0x90, 0x1a, 0x04, 0xd6, 0x22, 0xa4, 0xce, 0xc5, 0x0f, 0x53, - 0x15, 0x9f, 0x9f, 0x96, 0x56, 0x9c, 0x5a, 0x22, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x12, 0xc4, 0x07, - 0x13, 0xf6, 0x07, 0x8b, 0x0a, 0x29, 0x73, 0xf1, 0xc2, 0x15, 0x16, 0x67, 0x56, 0xa5, 0x4a, 0x30, - 0x83, 0x95, 0xf1, 0xc0, 0x04, 0x83, 0x33, 0xab, 0x52, 0xb5, 0x62, 0xe0, 0xee, 0x02, 0x59, 0x21, - 0x24, 0xc3, 0x25, 0x11, 0xec, 0xea, 0x1c, 0xe2, 0xe9, 0xef, 0x17, 0x1f, 0x12, 0x19, 0xe0, 0x1a, - 0x1f, 0xea, 0x17, 0x1c, 0xe0, 0xea, 0xec, 0xe9, 0xe6, 0xe9, 0xea, 0x22, 0xc0, 0x20, 0x24, 0xc1, - 0x25, 0x82, 0x22, 0x1b, 0x1c, 0x12, 0xe4, 0xea, 0xe8, 0x1b, 0x2c, 0xc0, 0x28, 0x24, 0xca, 0x25, - 0x88, 0x22, 0xe3, 0xe3, 0xef, 0x1e, 0x2c, 0xc0, 0xe4, 0x54, 0x7a, 0xe1, 0xa1, 0x1c, 0xc3, 0x8d, - 0x87, 0x72, 0x0c, 0x1f, 0x1e, 0xca, 0x31, 0x36, 0x3c, 0x92, 0x63, 0x5c, 0xf1, 0x48, 0x8e, 0xf1, - 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x7c, 0xf1, 0x48, 0x8e, - 0xe1, 0xc3, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, - 0x8e, 0x21, 0xca, 0x3e, 0x3d, 0xb3, 0x24, 
0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0x3f, 0xbd, - 0x28, 0x31, 0x2d, 0x31, 0x2f, 0x51, 0x3f, 0x27, 0x3f, 0x3b, 0x53, 0xbf, 0xcc, 0x58, 0x9f, 0x98, - 0x38, 0x4e, 0x62, 0x03, 0xc7, 0xae, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x18, 0x88, 0x8a, 0x94, - 0x12, 0x02, 0x00, 0x00, + // 455 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0xb1, 0x6a, 0xdb, 0x40, + 0x18, 0xc7, 0xf5, 0x39, 0xc2, 0x98, 0xcf, 0x4d, 0xea, 0x1e, 0x6d, 0xb9, 0xa1, 0x1c, 0xc2, 0x50, + 0x62, 0x5a, 0x90, 0x88, 0xd3, 0xa1, 0x85, 0x42, 0x48, 0x52, 0xb5, 0x18, 0xe2, 0x38, 0x9c, 0xdc, + 0xa1, 0xa5, 0x60, 0xe4, 0xe4, 0xa4, 0xa8, 0x91, 0x75, 0xc6, 0x3e, 0x07, 0x9c, 0xa9, 0xd0, 0x17, + 0xe8, 0x63, 0xf4, 0x51, 0x3a, 0x7a, 0xcc, 0xd6, 0x5a, 0x5e, 0x3a, 0xe6, 0x11, 0x8a, 0x64, 0x49, + 0xd4, 0x83, 0x89, 0x27, 0x71, 0xff, 0xdf, 0xef, 0x7f, 0xd2, 0x7d, 0x3a, 0xdc, 0x1b, 0x5e, 0xf9, + 0xd6, 0x85, 0xab, 0x5c, 0xd9, 0xff, 0x6a, 0x05, 0x91, 0x12, 0xa3, 0xc8, 0x0d, 0xad, 0x81, 0x50, + 0x6e, 0x12, 0x5a, 0x5e, 0x10, 0x8a, 0xc1, 0x45, 0xf6, 0x30, 0x87, 0x23, 0xa9, 0x24, 0xa1, 0x99, + 0x6e, 0xe6, 0x96, 0x99, 0x60, 0xf3, 0x7a, 0xaf, 0xde, 0xc6, 0x4a, 0x3b, 0xcb, 0xc8, 0x21, 0x56, + 0xc6, 0xe2, 0x5c, 0x05, 0x32, 0x1a, 0x53, 0x30, 0xb6, 0x1a, 0xd5, 0xe6, 0x73, 0x73, 0x5d, 0xd1, + 0x74, 0x96, 0x66, 0x2b, 0xf2, 0x24, 0x2f, 0x6a, 0xf5, 0xdf, 0x80, 0xd5, 0xff, 0x08, 0x79, 0x83, + 0xba, 0x9a, 0x0e, 0x05, 0x05, 0x03, 0x1a, 0x3b, 0x1b, 0x6c, 0xd7, 0x9d, 0x0e, 0x05, 0x4f, 0x2b, + 0xe4, 0x25, 0x3e, 0xcc, 0xad, 0x9e, 0xf4, 0xbc, 0xb1, 0x50, 0xb4, 0x64, 0x40, 0x43, 0x3f, 0x2a, + 0x51, 0xe0, 0x3b, 0x39, 0xea, 0xa4, 0x84, 0xec, 0xe2, 0x76, 0x21, 0x8f, 0x83, 0x1b, 0x41, 0xb7, + 0x0a, 0xf5, 0x41, 0x0e, 0x9c, 0xe0, 0x46, 0x90, 0x03, 0x2c, 0x87, 0xee, 0x54, 0x4e, 0x14, 0xd5, + 0x0d, 0x68, 0x54, 0x9b, 0xbb, 0xf7, 0x7e, 0xd2, 0x49, 0xaa, 0xf3, 0xac, 0x56, 0xff, 0x0e, 0xb8, + 0xbd, 0x42, 0xc8, 0x2b, 0xd4, 0x93, 0x5e, 0x7a, 0xc6, 0x6a, 0xd3, 0x58, 0xbf, 0x21, 0x17, 0x7e, + 0x20, 
0x23, 0x9e, 0xda, 0xe4, 0x2d, 0x56, 0x72, 0x21, 0x3d, 0xd7, 0x26, 0xcd, 0xa2, 0x51, 0x7f, + 0x8d, 0xe5, 0x65, 0x46, 0x9e, 0x62, 0x39, 0x9b, 0x4e, 0xf2, 0x7e, 0x9d, 0x67, 0xab, 0x24, 0x0f, + 0x45, 0xe4, 0xab, 0xcb, 0xe5, 0xd4, 0x78, 0xb6, 0x7a, 0xf1, 0xa5, 0xf8, 0x41, 0xc9, 0xac, 0xc9, + 0x33, 0xa4, 0x8e, 0x7d, 0xdc, 0x6d, 0x75, 0x4e, 0x7b, 0xdd, 0x4f, 0x67, 0x76, 0xef, 0xe3, 0xa9, + 0x73, 0x66, 0x1f, 0xb7, 0xde, 0xb7, 0xec, 0x77, 0x35, 0x8d, 0x50, 0x7c, 0xbc, 0x42, 0x9d, 0x2e, + 0xb7, 0x0f, 0xdb, 0x4e, 0x0d, 0xc8, 0x13, 0x7c, 0xb4, 0x42, 0x4e, 0x3a, 0x1f, 0x9c, 0x5a, 0xe9, + 0x68, 0x32, 0x9b, 0x33, 0xed, 0x76, 0xce, 0xb4, 0xbb, 0x39, 0x83, 0x6f, 0x31, 0x83, 0x9f, 0x31, + 0x83, 0x5f, 0x31, 0x83, 0x59, 0xcc, 0xe0, 0x4f, 0xcc, 0xe0, 0x6f, 0xcc, 0xb4, 0xbb, 0x98, 0xc1, + 0x8f, 0x05, 0xd3, 0x66, 0x0b, 0xa6, 0xdd, 0x2e, 0x98, 0xf6, 0xf9, 0xc0, 0x0f, 0xd4, 0xe5, 0xa4, + 0x6f, 0x9e, 0xcb, 0x81, 0xe5, 0x8f, 0x5c, 0xcf, 0x8d, 0x5c, 0x2b, 0x94, 0x57, 0x81, 0x75, 0xbd, + 0x6f, 0x6d, 0x72, 0xd9, 0xfb, 0xe5, 0xf4, 0x9a, 0xef, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xfb, + 0x1e, 0xdd, 0x80, 0x1b, 0x03, 0x00, 0x00, } func (x SectionType) String() string { @@ -265,6 +428,63 @@ func (this *SectionInfo) Equal(that interface{}) bool { if this.MetadataSize != that1.MetadataSize { return false } + if !this.Layout.Equal(that1.Layout) { + return false + } + return true +} +func (this *SectionLayout) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*SectionLayout) + if !ok { + that2, ok := that.(SectionLayout) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Data.Equal(that1.Data) { + return false + } + if !this.Metadata.Equal(that1.Metadata) { + return false + } + return true +} +func (this *Region) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Region) + if !ok { + that2, ok := 
that.(Region) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Offset != that1.Offset { + return false + } + if this.Length != that1.Length { + return false + } return true } func (this *Metadata) GoString() string { @@ -283,11 +503,40 @@ func (this *SectionInfo) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&filemd.SectionInfo{") s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") s = append(s, "MetadataOffset: "+fmt.Sprintf("%#v", this.MetadataOffset)+",\n") s = append(s, "MetadataSize: "+fmt.Sprintf("%#v", this.MetadataSize)+",\n") + if this.Layout != nil { + s = append(s, "Layout: "+fmt.Sprintf("%#v", this.Layout)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *SectionLayout) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&filemd.SectionLayout{") + if this.Data != nil { + s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + } + if this.Metadata != nil { + s = append(s, "Metadata: "+fmt.Sprintf("%#v", this.Metadata)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Region) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&filemd.Region{") + s = append(s, "Offset: "+fmt.Sprintf("%#v", this.Offset)+",\n") + s = append(s, "Length: "+fmt.Sprintf("%#v", this.Length)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -356,6 +605,18 @@ func (m *SectionInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Layout != nil { + { + size, err := m.Layout.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintFilemd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } if m.MetadataSize != 0 { i = encodeVarintFilemd(dAtA, i, 
uint64(m.MetadataSize)) i-- @@ -374,6 +635,86 @@ func (m *SectionInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *SectionLayout) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SectionLayout) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SectionLayout) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Metadata != nil { + { + size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintFilemd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Data != nil { + { + size, err := m.Data.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintFilemd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Region) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Region) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Region) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Length != 0 { + i = encodeVarintFilemd(dAtA, i, uint64(m.Length)) + i-- + dAtA[i] = 0x10 + } + if m.Offset != 0 { + i = encodeVarintFilemd(dAtA, i, uint64(m.Offset)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func encodeVarintFilemd(dAtA []byte, offset int, v uint64) int { offset -= sovFilemd(v) base := offset @@ -415,6 +756,42 @@ func (m *SectionInfo) Size() (n int) { if m.MetadataSize != 0 { n += 1 + sovFilemd(uint64(m.MetadataSize)) } + 
if m.Layout != nil { + l = m.Layout.Size() + n += 1 + l + sovFilemd(uint64(l)) + } + return n +} + +func (m *SectionLayout) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Data != nil { + l = m.Data.Size() + n += 1 + l + sovFilemd(uint64(l)) + } + if m.Metadata != nil { + l = m.Metadata.Size() + n += 1 + l + sovFilemd(uint64(l)) + } + return n +} + +func (m *Region) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Offset != 0 { + n += 1 + sovFilemd(uint64(m.Offset)) + } + if m.Length != 0 { + n += 1 + sovFilemd(uint64(m.Length)) + } return n } @@ -447,6 +824,29 @@ func (this *SectionInfo) String() string { `Type:` + fmt.Sprintf("%v", this.Type) + `,`, `MetadataOffset:` + fmt.Sprintf("%v", this.MetadataOffset) + `,`, `MetadataSize:` + fmt.Sprintf("%v", this.MetadataSize) + `,`, + `Layout:` + strings.Replace(this.Layout.String(), "SectionLayout", "SectionLayout", 1) + `,`, + `}`, + }, "") + return s +} +func (this *SectionLayout) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&SectionLayout{`, + `Data:` + strings.Replace(this.Data.String(), "Region", "Region", 1) + `,`, + `Metadata:` + strings.Replace(this.Metadata.String(), "Region", "Region", 1) + `,`, + `}`, + }, "") + return s +} +func (this *Region) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Region{`, + `Offset:` + fmt.Sprintf("%v", this.Offset) + `,`, + `Length:` + fmt.Sprintf("%v", this.Length) + `,`, `}`, }, "") return s @@ -632,6 +1032,258 @@ func (m *SectionInfo) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Layout", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen 
< 0 { + return ErrInvalidLengthFilemd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFilemd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Layout == nil { + m.Layout = &SectionLayout{} + } + if err := m.Layout.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFilemd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFilemd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFilemd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SectionLayout) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SectionLayout: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SectionLayout: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFilemd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFilemd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Data == nil { + m.Data = 
&Region{} + } + if err := m.Data.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthFilemd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFilemd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Metadata == nil { + m.Metadata = &Region{} + } + if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFilemd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFilemd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFilemd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Region) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Region: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Region: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + 
m.Offset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Offset |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Length", wireType) + } + m.Length = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilemd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Length |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipFilemd(dAtA[iNdEx:]) diff --git a/pkg/dataobj/internal/metadata/filemd/filemd.proto b/pkg/dataobj/internal/metadata/filemd/filemd.proto index 0ec420fa11..49da81cf15 100644 --- a/pkg/dataobj/internal/metadata/filemd/filemd.proto +++ b/pkg/dataobj/internal/metadata/filemd/filemd.proto @@ -16,16 +16,73 @@ message Metadata { repeated SectionInfo sections = 1; } -// SectionInfo describes a section within the data object. +// SectionInfo describes a section within the data object. Each section is an +// independent unit of the data object. message SectionInfo { // Type of the section within the data object. SectionType type = 1; // Byte offset of the section's metadata from the start of the data object. - uint64 metadata_offset = 2; + // + // Deprecated: Use layout to describe the location of regions of a section. + uint64 metadata_offset = 2 [deprecated = true]; // Size of the section's metadata in bytes. - uint64 metadata_size = 3; + // + // Deprecated: Use layout to describe the location of regions of a section. + uint64 metadata_size = 3 [deprecated = true]; + + // The physical layout of the section within the data object. Setting + // layout is mutually exclusive with specifying the metadata_offset and + // metadata_size fields. 
+ // + // For backwards compatibility with older versions of data objects where + // layout isn't provided, implementations must assume that: + // + // - A section has data, but its offset and length are unknown. + // + // - Range reads of section data are done relative to the start of the + // dataobj. + // + // If the SectionLayout is specified for a section, range reads are instead + // relative to the start of the data region. If the data region is undefined, + // then the section has no data. + // + // Setting the layout is mutually exclusive with specifying the + // metadata_offset and metadata_size fields, and readers must reject data + // objects that set both. + SectionLayout layout = 4; +} + +// SectionLayout describes the physical placement of the regions that form a +// complete section: its data and its metadata. +// +// The metadata of a section is intended to be lightweight and is typically +// used to aid reading the section's data in smaller chunks. +// +// There are no guarantees about the placement or ordering of a section's +// regions; they may be contiguous, disjoint, or interleaved with regions from +// other sections. +// +// Implementations can use region information to ensure that a section does not +// access bytes outside of its layout. +message SectionLayout { + // The region covering the data of a section. If the data region is + // undefined, implementations must assume that the section has no data. + Region data = 1; + + // The region covering the metadata of a section. If the metadata region is + // undefined, implementations must assume that the section has no metadata. + Region metadata = 2; +} + +// Region describes a contiguous range of bytes within a data object. +message Region { + // Byte offset of the region from the start of the data object. + uint64 offset = 1; + + // Length of the region in bytes.
+ uint64 length = 2; } enum SectionType { diff --git a/pkg/dataobj/internal/sections/logs/iter.go b/pkg/dataobj/internal/sections/logs/iter.go index 10940ba523..ff6c669ca7 100644 --- a/pkg/dataobj/internal/sections/logs/iter.go +++ b/pkg/dataobj/internal/sections/logs/iter.go @@ -29,14 +29,12 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { return err } - logsDec := dec.LogsDecoder() - for _, section := range sections { if section.Type != filemd.SECTION_TYPE_LOGS { continue } - for result := range IterSection(ctx, logsDec, section) { + for result := range IterSection(ctx, dec.LogsDecoder(section)) { if result.Err() != nil || !yield(result.MustValue()) { return result.Err() } @@ -47,19 +45,19 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { }) } -func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] { +func IterSection(ctx context.Context, dec encoding.LogsDecoder) result.Seq[Record] { return result.Iter(func(yield func(Record) bool) error { // We need to pull the columns twice: once from the dataset implementation // and once for the metadata to retrieve column type. // // TODO(rfratto): find a way to expose this information from // encoding.StreamsDataset to avoid the double call. 
- streamsColumns, err := dec.Columns(ctx, section) + streamsColumns, err := dec.Columns(ctx) if err != nil { return err } - dset := encoding.LogsDataset(dec, section) + dset := encoding.LogsDataset(dec) columns, err := result.Collect(dset.ListColumns(ctx)) if err != nil { diff --git a/pkg/dataobj/internal/sections/streams/iter.go b/pkg/dataobj/internal/sections/streams/iter.go index d4d17bbd90..3d60bb2f10 100644 --- a/pkg/dataobj/internal/sections/streams/iter.go +++ b/pkg/dataobj/internal/sections/streams/iter.go @@ -26,14 +26,12 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] { return err } - streamsDec := dec.StreamsDecoder() - for _, section := range sections { if section.Type != filemd.SECTION_TYPE_STREAMS { continue } - for result := range IterSection(ctx, streamsDec, section) { + for result := range IterSection(ctx, dec.StreamsDecoder(section)) { if result.Err() != nil || !yield(result.MustValue()) { return result.Err() } @@ -44,19 +42,19 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] { }) } -func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] { +func IterSection(ctx context.Context, dec encoding.StreamsDecoder) result.Seq[Stream] { return result.Iter(func(yield func(Stream) bool) error { // We need to pull the columns twice: once from the dataset implementation // and once for the metadata to retrieve column type. // // TODO(rfratto): find a way to expose this information from // encoding.StreamsDataset to avoid the double call. 
- streamsColumns, err := dec.Columns(ctx, section) + streamsColumns, err := dec.Columns(ctx) if err != nil { return err } - dset := encoding.StreamsDataset(dec, section) + dset := encoding.StreamsDataset(dec) columns, err := result.Collect(dset.ListColumns(ctx)) if err != nil { diff --git a/pkg/dataobj/logs_reader.go b/pkg/dataobj/logs_reader.go index 36537f88b8..5a60ad1acf 100644 --- a/pkg/dataobj/logs_reader.go +++ b/pkg/dataobj/logs_reader.go @@ -158,18 +158,18 @@ func unsafeString(data []byte) string { } func (r *LogsReader) initReader(ctx context.Context) error { - dec := r.obj.dec.LogsDecoder() sec, err := r.findSection(ctx) if err != nil { return fmt.Errorf("finding section: %w", err) } - columnDescs, err := dec.Columns(ctx, sec) + dec := r.obj.dec.LogsDecoder(sec) + columnDescs, err := dec.Columns(ctx) if err != nil { return fmt.Errorf("reading columns: %w", err) } - dset := encoding.LogsDataset(dec, sec) + dset := encoding.LogsDataset(dec) columns, err := result.Collect(dset.ListColumns(ctx)) if err != nil { return fmt.Errorf("reading columns: %w", err) diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go index f10e2d5cfd..5b4150129a 100644 --- a/pkg/dataobj/querier/store_test.go +++ b/pkg/dataobj/querier/store_test.go @@ -2,14 +2,16 @@ package querier import ( "bytes" + "cmp" "context" "os" "path/filepath" + "slices" "testing" "time" "github.com/go-kit/log" - "github.com/google/go-cmp/cmp" + gocmp "github.com/google/go-cmp/cmp" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -57,22 +59,26 @@ func TestStore_SelectSamples(t *testing.T) { start: now, end: now.Add(time.Hour), want: []sampleWithLabels{ - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, 
Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="baz", 
env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, }, @@ -93,11 +99,12 @@ func TestStore_SelectSamples(t *testing.T) { start: now, end: now.Add(time.Hour), want: []sampleWithLabels{ - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="dev"}`, Samples: 
logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, }, @@ -108,12 +115,13 @@ func TestStore_SelectSamples(t *testing.T) { start: now, end: now.Add(time.Hour), want: []sampleWithLabels{ - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3600000000000, Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3605000000000, Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3615000000000, Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3625000000000, Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3630000000000, Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: 3640000000000, Value: 1}}, + + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3600000000000, Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3630000000000, Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3645000000000, Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: 3650000000000, Value: 1}}, }, @@ -125,17 +133,23 @@ func TestStore_SelectSamples(t *testing.T) { end: now.Add(time.Hour), shards: []string{"0_of_2"}, want: []sampleWithLabels{ - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, Samples: 
logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", 
env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, }, }, { @@ -145,13 +159,10 @@ func TestStore_SelectSamples(t *testing.T) { end: now.Add(time.Hour), shards: []string{"1_of_2"}, want: []sampleWithLabels{ - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}}, }, }, { @@ 
-171,6 +182,7 @@ func TestStore_SelectSamples(t *testing.T) { want: []sampleWithLabels{ {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, }, @@ -191,7 +203,19 @@ func TestStore_SelectSamples(t *testing.T) { require.NoError(t, err) samples, err := readAllSamples(it) require.NoError(t, err) - if diff := cmp.Diff(tt.want, samples); diff != "" { + + // Sort the output by labels and timestamp; changes to how data objects + // are written can change the order of what we see, which is not what we + // care to test here. + slices.SortFunc(samples, func(a, b sampleWithLabels) int { + res := cmp.Compare(a.Labels, b.Labels) + if res == 0 { + return cmp.Compare(a.Samples.Timestamp, b.Samples.Timestamp) + } + return res + }) + + if diff := gocmp.Diff(tt.want, samples); diff != "" { t.Errorf("samples mismatch (-want +got):\n%s", diff) } }) @@ -227,22 +251,26 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, - {Labels: `{app="bar", env="prod"}`, 
Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}}, + + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}}, + + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}}, + + {Labels: `{app="foo", env="dev"}`, Entry: 
logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, + + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, }, @@ -267,11 +295,12 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, + + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, }, @@ -285,17 +314,23 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="bar", 
env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}}, + + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}}, + + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, + + {Labels: `{app="foo", env="prod"}`, Entry: 
logproto.Entry{Timestamp: now, Line: "foo1"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, }, }, { @@ -307,13 +342,10 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, + {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}}, }, }, { @@ -337,6 +369,7 @@ func TestStore_SelectLogs(t *testing.T) { want: []entryWithLabels{ {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: 
"foo5"}}, {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, }, @@ -359,7 +392,19 @@ func TestStore_SelectLogs(t *testing.T) { require.NoError(t, err) entries, err := readAllEntries(it) require.NoError(t, err) - if diff := cmp.Diff(tt.want, entries); diff != "" { + + // Sort the output by labels and timestamp; changes to how data objects + // are written can change the order of what we see, which is not what we + // care to test here. + slices.SortFunc(entries, func(a, b entryWithLabels) int { + res := cmp.Compare(a.Labels, b.Labels) + if res == 0 { + return a.Entry.Timestamp.Compare(b.Entry.Timestamp) + } + return res + }) + + if diff := gocmp.Diff(tt.want, entries); diff != "" { t.Errorf("entries mismatch (-want +got):\n%s", diff) } }) diff --git a/pkg/dataobj/streams_reader.go b/pkg/dataobj/streams_reader.go index 23b03177dc..aa5019a142 100644 --- a/pkg/dataobj/streams_reader.go +++ b/pkg/dataobj/streams_reader.go @@ -127,18 +127,18 @@ func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) { } func (r *StreamsReader) initReader(ctx context.Context) error { - dec := r.obj.dec.StreamsDecoder() sec, err := r.findSection(ctx) if err != nil { return fmt.Errorf("finding section: %w", err) } - columnDescs, err := dec.Columns(ctx, sec) + dec := r.obj.dec.StreamsDecoder(sec) + columnDescs, err := dec.Columns(ctx) if err != nil { return fmt.Errorf("reading columns: %w", err) } - dset := encoding.StreamsDataset(dec, sec) + dset := encoding.StreamsDataset(dec) columns, err := result.Collect(dset.ListColumns(ctx)) if err != nil { return fmt.Errorf("reading columns: %w", err) diff --git a/pkg/dataobj/tools/inspect.go b/pkg/dataobj/tools/inspect.go index 0a1e37a7d6..0cc6a61f70 100644 --- 
a/pkg/dataobj/tools/inspect.go +++ b/pkg/dataobj/tools/inspect.go @@ -37,9 +37,9 @@ func printStreamInfo(reader encoding.Decoder, section *filemd.SectionInfo) { return } - dec := reader.StreamsDecoder() + dec := reader.StreamsDecoder(section) fmt.Println("---- Streams Section ----") - cols, err := dec.Columns(context.Background(), section) + cols, err := dec.Columns(context.Background()) if err != nil { log.Printf("failed to read columns for section %s: %v", section.Type.String(), err) return @@ -63,8 +63,8 @@ func printLogsInfo(reader encoding.Decoder, section *filemd.SectionInfo) { } fmt.Println("---- Logs Section ----") - dec := reader.LogsDecoder() - cols, err := dec.Columns(context.Background(), section) + dec := reader.LogsDecoder(section) + cols, err := dec.Columns(context.Background()) if err != nil { log.Printf("failed to read columns for section %s: %v", section.Type.String(), err) return