Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/sections/logs/table_test.go

98 lines
2.7 KiB

package logs
import (
"context"
"errors"
"io"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
func Test_table_metadataCleanup(t *testing.T) {
var buf tableBuffer
initBuffer(&buf)
_ = buf.Metadata("foo", 1024, dataset.CompressionOptions{})
_ = buf.Metadata("bar", 1024, dataset.CompressionOptions{})
table, err := buf.Flush()
require.NoError(t, err)
require.Equal(t, 2, len(table.Metadatas))
initBuffer(&buf)
_ = buf.Metadata("bar", 1024, dataset.CompressionOptions{})
table, err = buf.Flush()
require.NoError(t, err)
require.Equal(t, 1, len(table.Metadatas))
require.Equal(t, "bar", table.Metadatas[0].Info.Name)
}
func initBuffer(buf *tableBuffer) {
buf.StreamID(1024)
buf.Timestamp(1024)
buf.Message(1024, dataset.CompressionOptions{})
}
func Test_mergeTables(t *testing.T) {
var buf tableBuffer
// tables need to be sorted by Timestamp DESC and StreamID ASC
var (
tableA = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 3, Timestamp: time.Unix(3, 0), Line: []byte("hello")},
{StreamID: 2, Timestamp: time.Unix(2, 0), Line: []byte("how")},
{StreamID: 1, Timestamp: time.Unix(1, 0), Line: []byte("you")},
})
tableB = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 1, Timestamp: time.Unix(2, 0), Line: []byte("world")},
{StreamID: 3, Timestamp: time.Unix(1, 0), Line: []byte("goodbye")},
})
tableC = buildTable(&buf, 1024, dataset.CompressionOptions{}, []Record{
{StreamID: 3, Timestamp: time.Unix(2, 0), Line: []byte("are")},
{StreamID: 2, Timestamp: time.Unix(1, 0), Line: []byte("doing?")},
})
)
mergedTable, err := mergeTables(&buf, 1024, dataset.CompressionOptions{}, []*table{tableA, tableB, tableC})
require.NoError(t, err)
mergedColumns, err := result.Collect(mergedTable.ListColumns(context.Background()))
require.NoError(t, err)
var actual []string
r := dataset.NewReader(dataset.ReaderOptions{
Dataset: mergedTable,
Columns: mergedColumns,
})
rows := make([]dataset.Row, 1024)
for {
n, err := r.Read(context.Background(), rows)
if err != nil && !errors.Is(err, io.EOF) {
require.NoError(t, err)
} else if n == 0 && errors.Is(err, io.EOF) {
break
}
for _, row := range rows[:n] {
require.Len(t, row.Values, 3)
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, row.Values[2].Type())
actual = append(actual, string(row.Values[2].ByteArray()))
}
}
require.Equal(t, "hello world how are you doing? goodbye", strings.Join(actual, " "))
}