loki/pkg/dataobj/sections/logs/builder_test.go

package logs_test

import (
	"context"
	"fmt"
	"io"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/scratch"
)
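
// Test appends records to a logs.Builder in unsorted order and verifies that
// reading the built object back returns them in sorted order with sorted
// metadata.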
func Test(t *testing.T) {
	records := []logs.Record{
		{
			StreamID:  2,
			Timestamp: time.Unix(10, 0),
			Metadata:  labels.New(labels.Label{Name: "cluster", Value: "test"}, labels.Label{Name: "app", Value: "foo"}),
			Line:      []byte("foo bar"),
		},
		{
			StreamID:  1,
			Timestamp: time.Unix(10, 0),
			Metadata:  labels.EmptyLabels(),
			Line:      []byte("hello world"),
		},
		{
			StreamID:  2,
			Timestamp: time.Unix(100, 0),
			Metadata:  labels.New(labels.Label{Name: "cluster", Value: "test"}, labels.Label{Name: "app", Value: "bar"}),
			Line:      []byte("goodbye world"),
		},
	}

	opts := logs.BuilderOptions{
		PageSizeHint:     1024,
		BufferSize:       256,
		StripeMergeLimit: 2,
		SortOrder:        logs.SortStreamASC,
	}

	tracker := logs.NewBuilder(nil, opts)
	for _, record := range records {
		tracker.Append(record)
	}

	obj, closer, err := buildObject(tracker)
	require.NoError(t, err)
	defer closer.Close()
	// The order of records should be sorted by stream ID ASC then timestamp DESC,
	// and all metadata should be sorted by key then value.
	expect := []logs.Record{
		{
			StreamID:  1,
			Timestamp: time.Unix(10, 0),
			Metadata:  labels.EmptyLabels(),
			Line:      []byte("hello world"),
		},
		{
			StreamID:  2,
			Timestamp: time.Unix(100, 0),
			Metadata:  labels.New(labels.Label{Name: "app", Value: "bar"}, labels.Label{Name: "cluster", Value: "test"}),
			Line:      []byte("goodbye world"),
		},
		{
			StreamID:  2,
			Timestamp: time.Unix(10, 0),
			Metadata:  labels.New(labels.Label{Name: "app", Value: "foo"}, labels.Label{Name: "cluster", Value: "test"}),
			Line:      []byte("foo bar"),
		},
	}

	i := 0
	for result := range logs.Iter(context.Background(), obj) {
		record, err := result.Value()
		require.NoError(t, err)
		require.Equal(t, expect[i], record)
		i++
	}
}
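
// TestLogsBuilder_10000Columns stresses the builder with many distinct
// metadata columns: 100 records, each carrying 100 unique label keys.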
func TestLogsBuilder_10000Columns(t *testing.T) {
	// Make a dataset of 100 records, each with 100 unique metadata labels, each with a single 256 byte value.
	var records []logs.Record
	for i := range 100 {
		megaRecord := logs.Record{
			StreamID:  2,
			Timestamp: time.Unix(10, 0),
			Line:      []byte("foo bar"),
		}

		lbb := labels.NewScratchBuilder(100)
		for j := range 100 {
			lbb.Add(fmt.Sprintf("key%d-%d", i, j), fmt.Sprintf("value%d-%d-%s", i, j, strings.Repeat("A", 256)))
		}
		megaRecord.Metadata = lbb.Labels()

		records = append(records, megaRecord)
	}

	opts := logs.BuilderOptions{
		PageSizeHint:     2 * 1024 * 1024,
		BufferSize:       256,
		StripeMergeLimit: 2,
		SortOrder:        logs.SortStreamASC,
	}

	dataobj := dataobj.NewBuilder(scratch.NewMemory())
	builder := logs.NewBuilder(nil, opts)
	for _, record := range records {
		builder.Append(record)
	}

	err := dataobj.Append(builder)
	require.NoError(t, err)

	_, closer, err := dataobj.Flush()
	require.NoError(t, err)
	require.NoError(t, closer.Close())
}
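
// buildObject appends the logs section in lt to a data object builder and
// flushes it, returning the built object for reading back.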
func buildObject(lt *logs.Builder) (*dataobj.Object, io.Closer, error) {
	builder := dataobj.NewBuilder(nil)
	if err := builder.Append(lt); err != nil {
		return nil, nil, err
	}
	return builder.Flush()
}