Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/pointers/builder_test.go

157 lines
4.4 KiB

package pointers
import (
"context"
"io"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
)
func TestAddingStreams(t *testing.T) {
type ent struct {
path string
section int64
pointerKind PointerKind
streamIDInObject int64
streamID int64
minTimestamp time.Time
maxTimestamp time.Time
rowsCount int64
uncompressedSize int64
}
tt := []ent{
{path: "foo", section: 0, pointerKind: PointerKindStreamIndex, streamIDInObject: -1, streamID: 1, minTimestamp: time.Unix(9, 0), maxTimestamp: time.Unix(15, 0), rowsCount: 2, uncompressedSize: 30},
{path: "bar", section: 1, pointerKind: PointerKindStreamIndex, streamIDInObject: -2, streamID: 2, minTimestamp: time.Unix(100, 0), maxTimestamp: time.Unix(101, 0), rowsCount: 2, uncompressedSize: 20},
}
tracker := NewBuilder(nil, 1024, 0)
for _, tc := range tt {
tracker.ObserveStream(tc.path, tc.section, tc.streamIDInObject, tc.streamID, tc.minTimestamp, tc.uncompressedSize)
// We Observe twice to track the max timestamp too. But we don't want to count the size twice.
tracker.ObserveStream(tc.path, tc.section, tc.streamIDInObject, tc.streamID, tc.maxTimestamp, 0)
}
obj, closer, err := buildObject(tracker)
require.NoError(t, err)
defer closer.Close()
expect := []SectionPointer{
{
Path: "foo",
Section: 0,
PointerKind: PointerKindStreamIndex,
StreamID: 1,
StreamIDRef: -1,
StartTs: time.Unix(9, 0),
EndTs: time.Unix(15, 0),
LineCount: 2,
UncompressedSize: 30,
// Non stream pieces are set to default values
ValuesBloomFilter: nil,
ColumnName: "",
ColumnIndex: 0,
},
{
Path: "bar",
Section: 1,
PointerKind: PointerKindStreamIndex,
StreamID: 2,
StreamIDRef: -2,
StartTs: time.Unix(100, 0),
EndTs: time.Unix(101, 0),
LineCount: 2,
UncompressedSize: 20,
// Non stream pieces are set to default values
ValuesBloomFilter: nil,
ColumnName: "",
ColumnIndex: 0,
},
}
var actual []SectionPointer
for result := range Iter(context.Background(), obj) {
pointer, err := result.Value()
require.NoError(t, err)
actual = append(actual, pointer)
}
require.Equal(t, expect, actual)
}
func TestAddingColumnIndexes(t *testing.T) {
type ent struct {
path string
section int64
pointerKind PointerKind
columnName string
columnIndex int64
valuesBloomFilter []byte
}
tt := []ent{
{path: "foo", section: 0, pointerKind: PointerKindColumnIndex, columnName: "testColumn", columnIndex: 0, valuesBloomFilter: []byte{1, 2, 3}},
{path: "bar", section: 1, pointerKind: PointerKindColumnIndex, columnName: "testColumn2", columnIndex: 1, valuesBloomFilter: []byte{1, 2, 3, 4}},
}
tracker := NewBuilder(nil, 1024, 0)
for _, tc := range tt {
tracker.RecordColumnIndex(tc.path, tc.section, tc.columnName, tc.columnIndex, tc.valuesBloomFilter)
}
obj, closer, err := buildObject(tracker)
require.NoError(t, err)
defer closer.Close()
expect := []SectionPointer{
{
Path: "foo",
Section: 0,
PointerKind: PointerKindColumnIndex,
ColumnName: "testColumn",
ColumnIndex: 0,
ValuesBloomFilter: []byte{1, 2, 3},
// Non column pieces are set to default values
StreamID: 0,
StartTs: time.Time{},
EndTs: time.Time{},
LineCount: 0,
UncompressedSize: 0,
},
{
Path: "bar",
Section: 1,
PointerKind: PointerKindColumnIndex,
ColumnName: "testColumn2",
ColumnIndex: 1,
ValuesBloomFilter: []byte{1, 2, 3, 4},
// Non column pieces are set to default values
StreamID: 0,
StartTs: time.Time{},
EndTs: time.Time{},
LineCount: 0,
UncompressedSize: 0,
},
}
var actual []SectionPointer
for result := range Iter(context.Background(), obj) {
pointer, err := result.Value()
require.NoError(t, err)
actual = append(actual, pointer)
}
require.Equal(t, expect, actual)
}
// buildObject appends st to a new dataobj builder and flushes it. It returns
// whatever Flush produces (the object, an io.Closer, and an error), or the
// Append error with nil results if appending fails.
func buildObject(st *Builder) (*dataobj.Object, io.Closer, error) {
	objBuilder := dataobj.NewBuilder(nil)

	err := objBuilder.Append(st)
	if err != nil {
		return nil, nil, err
	}

	return objBuilder.Flush()
}