chore(dataobj): add "uncompressed size" column to streams section (#16322)

pull/16355/head
Ashwanth 11 months ago committed by GitHub
parent b52c8e9dd8
commit 651d410eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      pkg/dataobj/builder.go
  2. 77
      pkg/dataobj/internal/metadata/streamsmd/streamsmd.pb.go
  3. 5
      pkg/dataobj/internal/metadata/streamsmd/streamsmd.proto
  4. 6
      pkg/dataobj/internal/sections/streams/iter.go
  5. 21
      pkg/dataobj/internal/sections/streams/streams.go
  6. 33
      pkg/dataobj/internal/sections/streams/streams_test.go
  7. 12
      pkg/dataobj/streams_reader.go
  8. 31
      pkg/dataobj/streams_reader_test.go

@ -185,7 +185,12 @@ func (b *Builder) Append(stream logproto.Stream) error {
defer timer.ObserveDuration()
for _, entry := range stream.Entries {
streamID := b.streams.Record(ls, entry.Timestamp)
sz := int64(len(entry.Line))
for _, md := range entry.StructuredMetadata {
sz += int64(len(md.Value))
}
streamID := b.streams.Record(ls, entry.Timestamp, sz)
b.logs.Append(logs.Record{
StreamID: streamID,

@ -45,6 +45,10 @@ const (
COLUMN_TYPE_LABEL ColumnType = 4
// COLUMN_TYPE_ROWS is a column indicating the number of rows for a stream.
COLUMN_TYPE_ROWS ColumnType = 5
// COLUMN_TYPE_UNCOMPRESSED_SIZE is a column indicating the uncompressed size
// of a stream. Size of a stream is the sum of the length of all log lines and
// the length of all structured metadata values
COLUMN_TYPE_UNCOMPRESSED_SIZE ColumnType = 6
)
var ColumnType_name = map[int32]string{
@ -54,15 +58,17 @@ var ColumnType_name = map[int32]string{
3: "COLUMN_TYPE_MAX_TIMESTAMP",
4: "COLUMN_TYPE_LABEL",
5: "COLUMN_TYPE_ROWS",
6: "COLUMN_TYPE_UNCOMPRESSED_SIZE",
}
var ColumnType_value = map[string]int32{
"COLUMN_TYPE_UNSPECIFIED": 0,
"COLUMN_TYPE_STREAM_ID": 1,
"COLUMN_TYPE_MIN_TIMESTAMP": 2,
"COLUMN_TYPE_MAX_TIMESTAMP": 3,
"COLUMN_TYPE_LABEL": 4,
"COLUMN_TYPE_ROWS": 5,
"COLUMN_TYPE_UNSPECIFIED": 0,
"COLUMN_TYPE_STREAM_ID": 1,
"COLUMN_TYPE_MIN_TIMESTAMP": 2,
"COLUMN_TYPE_MAX_TIMESTAMP": 3,
"COLUMN_TYPE_LABEL": 4,
"COLUMN_TYPE_ROWS": 5,
"COLUMN_TYPE_UNCOMPRESSED_SIZE": 6,
}
func (ColumnType) EnumDescriptor() ([]byte, []int) {
@ -271,35 +277,36 @@ func init() {
}
var fileDescriptor_7b94842ca2f0bf8d = []byte{
// 439 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0x3d, 0x6f, 0xd3, 0x40,
0x18, 0xf6, 0xf5, 0x03, 0xaa, 0x43, 0xaa, 0xcc, 0x89, 0x8a, 0x94, 0x8a, 0x53, 0x14, 0xa9, 0xa2,
0x62, 0xf0, 0x09, 0x3a, 0x20, 0xd4, 0xc9, 0x49, 0x8c, 0x64, 0x29, 0x97, 0x46, 0xb6, 0x2b, 0x3e,
0x16, 0xeb, 0x92, 0x5c, 0x8c, 0x69, 0xec, 0xb3, 0xec, 0x6b, 0x45, 0x37, 0x26, 0x66, 0x7e, 0x06,
0x1b, 0x7f, 0x83, 0x31, 0x63, 0x47, 0xe2, 0x2c, 0x8c, 0xfd, 0x09, 0xc8, 0x8e, 0x5d, 0x1b, 0x81,
0xd2, 0x2c, 0xd6, 0xab, 0xe7, 0xcb, 0xef, 0x73, 0x7a, 0xe1, 0xab, 0xe8, 0xdc, 0x23, 0x63, 0x26,
0x99, 0x18, 0x7e, 0x22, 0x7e, 0x28, 0x79, 0x1c, 0xb2, 0x29, 0x09, 0xb8, 0x64, 0x19, 0x48, 0x12,
0x19, 0x73, 0x16, 0x24, 0xc1, 0xb8, 0x9a, 0xb4, 0x28, 0x16, 0x52, 0xa0, 0x83, 0xc2, 0xa4, 0x95,
0x5a, 0xad, 0x50, 0x68, 0x97, 0x2f, 0x9e, 0xdc, 0x91, 0x9a, 0x7d, 0x12, 0x2e, 0x83, 0x71, 0x35,
0x2d, 0x53, 0x5b, 0x14, 0xee, 0xd0, 0x42, 0x85, 0x74, 0x78, 0x7f, 0x24, 0xa6, 0x17, 0x41, 0x98,
0x34, 0x40, 0x73, 0xf3, 0xe8, 0xc1, 0xcb, 0x67, 0xda, 0x8a, 0x7f, 0x6a, 0x9d, 0x5c, 0xdb, 0xe5,
0xc9, 0xc8, 0x2a, 0x7d, 0xad, 0xaf, 0x00, 0xc2, 0x0a, 0x47, 0x27, 0x70, 0xcb, 0x0f, 0x27, 0xa2,
0x01, 0x9a, 0xe0, 0xff, 0x71, 0xc5, 0x3a, 0x55, 0x9c, 0x19, 0x4e, 0x84, 0x95, 0x9b, 0x32, 0xb3,
0xbc, 0x8a, 0x78, 0x63, 0xa3, 0x09, 0x8e, 0x76, 0xd7, 0xda, 0xc5, 0xb9, 0x8a, 0xb8, 0x95, 0x9b,
0x5a, 0x14, 0xee, 0x2e, 0xb1, 0xdb, 0x76, 0x27, 0x70, 0x3b, 0x62, 0x1e, 0x2f, 0xbb, 0x1d, 0xae,
0xcc, 0x1b, 0x30, 0x8f, 0xe7, 0xcd, 0x96, 0x9e, 0x96, 0x01, 0x77, 0x4a, 0x08, 0xbd, 0xfe, 0xab,
0xd4, 0xe1, 0xca, 0x52, 0x99, 0xa9, 0xaa, 0xf4, 0xfc, 0xc7, 0xed, 0xf3, 0x64, 0xab, 0xa2, 0x03,
0xf8, 0xb8, 0x73, 0xda, 0x3b, 0xa3, 0x7d, 0xd7, 0x79, 0x3f, 0x30, 0xdc, 0xb3, 0xbe, 0x3d, 0x30,
0x3a, 0xe6, 0x1b, 0xd3, 0xe8, 0xaa, 0x0a, 0xda, 0x87, 0x7b, 0x75, 0xd2, 0x76, 0x2c, 0x43, 0xa7,
0xae, 0xd9, 0x55, 0x01, 0x7a, 0x0a, 0xf7, 0xeb, 0x14, 0x35, 0xfb, 0xae, 0x63, 0x52, 0xc3, 0x76,
0x74, 0x3a, 0x50, 0x37, 0xfe, 0xa1, 0xf5, 0x77, 0x35, 0x7a, 0x13, 0xed, 0xc1, 0x87, 0x75, 0xba,
0xa7, 0xb7, 0x8d, 0x9e, 0xba, 0x85, 0x1e, 0x41, 0xb5, 0x0e, 0x5b, 0xa7, 0x6f, 0x6d, 0x75, 0xbb,
0xfd, 0x79, 0x36, 0xc7, 0xca, 0xf5, 0x1c, 0x2b, 0x37, 0x73, 0x0c, 0xbe, 0xa4, 0x18, 0x7c, 0x4f,
0x31, 0xf8, 0x99, 0x62, 0x30, 0x4b, 0x31, 0xf8, 0x95, 0x62, 0xf0, 0x3b, 0xc5, 0xca, 0x4d, 0x8a,
0xc1, 0xb7, 0x05, 0x56, 0x66, 0x0b, 0xac, 0x5c, 0x2f, 0xb0, 0xf2, 0xa1, 0xed, 0xf9, 0xf2, 0xe3,
0xc5, 0x50, 0x1b, 0x89, 0x80, 0x78, 0x31, 0x9b, 0xb0, 0x90, 0x91, 0xa9, 0x38, 0xf7, 0xc9, 0xe5,
0x31, 0x59, 0xf3, 0xfe, 0x87, 0xf7, 0xf2, 0x03, 0x3d, 0xfe, 0x13, 0x00, 0x00, 0xff, 0xff, 0x66,
0x7a, 0xe2, 0x3c, 0x31, 0x03, 0x00, 0x00,
// 457 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xbd, 0x6e, 0xd3, 0x40,
0x00, 0xf6, 0xf5, 0x8f, 0xea, 0x90, 0x2a, 0x73, 0xa2, 0x22, 0xa5, 0xea, 0x29, 0x44, 0xaa, 0xa8,
0x18, 0x7c, 0x82, 0x0e, 0x08, 0x75, 0x72, 0x92, 0x43, 0xb2, 0x94, 0x4b, 0x2c, 0xdb, 0x15, 0xd0,
0xc5, 0xba, 0x24, 0x17, 0x13, 0x1a, 0xfb, 0xac, 0xf8, 0x5a, 0xd1, 0x8d, 0x89, 0x99, 0xc7, 0xe0,
0x51, 0x18, 0x23, 0xa6, 0x8e, 0xc4, 0x59, 0x18, 0xfb, 0x08, 0xc8, 0x4e, 0x52, 0xbb, 0x02, 0x85,
0x2e, 0xd6, 0xe9, 0xfb, 0xf3, 0xf7, 0x59, 0x3e, 0xf8, 0x3a, 0x3e, 0x0f, 0x48, 0x9f, 0x2b, 0x2e,
0xbb, 0x9f, 0xc8, 0x30, 0x52, 0x62, 0x1c, 0xf1, 0x11, 0x09, 0x85, 0xe2, 0x19, 0x48, 0x12, 0x35,
0x16, 0x3c, 0x4c, 0xc2, 0x7e, 0x71, 0x32, 0xe2, 0xb1, 0x54, 0x12, 0xed, 0x2f, 0x4c, 0xc6, 0x52,
0x6b, 0x2c, 0x14, 0xc6, 0xe5, 0xcb, 0xa7, 0xff, 0x49, 0xcd, 0x1e, 0x89, 0x50, 0x61, 0xbf, 0x38,
0xcd, 0x53, 0x6b, 0x0c, 0x6e, 0xb3, 0x85, 0x0a, 0x99, 0xf0, 0x41, 0x4f, 0x8e, 0x2e, 0xc2, 0x28,
0xa9, 0x80, 0xea, 0xfa, 0xd1, 0xc3, 0x57, 0xcf, 0x8d, 0x15, 0xef, 0x34, 0x1a, 0xb9, 0xb6, 0x29,
0x92, 0x9e, 0xb3, 0xf4, 0xd5, 0xbe, 0x02, 0x08, 0x0b, 0x1c, 0x9d, 0xc0, 0x8d, 0x61, 0x34, 0x90,
0x15, 0x50, 0x05, 0xff, 0x8e, 0x5b, 0xd4, 0x29, 0xe2, 0xac, 0x68, 0x20, 0x9d, 0xdc, 0x94, 0x99,
0xd5, 0x55, 0x2c, 0x2a, 0x6b, 0x55, 0x70, 0xb4, 0x73, 0xaf, 0x2e, 0xde, 0x55, 0x2c, 0x9c, 0xdc,
0x54, 0x63, 0x70, 0x67, 0x8e, 0xdd, 0xae, 0x3b, 0x81, 0x9b, 0x31, 0x0f, 0xc4, 0x72, 0xdb, 0xe1,
0xca, 0x3c, 0x9b, 0x07, 0x22, 0x5f, 0x36, 0xf7, 0xd4, 0x28, 0xdc, 0x5e, 0x42, 0xe8, 0xcd, 0x9d,
0x51, 0x87, 0x2b, 0x47, 0x65, 0xa6, 0x62, 0xd2, 0x8b, 0x9f, 0xb7, 0x9f, 0x27, 0xab, 0x8a, 0xf6,
0xe1, 0x93, 0x46, 0xa7, 0x75, 0xca, 0xda, 0xbe, 0xf7, 0xc1, 0xa6, 0xfe, 0x69, 0xdb, 0xb5, 0x69,
0xc3, 0x7a, 0x6b, 0xd1, 0xa6, 0xae, 0xa1, 0x3d, 0xb8, 0x5b, 0x26, 0x5d, 0xcf, 0xa1, 0x26, 0xf3,
0xad, 0xa6, 0x0e, 0xd0, 0x01, 0xdc, 0x2b, 0x53, 0xcc, 0x6a, 0xfb, 0x9e, 0xc5, 0xa8, 0xeb, 0x99,
0xcc, 0xd6, 0xd7, 0xfe, 0xa2, 0xcd, 0xf7, 0x25, 0x7a, 0x1d, 0xed, 0xc2, 0x47, 0x65, 0xba, 0x65,
0xd6, 0x69, 0x4b, 0xdf, 0x40, 0x8f, 0xa1, 0x5e, 0x86, 0x9d, 0xce, 0x3b, 0x57, 0xdf, 0x44, 0xcf,
0xe0, 0xc1, 0xdd, 0x8a, 0x8d, 0x0e, 0xb3, 0x1d, 0xea, 0xba, 0xb4, 0xe9, 0xbb, 0xd6, 0x19, 0xd5,
0xb7, 0xea, 0x9f, 0x27, 0x53, 0xac, 0x5d, 0x4f, 0xb1, 0x76, 0x33, 0xc5, 0xe0, 0x4b, 0x8a, 0xc1,
0xf7, 0x14, 0x83, 0x1f, 0x29, 0x06, 0x93, 0x14, 0x83, 0x5f, 0x29, 0x06, 0xbf, 0x53, 0xac, 0xdd,
0xa4, 0x18, 0x7c, 0x9b, 0x61, 0x6d, 0x32, 0xc3, 0xda, 0xf5, 0x0c, 0x6b, 0x67, 0xf5, 0x60, 0xa8,
0x3e, 0x5e, 0x74, 0x8d, 0x9e, 0x0c, 0x49, 0x30, 0xe6, 0x03, 0x1e, 0x71, 0x32, 0x92, 0xe7, 0x43,
0x72, 0x79, 0x4c, 0xee, 0x79, 0x45, 0xba, 0x5b, 0xf9, 0x3f, 0x7c, 0xfc, 0x27, 0x00, 0x00, 0xff,
0xff, 0x87, 0x13, 0x9c, 0x9b, 0x54, 0x03, 0x00, 0x00,
}
func (x ColumnType) String() string {

@ -45,6 +45,11 @@ enum ColumnType {
// COLUMN_TYPE_ROWS is a column indicating the number of rows for a stream.
COLUMN_TYPE_ROWS = 5;
// COLUMN_TYPE_UNCOMPRESSED_SIZE is a column indicating the uncompressed size
// of a stream. Size of a stream is the sum of the length of all log lines and
// the length of all structured metadata values
COLUMN_TYPE_UNCOMPRESSED_SIZE = 6;
}
// ColumnMetadata describes the metadata for a column.

@ -113,6 +113,12 @@ func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error)
}
stream.Rows = int(columnValue.Int64())
case streamsmd.COLUMN_TYPE_UNCOMPRESSED_SIZE:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.UncompressedSize = columnValue.Int64()
case streamsmd.COLUMN_TYPE_LABEL:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)

@ -28,10 +28,11 @@ type Stream struct {
// object.
ID int64
Labels labels.Labels // Stream labels.
MinTimestamp time.Time // Minimum timestamp in the stream.
MaxTimestamp time.Time // Maximum timestamp in the stream.
Rows int // Number of rows in the stream.
Labels labels.Labels // Stream labels.
MinTimestamp time.Time // Minimum timestamp in the stream.
MaxTimestamp time.Time // Maximum timestamp in the stream.
UncompressedSize int64 // Uncompressed size of the log lines and structured metadata values in the stream.
Rows int // Number of rows in the stream.
}
// Reset zeroes all values in the stream struct so it can be reused.
@ -40,6 +41,7 @@ func (s *Stream) Reset() {
s.Labels = nil
s.MinTimestamp = time.Time{}
s.MaxTimestamp = time.Time{}
s.UncompressedSize = 0
s.Rows = 0
}
@ -90,9 +92,10 @@ func (s *Streams) TimeRange() (time.Time, time.Time) {
// Record a stream record within the Streams section. The provided timestamp is
// used to track the minimum and maximum timestamp of a stream. The number of
// calls to Record is used to track the number of rows for a stream.
// The recordSize is used to track the uncompressed size of the stream.
//
// The stream ID of the recorded stream is returned.
func (s *Streams) Record(streamLabels labels.Labels, ts time.Time) int64 {
func (s *Streams) Record(streamLabels labels.Labels, ts time.Time, recordSize int64) int64 {
ts = ts.UTC()
s.observeRecord(ts)
@ -104,6 +107,8 @@ func (s *Streams) Record(streamLabels labels.Labels, ts time.Time) int64 {
stream.MaxTimestamp = ts
}
stream.Rows++
stream.UncompressedSize += recordSize
return stream.ID
}
@ -238,6 +243,10 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
if err != nil {
return fmt.Errorf("creating rows column: %w", err)
}
uncompressedSizeBuilder, err := numberColumnBuilder(s.pageSize)
if err != nil {
return fmt.Errorf("creating uncompressed size column: %w", err)
}
var (
labelBuilders []*dataset.ColumnBuilder
@ -275,6 +284,7 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
_ = minTimestampBuilder.Append(i, dataset.Int64Value(stream.MinTimestamp.UnixNano()))
_ = maxTimestampBuilder.Append(i, dataset.Int64Value(stream.MaxTimestamp.UnixNano()))
_ = rowsCountBuilder.Append(i, dataset.Int64Value(int64(stream.Rows)))
_ = uncompressedSizeBuilder.Append(i, dataset.Int64Value(stream.UncompressedSize))
for _, label := range stream.Labels {
builder, err := getLabelColumn(label.Name)
@ -304,6 +314,7 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_MIN_TIMESTAMP, minTimestampBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_MAX_TIMESTAMP, maxTimestampBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_ROWS, rowsCountBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_UNCOMPRESSED_SIZE, uncompressedSizeBuilder))
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("encoding columns: %w", err)
}

@ -17,18 +17,19 @@ func Test(t *testing.T) {
type ent struct {
Labels labels.Labels
Time time.Time
Size int64
}
tt := []ent{
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(10, 0).UTC()},
{labels.FromStrings("cluster", "test", "app", "bar", "special", "yes"), time.Unix(100, 0).UTC()},
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(15, 0).UTC()},
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(9, 0).UTC()},
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(10, 0).UTC(), 10},
{labels.FromStrings("cluster", "test", "app", "bar", "special", "yes"), time.Unix(100, 0).UTC(), 20},
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(15, 0).UTC(), 15},
{labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(9, 0).UTC(), 5},
}
tracker := streams.New(nil, 1024)
for _, tc := range tt {
tracker.Record(tc.Labels, tc.Time)
tracker.Record(tc.Labels, tc.Time, tc.Size)
}
buf, err := buildObject(tracker)
@ -36,18 +37,20 @@ func Test(t *testing.T) {
expect := []streams.Stream{
{
ID: 1,
Labels: labels.FromStrings("cluster", "test", "app", "foo"),
MinTimestamp: time.Unix(9, 0).UTC(),
MaxTimestamp: time.Unix(15, 0).UTC(),
Rows: 3,
ID: 1,
Labels: labels.FromStrings("cluster", "test", "app", "foo"),
MinTimestamp: time.Unix(9, 0).UTC(),
MaxTimestamp: time.Unix(15, 0).UTC(),
Rows: 3,
UncompressedSize: 30,
},
{
ID: 2,
Labels: labels.FromStrings("cluster", "test", "app", "bar", "special", "yes"),
MinTimestamp: time.Unix(100, 0).UTC(),
MaxTimestamp: time.Unix(100, 0).UTC(),
Rows: 1,
ID: 2,
Labels: labels.FromStrings("cluster", "test", "app", "bar", "special", "yes"),
MinTimestamp: time.Unix(100, 0).UTC(),
MaxTimestamp: time.Unix(100, 0).UTC(),
Rows: 1,
UncompressedSize: 20,
},
}

@ -23,6 +23,9 @@ type Stream struct {
// the stream.
MinTime, MaxTime time.Time
// UncompressedSize is the total size of all the log lines and structured metadata values in the stream
UncompressedSize int64
// Labels of the stream.
Labels labels.Labels
}
@ -111,10 +114,11 @@ func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) {
}
s[i] = Stream{
ID: stream.ID,
MinTime: stream.MinTimestamp,
MaxTime: stream.MaxTimestamp,
Labels: stream.Labels,
ID: stream.ID,
MinTime: stream.MinTimestamp,
MaxTime: stream.MaxTimestamp,
UncompressedSize: stream.UncompressedSize,
Labels: stream.Labels,
}
}

@ -18,22 +18,23 @@ import (
)
var streamsTestdata = []struct {
Labels labels.Labels
Timestamp time.Time
Labels labels.Labels
Timestamp time.Time
UncompressedSize int64
}{
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(10)},
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(15)},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(5)},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(20)},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(25)},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(30)},
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(10), 15},
{labels.FromStrings("cluster", "test", "app", "foo"), unixTime(15), 10},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(5), 20},
{labels.FromStrings("cluster", "test", "app", "bar"), unixTime(20), 25},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(25), 30},
{labels.FromStrings("cluster", "test", "app", "baz"), unixTime(30), 5},
}
func TestStreamsReader(t *testing.T) {
expect := []dataobj.Stream{
{1, unixTime(10), unixTime(15), labels.FromStrings("cluster", "test", "app", "foo")},
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")},
{1, unixTime(10), unixTime(15), 25, labels.FromStrings("cluster", "test", "app", "foo")},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz")},
}
obj := buildStreamsObject(t, 1) // Many pages
@ -49,7 +50,7 @@ func TestStreamsReader(t *testing.T) {
func TestStreamsReader_AddLabelMatcher(t *testing.T) {
expect := []dataobj.Stream{
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar")},
}
obj := buildStreamsObject(t, 1) // Many pages
@ -67,8 +68,8 @@ func TestStreamsReader_AddLabelMatcher(t *testing.T) {
func TestStreamsReader_AddLabelFilter(t *testing.T) {
expect := []dataobj.Stream{
{2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar")},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz")},
}
obj := buildStreamsObject(t, 1) // Many pages
@ -100,7 +101,7 @@ func buildStreamsObject(t *testing.T, pageSize int) *dataobj.Object {
s := streams.New(nil, pageSize)
for _, d := range streamsTestdata {
s.Record(d.Labels, d.Timestamp)
s.Record(d.Labels, d.Timestamp, d.UncompressedSize)
}
var buf bytes.Buffer

Loading…
Cancel
Save