perf: Reuse Values in dataobj.Reader to reduce allocs (#16988)

Co-authored-by: Robert Fratto <robertfratto@gmail.com>
Committed by benclive via GitHub (1 year ago)
parent 4439096d96
commit e5784d7a82
1. pkg/dataobj/internal/dataset/column_reader_test.go (24 changes)
2. pkg/dataobj/internal/dataset/column_test.go (60 changes)
3. pkg/dataobj/internal/dataset/page_builder.go (4 changes)
4. pkg/dataobj/internal/dataset/page_reader.go (54 changes)
5. pkg/dataobj/internal/dataset/page_reader_test.go (14 changes)
6. pkg/dataobj/internal/dataset/page_test.go (12 changes)
7. pkg/dataobj/internal/dataset/reader_basic.go (17 changes)
8. pkg/dataobj/internal/dataset/reader_basic_test.go (18 changes)
9. pkg/dataobj/internal/dataset/reader_test.go (2 changes)
10. pkg/dataobj/internal/dataset/row_ranges.go (5 changes)
11. pkg/dataobj/internal/dataset/value.go (94 changes)
12. pkg/dataobj/internal/dataset/value_encoding_plain.go (146 changes)
13. pkg/dataobj/internal/dataset/value_encoding_plain_test.go (89 changes)
14. pkg/dataobj/internal/dataset/value_test.go (18 changes)
15. pkg/dataobj/internal/encoding/encoding_test.go (12 changes)
16. pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go (92 changes)
17. pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto (6 changes)
18. pkg/dataobj/internal/sections/logs/iter.go (57 changes)
19. pkg/dataobj/internal/sections/logs/iter_test.go (16 changes)
20. pkg/dataobj/internal/sections/logs/logs.go (10 changes)
21. pkg/dataobj/internal/sections/logs/logs_test.go (23 changes)
22. pkg/dataobj/internal/sections/logs/table.go (2 changes)
23. pkg/dataobj/internal/sections/logs/table_build.go (2 changes)
24. pkg/dataobj/internal/sections/streams/iter.go (73 changes)
25. pkg/dataobj/internal/sections/streams/streams.go (6 changes)
26. pkg/dataobj/internal/sections/streams/streams_test.go (14 changes)
27. pkg/dataobj/internal/util/slicegrow/slicegrow.go (27 changes)
28. pkg/dataobj/internal/util/symbolizer/symbolizer.go (60 changes)
29. pkg/dataobj/logs_reader.go (72 changes)
30. pkg/dataobj/logs_reader_test.go (13 changes)
31. pkg/dataobj/querier/metadata.go (1 change)
32. pkg/dataobj/querier/store_test.go (56 changes)
33. pkg/dataobj/streams_reader.go (42 changes)
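
Every hunk below applies the same idea. As a minimal illustrative sketch (plain Go, not the Loki code itself): the old read path materialized a fresh value, and therefore a fresh allocation, per decoded row, while the new path fills a caller-owned buffer that grows once and is then reused.

package main

import "fmt"

// decodeAlloc is the old shape: every call allocates a fresh backing array.
func decodeAlloc(src []byte) []byte {
	out := make([]byte, len(src))
	copy(out, src)
	return out
}

// decodeReuse is the new shape: the caller's buffer is reused and only
// grows when it is too small, so steady-state reads allocate nothing.
func decodeReuse(dst, src []byte) []byte {
	if cap(dst) < len(src) {
		dst = make([]byte, len(src))
	}
	dst = dst[:len(src)]
	copy(dst, src)
	return dst
}

func main() {
	var buf []byte
	for _, line := range [][]byte{[]byte("hello"), []byte("world!")} {
		buf = decodeReuse(buf, line)
		fmt.Printf("%s\n", buf)
	}
}

The diff threads this pattern through pageReader, basicReader, the plain value encoding, and the logs/streams decoders.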

@ -35,10 +35,8 @@ func Test_columnReader_ReadAll(t *testing.T) {
require.Greater(t, len(col.Pages), 1, "test requires multiple pages")
cr := newColumnReader(col)
actualValues, err := readColumn(cr, 4)
actual, err := readColumn(t, cr, 4)
require.NoError(t, err)
actual := convertToStrings(t, actualValues)
require.Equal(t, columnReaderTestStrings, actual)
}
@ -71,17 +69,16 @@ func Test_columnReader_SeekToStart(t *testing.T) {
cr := newColumnReader(col)
// First read everything
_, err := readColumn(cr, 4)
_, err := readColumn(t, cr, 4)
require.NoError(t, err)
// Seek back to start and read again
_, err = cr.Seek(0, io.SeekStart)
require.NoError(t, err)
actualValues, err := readColumn(cr, 4)
actual, err := readColumn(t, cr, 4)
require.NoError(t, err)
actual := convertToStrings(t, actualValues)
require.Equal(t, columnReaderTestStrings, actual)
}
@ -92,16 +89,15 @@ func Test_columnReader_Reset(t *testing.T) {
cr := newColumnReader(col)
// First read everything
_, err := readColumn(cr, 4)
_, err := readColumn(t, cr, 4)
require.NoError(t, err)
// Reset and read again
cr.Reset(col)
actualValues, err := readColumn(cr, 4)
actual, err := readColumn(t, cr, 4)
require.NoError(t, err)
actual := convertToStrings(t, actualValues)
require.Equal(t, columnReaderTestStrings, actual)
}
@ -111,14 +107,14 @@ func buildMultiPageColumn(t *testing.T, values []string) *MemColumn {
builder, err := NewColumnBuilder("", BuilderOptions{
PageSizeHint: 128, // Small page size to force multiple pages
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
})
require.NoError(t, err)
for i, v := range values {
require.NoError(t, builder.Append(i, StringValue(v)))
require.NoError(t, builder.Append(i, ByteArrayValue([]byte(v))))
}
col, err := builder.Flush()
@ -126,9 +122,9 @@ func buildMultiPageColumn(t *testing.T, values []string) *MemColumn {
return col
}
func readColumn(cr *columnReader, batchSize int) ([]Value, error) {
func readColumn(t *testing.T, cr *columnReader, batchSize int) ([]string, error) {
var (
all []Value
all []string
batch = make([]Value, batchSize)
)
@ -136,7 +132,7 @@ func readColumn(cr *columnReader, batchSize int) ([]Value, error) {
for {
n, err := cr.Read(context.Background(), batch)
if n > 0 {
all = append(all, batch[:n]...)
all = append(all, convertToStrings(t, batch[:n])...)
}
if errors.Is(err, io.EOF) {
return all, nil

@ -13,23 +13,23 @@ import (
)
func TestColumnBuilder_ReadWrite(t *testing.T) {
in := []string{
"hello, world!",
"",
"this is a test of the emergency broadcast system",
"this is only a test",
"if this were a real emergency, you would be instructed to panic",
"but it's not, so don't",
"",
"this concludes the test",
"thank you for your cooperation",
"goodbye",
in := [][]byte{
[]byte("hello, world!"),
[]byte(""),
[]byte("this is a test of the emergency broadcast system"),
[]byte("this is only a test"),
[]byte("if this were a real emergency, you would be instructed to panic"),
[]byte("but it's not, so don't"),
[]byte(""),
[]byte("this concludes the test"),
[]byte("thank you for your cooperation"),
[]byte("goodbye"),
}
opts := BuilderOptions{
// Set the page size to 0 so each page holds exactly one value.
PageSizeHint: 0,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -37,12 +37,12 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
require.NoError(t, err)
for i, s := range in {
require.NoError(t, b.Append(i, StringValue(s)))
require.NoError(t, b.Append(i, ByteArrayValue(s)))
}
col, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, col.Info.Type)
require.Equal(t, len(in), col.Info.RowsCount)
require.Equal(t, len(in)-2, col.Info.ValuesCount) // -2 for the empty strings
require.Greater(t, len(col.Pages), 1)
@ -51,7 +51,7 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
t.Log("Compressed size: ", col.Info.CompressedSize)
t.Log("Pages: ", len(col.Pages))
var actual []string
var actual [][]byte
r := newColumnReader(col)
for {
@ -67,10 +67,10 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
val := values[0]
if val.IsNil() || val.IsZero() {
actual = append(actual, "")
actual = append(actual, []byte{})
} else {
require.Equal(t, datasetmd.VALUE_TYPE_STRING, val.Type())
actual = append(actual, val.String())
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, val.Type())
actual = append(actual, val.ByteArray())
}
}
@ -111,7 +111,7 @@ func TestColumnBuilder_MinMax(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 301, // Slightly larger than the string length of 3 strings per page.
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
@ -123,29 +123,29 @@ func TestColumnBuilder_MinMax(t *testing.T) {
require.NoError(t, err)
for i, s := range in {
require.NoError(t, b.Append(i, StringValue(s)))
require.NoError(t, b.Append(i, ByteArrayValue([]byte(s))))
}
col, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, col.Info.Type)
require.NotNil(t, col.Info.Statistics)
columnMin, columnMax := getMinMax(t, col.Info.Statistics)
require.Equal(t, aString, columnMin.String())
require.Equal(t, fString, columnMax.String())
require.Equal(t, aString, string(columnMin.ByteArray()))
require.Equal(t, fString, string(columnMax.ByteArray()))
require.Len(t, col.Pages, 2)
require.Equal(t, 3, col.Pages[0].Info.ValuesCount)
require.Equal(t, 3, col.Pages[1].Info.ValuesCount)
page0Min, page0Max := getMinMax(t, col.Pages[0].Info.Stats)
require.Equal(t, aString, page0Min.String())
require.Equal(t, cString, page0Max.String())
require.Equal(t, aString, string(page0Min.ByteArray()))
require.Equal(t, cString, string(page0Max.ByteArray()))
page1Min, page1Max := getMinMax(t, col.Pages[1].Info.Stats)
require.Equal(t, dString, page1Min.String())
require.Equal(t, fString, page1Max.String())
require.Equal(t, dString, string(page1Min.ByteArray()))
require.Equal(t, fString, string(page1Max.ByteArray()))
}
func TestColumnBuilder_Cardinality(t *testing.T) {
@ -174,7 +174,7 @@ func TestColumnBuilder_Cardinality(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 301, // Slightly larger than the string length of 3 strings per page.
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
@ -186,12 +186,12 @@ func TestColumnBuilder_Cardinality(t *testing.T) {
require.NoError(t, err)
for i, s := range in {
require.NoError(t, b.Append(i, StringValue(s)))
require.NoError(t, b.Append(i, ByteArrayValue([]byte(s))))
}
col, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, col.Info.Type)
require.NotNil(t, col.Info.Statistics)
// we use sparse hyperloglog reprs until a certain cardinality is reached,
// so this should not be approximate at low counts.

@ -171,10 +171,6 @@ func valueSize(v Value) int {
// Assuming that uint64s are written as uvarints.
return streamio.UvarintSize(v.Uint64())
case datasetmd.VALUE_TYPE_STRING:
// Assuming that strings are PLAIN encoded using their length and bytes.
str := v.String()
return binary.Size(len(str)) + len(str)
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
arr := v.ByteArray()
return binary.Size(len(arr)) + len(arr)

@ -6,9 +6,9 @@ import (
"errors"
"fmt"
"io"
"slices"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
type pageReader struct {
@ -29,6 +29,9 @@ type pageReader struct {
pageRow int64
nextRow int64
presenceReader *bufio.Reader
valuesReader *bufio.Reader
}
// newPageReader returns a new pageReader that reads from the provided page.
@ -76,7 +79,7 @@ func (pr *pageReader) Read(ctx context.Context, v []Value) (n int, err error) {
//
// read advances pr.pageRow but not pr.nextRow.
func (pr *pageReader) read(v []Value) (n int, err error) {
pr.presenceBuf = slices.Grow(pr.presenceBuf, len(v))
pr.presenceBuf = slicegrow.GrowToCap(pr.presenceBuf, len(v))
pr.presenceBuf = pr.presenceBuf[:len(v)]
// We want to allow decoders to reuse memory of [Value]s in v while allowing
@ -153,15 +156,12 @@ func (pr *pageReader) read(v []Value) (n int, err error) {
//
// The resulting slice is len(src).
func reuseValuesBuffer(dst []Value, src []Value) []Value {
dst = slices.Grow(dst, len(src))
dst = slicegrow.GrowToCap(dst, len(src))
dst = dst[:0]
for _, val := range src {
if val.IsNil() {
continue
}
dst = append(dst, val)
}
// We must preserve the ordering of the caller's slice here. Otherwise we
// could shuffle pointers between entries, letting memory be reused out from
// under the caller within a single read call.
dst = append(dst, src...)
filledLength := len(dst)
@ -193,20 +193,21 @@ func (pr *pageReader) init(ctx context.Context) error {
return fmt.Errorf("opening page for reading: %w", err)
}
if pr.presenceDec == nil {
pr.presenceDec = newBitmapDecoder(bufio.NewReader(presenceReader))
} else {
pr.presenceDec.Reset(bufio.NewReader(presenceReader))
}
pr.presenceReader = pr.getPresenceReader()
pr.presenceReader.Reset(presenceReader)
pr.presenceDec = pr.getPresenceDecoder()
pr.presenceDec.Reset(pr.presenceReader)
pr.valuesReader = pr.getValuesReader()
pr.valuesReader.Reset(valuesReader)
if pr.valuesDec == nil || pr.lastValue != pr.value || pr.lastEncoding != memPage.Info.Encoding {
var ok bool
pr.valuesDec, ok = newValueDecoder(pr.value, memPage.Info.Encoding, bufio.NewReader(valuesReader))
pr.valuesDec, ok = newValueDecoder(pr.value, memPage.Info.Encoding, pr.valuesReader)
if !ok {
return fmt.Errorf("unsupported value encoding %s/%s", pr.value, memPage.Info.Encoding)
}
} else {
pr.valuesDec.Reset(bufio.NewReader(valuesReader))
pr.valuesDec.Reset(pr.valuesReader)
}
pr.ready = true
@ -297,3 +298,24 @@ func (pr *pageReader) Close() error {
}
return nil
}
func (pr *pageReader) getPresenceReader() *bufio.Reader {
if pr.presenceReader == nil {
return bufio.NewReader(nil)
}
return pr.presenceReader
}
func (pr *pageReader) getPresenceDecoder() *bitmapDecoder {
if pr.presenceDec == nil {
return newBitmapDecoder(nil)
}
return pr.presenceDec
}
func (pr *pageReader) getValuesReader() *bufio.Reader {
if pr.valuesReader == nil {
return bufio.NewReader(nil)
}
return pr.valuesReader
}
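
The getters above exist so that init can reuse one long-lived bufio.Reader per stream instead of allocating a new one per page. A hedged sketch of the pattern (names illustrative only):

package example

import (
	"bufio"
	"io"
)

// pageSource keeps a single bufio.Reader alive across pages, mirroring the
// getPresenceReader/getValuesReader pattern above.
type pageSource struct {
	br *bufio.Reader
}

func (p *pageSource) open(src io.Reader) *bufio.Reader {
	if p.br == nil {
		p.br = bufio.NewReader(nil) // allocated once, on first use
	}
	p.br.Reset(src) // rebind the existing reader; no allocation per page
	return p.br
}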

@ -27,7 +27,7 @@ var pageReaderTestStrings = []string{
func Test_pageReader(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -50,7 +50,7 @@ func Test_pageReader(t *testing.T) {
func Test_pageReader_SeekToStart(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -80,7 +80,7 @@ func Test_pageReader_SeekToStart(t *testing.T) {
func Test_pageReader_Reset(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -109,7 +109,7 @@ func Test_pageReader_Reset(t *testing.T) {
func Test_pageReader_SkipRows(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -140,7 +140,7 @@ func buildPage(t *testing.T, opts BuilderOptions, in []string) *MemPage {
require.NoError(t, err)
for _, s := range in {
require.True(t, b.Append(StringValue(s)))
require.True(t, b.Append(ByteArrayValue([]byte(s))))
}
page, err := b.Flush()
@ -186,8 +186,8 @@ func convertToStrings(t *testing.T, values []Value) []string {
if v.IsNil() {
out = append(out, "")
} else {
require.Equal(t, datasetmd.VALUE_TYPE_STRING, v.Type())
out = append(out, v.String())
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, v.Type())
out = append(out, string(v.ByteArray()))
}
}

@ -109,7 +109,7 @@ func logsTestPage(t testing.TB) *MemPage {
opts := BuilderOptions{
PageSizeHint: sb.Len() * 2,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -117,7 +117,7 @@ func logsTestPage(t testing.TB) *MemPage {
require.NoError(t, err)
for line := range strings.Lines(sb.String()) {
require.True(t, builder.Append(StringValue(line)))
require.True(t, builder.Append(ByteArrayValue([]byte(line))))
}
page, err := builder.Flush()
@ -141,7 +141,7 @@ func Test_pageBuilder_WriteRead(t *testing.T) {
opts := BuilderOptions{
PageSizeHint: 1024,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
}
@ -149,7 +149,7 @@ func Test_pageBuilder_WriteRead(t *testing.T) {
require.NoError(t, err)
for _, s := range in {
require.True(t, b.Append(StringValue(s)))
require.True(t, b.Append(ByteArrayValue([]byte(s))))
}
page, err := b.Flush()
@ -178,8 +178,8 @@ func Test_pageBuilder_WriteRead(t *testing.T) {
if val.IsNil() || val.IsZero() {
actual = append(actual, "")
} else {
require.Equal(t, datasetmd.VALUE_TYPE_STRING, val.Type())
actual = append(actual, val.String())
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, val.Type())
actual = append(actual, string(val.ByteArray()))
}
}
require.Equal(t, in, actual)

@ -6,7 +6,8 @@ import (
"fmt"
"io"
"iter"
"slices"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
// basicReader is a low-level reader that reads rows from a set of columns.
@ -136,14 +137,13 @@ func (pr *basicReader) fill(ctx context.Context, columns []Column, s []Row) (n i
return 0, nil
}
pr.buf = slices.Grow(pr.buf, len(s))
pr.buf = slicegrow.GrowToCap(pr.buf, len(s))
pr.buf = pr.buf[:len(s)]
startRow := int64(s[0].Index)
// Ensure that each Row.Values slice has enough capacity to store all values.
for i := range s {
s[i].Values = slices.Grow(s[i].Values, len(pr.columns))
s[i].Values = slicegrow.GrowToCap(s[i].Values, len(pr.columns))
s[i].Values = s[i].Values[:len(pr.columns)]
}
@ -224,7 +224,12 @@ func (pr *basicReader) fill(ctx context.Context, columns []Column, s []Row) (n i
columnRead := columnRow - startRow
for i := columnRead; i < int64(maxRead); i++ {
s[n+int(i)].Values[columnIndex] = Value{}
// Reset values to zero without discarding any memory they point to, rather
// than setting them to NULL. The tradeoff is that the caller can no longer
// distinguish between a zero value and a NULL.
if !s[n+int(i)].Values[columnIndex].IsNil() {
s[n+int(i)].Values[columnIndex].Zero()
}
}
}
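
A hedged sketch of the zero-vs-NULL semantics this loop relies on (Value internals are simplified here; the real fields live in value.go further down):

package example

// A zeroed value keeps its buffer for reuse, while only a value with no
// buffer at all reports as nil.
type value struct {
	buf []byte // retained backing memory
	n   int    // logical length; 0 means "zero value"
}

func (v *value) IsNil() bool  { return v.buf == nil }
func (v *value) IsZero() bool { return v.n == 0 }
func (v *value) Zero()        { v.n = 0 } // keep buf so the next read reuses it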
@ -240,7 +245,7 @@ func (pr *basicReader) fill(ctx context.Context, columns []Column, s []Row) (n i
//
// The resulting slice is len(src).
func reuseRowsBuffer(dst []Value, src []Row, columnIndex int) []Value {
dst = slices.Grow(dst, len(src))
dst = slicegrow.GrowToCap(dst, len(src))
dst = dst[:0]
for _, row := range src {

@ -112,7 +112,7 @@ func Test_basicReader_ReadColumns(t *testing.T) {
// Verify that read columns match the test data
testPerson := basicReaderTestData[row.Index]
if testPerson.middleName != "" {
require.Equal(t, testPerson.middleName, row.Values[1].String(), "middle_name mismatch")
require.Equal(t, testPerson.middleName, string(row.Values[1].ByteArray()), "middle_name mismatch")
} else {
require.True(t, row.Values[1].IsNil(), "middle_name should be nil")
}
@ -160,7 +160,7 @@ func Test_basicReader_Fill(t *testing.T) {
// Verify the firstName value
expectedPerson := basicReaderTestData[row.Index]
require.Equal(t, expectedPerson.firstName, row.Values[0].String(),
require.Equal(t, expectedPerson.firstName, string(row.Values[0].ByteArray()),
"firstName mismatch at index %d", row.Index)
}
}
@ -254,9 +254,9 @@ func buildTestDataset(t *testing.T) (Dataset, []Column) {
// Add data to each column
for i, p := range basicReaderTestData {
require.NoError(t, firstNameBuilder.Append(i, StringValue(p.firstName)))
require.NoError(t, middleNameBuilder.Append(i, StringValue(p.middleName)))
require.NoError(t, lastNameBuilder.Append(i, StringValue(p.lastName)))
require.NoError(t, firstNameBuilder.Append(i, ByteArrayValue([]byte(p.firstName))))
require.NoError(t, middleNameBuilder.Append(i, ByteArrayValue([]byte(p.middleName))))
require.NoError(t, lastNameBuilder.Append(i, ByteArrayValue([]byte(p.lastName))))
require.NoError(t, birthYearBuilder.Append(i, Int64Value(p.birthYear)))
}
@ -283,7 +283,7 @@ func buildStringColumn(t *testing.T, name string) *ColumnBuilder {
builder, err := NewColumnBuilder(name, BuilderOptions{
PageSizeHint: 16, // Small page size to force multiple pages
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Compression: datasetmd.COMPRESSION_TYPE_SNAPPY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
@ -343,11 +343,11 @@ func convertToTestPersons(rows []Row) []testPerson {
for _, row := range rows {
var p testPerson
p.firstName = row.Values[0].String()
p.firstName = string(row.Values[0].ByteArray())
if !row.Values[1].IsNil() {
p.middleName = row.Values[1].String()
p.middleName = string(row.Values[1].ByteArray())
}
p.lastName = row.Values[2].String()
p.lastName = string(row.Values[2].ByteArray())
p.birthYear = row.Values[3].Int64()
out = append(out, p)

@ -66,7 +66,7 @@ func Test_Reader_ReadWithPageFiltering(t *testing.T) {
// which is out of range of at least one page.
Predicate: EqualPredicate{
Column: columns[0], // first_name column
Value: StringValue("Henry"),
Value: ByteArrayValue([]byte("Henry")),
},
})
defer r.Close()

@ -4,6 +4,8 @@ import (
"cmp"
"slices"
"sort"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
// rowRanges tracks a set of row ranges that are "valid."
@ -234,7 +236,8 @@ func unionRanges(dst rowRanges, a, b rowRanges) rowRanges {
// We do our union by adding everything from a and b together, sorting
// the merged range, and then fixing any overlapping ranges.
dst = slices.Grow(dst, len(a)+len(b))
dst = slicegrow.GrowToCap(dst, len(a)+len(b))
dst = dst[:0]
dst = append(dst, a...)
dst = append(dst, b...)
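
slicegrow.go itself is listed among the changed files but its body isn't shown in this excerpt. Judging from the call sites, GrowToCap ensures an absolute capacity, unlike slices.Grow, which reserves headroom beyond the current length. A sketch under that assumption:

package slicegrow

// GrowToCap ensures cap(s) >= n, copying the existing contents when the
// slice must be reallocated.
func GrowToCap[T any](s []T, n int) []T {
	if cap(s) >= n {
		return s
	}
	grown := make([]T, len(s), n)
	copy(grown, s)
	return grown
}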

@ -8,11 +8,11 @@ import (
"unsafe"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
// Helper types
type (
stringptr *byte
bytearray *byte
)
@ -38,6 +38,9 @@ type Value struct {
// types.
num uint64
// cap holds the capacity for byte slice pointed to by any, if applicable.
cap uint64
// If any is of type [datasetmd.ValueType], then the value is in num as
// described above.
//
@ -63,19 +66,12 @@ func Uint64Value(v uint64) Value {
}
}
// StringValue returns a [Value] for a string.
func StringValue(v string) Value {
return Value{
num: uint64(len(v)),
any: (stringptr)(unsafe.StringData(v)),
}
}
// ByteArrayValue returns a [Value] for a byte slice representing a string.
func ByteArrayValue(v []byte) Value {
return Value{
num: uint64(len(v)),
any: (bytearray)(unsafe.SliceData(v)),
cap: uint64(cap(v)),
}
}
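
ByteArrayValue now stores a slice as its raw parts: the data pointer in any, the length in num, and the capacity in the new cap field, so the full slice can be rebuilt later for reuse. A simplified sketch of that round trip:

package example

import "unsafe"

// byteValue holds the (pointer, len, cap) decomposition of a []byte, in the
// spirit of the Value fields above.
type byteValue struct {
	ptr *byte
	num uint64 // length
	cap uint64 // capacity, needed to reuse the buffer later
}

func fromBytes(b []byte) byteValue {
	return byteValue{ptr: unsafe.SliceData(b), num: uint64(len(b)), cap: uint64(cap(b))}
}

func (v byteValue) bytes() []byte {
	// Rebuild the slice at full capacity, then reslice to the logical length.
	return unsafe.Slice(v.ptr, v.cap)[:v.num]
}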
@ -101,8 +97,6 @@ func (v Value) Type() datasetmd.ValueType {
switch v := v.any.(type) {
case datasetmd.ValueType:
return v
case stringptr:
return datasetmd.VALUE_TYPE_STRING
case bytearray:
return datasetmd.VALUE_TYPE_BYTE_ARRAY
default:
@ -112,7 +106,7 @@ func (v Value) Type() datasetmd.ValueType {
// Int64 returns v's value as an int64. It panics if v is not a
// [datasetmd.VALUE_TYPE_INT64].
func (v Value) Int64() int64 {
func (v *Value) Int64() int64 {
if expect, actual := datasetmd.VALUE_TYPE_INT64, v.Type(); expect != actual {
panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, expect))
}
@ -121,33 +115,77 @@ func (v Value) Int64() int64 {
// Uint64 returns v's value as a uint64. It panics if v is not a
// [datasetmd.VALUE_TYPE_UINT64].
func (v Value) Uint64() uint64 {
func (v *Value) Uint64() uint64 {
if expect, actual := datasetmd.VALUE_TYPE_UINT64, v.Type(); expect != actual {
panic(fmt.Sprintf("dataset.Value type is %s, not %s", actual, expect))
}
return v.num
}
// String returns v's value as a string. Because of Go's String method
// convention, if v is not a string, String returns a string of the form
// "VALUE_TYPE_T", where T is the underlying type of v.
func (v Value) String() string {
if sp, ok := v.any.(stringptr); ok {
return unsafe.String(sp, v.num)
}
return v.Type().String()
}
// ByteArray returns v's value as a byte slice. It panics if v is not a
// [datasetmd.VALUE_TYPE_BYTE_ARRAY].
func (v Value) ByteArray() []byte {
func (v *Value) ByteArray() []byte {
if ba, ok := v.any.(bytearray); ok {
return unsafe.Slice(ba, v.num)
}
panic(fmt.Sprintf("dataset.Value type is %s, not %s", v.Type(), datasetmd.VALUE_TYPE_BYTE_ARRAY))
}
// Buffer returns a slice with a capacity of at least sz. Existing
// memory pointed to by Value is reused where possible, either
// returning the underlying memory or growing it to be at least
// sz.
//
// If Value does not point to any underlying memory, a new slice
// is allocated.
//
// After calling Buffer, Value is updated to store the returned
// slice.
func (v *Value) Buffer(sz int) []byte {
if v.cap == 0 {
dst := make([]byte, sz)
v.any = (bytearray)(unsafe.SliceData(dst))
v.cap = uint64(cap(dst))
return dst
}
var dst []byte
// The type this Value previously held dictates how we reference its memory.
switch v.any.(type) {
case bytearray:
dst = unsafe.Slice(v.any.(bytearray), int(v.cap))
default:
panic("unsupported value type for buffer in Value's 'any' field, got " + v.Type().String())
}
// Grow the buffer attached to this Value if necessary.
if v.cap < uint64(sz) {
dst = slicegrow.GrowToCap(dst, sz)
v.any = (bytearray)(unsafe.SliceData(dst))
v.cap = uint64(cap(dst))
}
return dst
}
// SetByteArrayValue updates the value to point to the provided byte slice.
// This will overwrite any existing data stored in this Value and update it to be of type [datasetmd.VALUE_TYPE_BYTE_ARRAY].
func (v *Value) SetByteArrayValue(b []byte) {
v.any = (bytearray)(unsafe.SliceData(b))
v.num = uint64(len(b))
v.cap = uint64(cap(b))
}
// Zero resets the value to its zero state while retaining pointers to any
// existing memory. After calling Zero:
// - The value reports as zero; it reports as nil only if it doesn't point to any underlying memory.
// - Any subsequent operation that reads the value treats it as empty.
// - Any subsequent operation that writes to a non-nil zero value reuses the underlying memory.
func (v *Value) Zero() {
v.num = 0
}
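
Together, Buffer, SetByteArrayValue, and Zero give decoders an in-place write path. A hedged usage sketch (readInto is hypothetical and assumes an io import; plainBytesDecoder.decode later in this diff follows the same shape):

// readInto fills v with sz bytes from r, reusing v's existing memory.
func readInto(v *Value, r io.Reader, sz int) error {
	dst := v.Buffer(sz)[:sz] // reuse or grow the Value's backing buffer
	if _, err := io.ReadFull(r, dst); err != nil {
		return err
	}
	v.SetByteArrayValue(dst) // repoint the Value at the filled slice
	return nil
}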
// MarshalBinary encodes v into a binary representation. Non-NULL values encode
// first with the type (encoded as uvarint), followed by an encoded value,
// where:
@ -169,9 +207,6 @@ func (v Value) MarshalBinary() (data []byte, err error) {
buf = binary.AppendVarint(buf, v.Int64())
case datasetmd.VALUE_TYPE_UINT64:
buf = binary.AppendUvarint(buf, v.Uint64())
case datasetmd.VALUE_TYPE_STRING:
str := v.String()
buf = append(buf, unsafe.Slice(unsafe.StringData(str), len(str))...)
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
buf = append(buf, v.ByteArray()...)
default:
@ -207,9 +242,6 @@ func (v *Value) UnmarshalBinary(data []byte) error {
return fmt.Errorf("dataset.Value.UnmarshalBinary: invalid uint64 value")
}
*v = Uint64Value(val)
case datasetmd.VALUE_TYPE_STRING:
str := string(data[n:])
*v = StringValue(str)
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
*v = ByteArrayValue(data[n:])
default:
@ -245,8 +277,6 @@ func CompareValues(a, b Value) int {
return cmp.Compare(a.Int64(), b.Int64())
case datasetmd.VALUE_TYPE_UINT64:
return cmp.Compare(a.Uint64(), b.Uint64())
case datasetmd.VALUE_TYPE_STRING:
return cmp.Compare(a.String(), b.String())
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
return bytes.Compare(a.ByteArray(), b.ByteArray())
default:

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"unsafe"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
@ -13,12 +12,6 @@ import (
func init() {
// Register the encoding so instances of it can be dynamically created.
registerValueEncoding(
datasetmd.VALUE_TYPE_STRING,
datasetmd.ENCODING_TYPE_PLAIN,
func(w streamio.Writer) valueEncoder { return newPlainStringEncoder(w) },
func(r streamio.Reader) valueDecoder { return newPlainStringDecoder(r) },
)
registerValueEncoding(
datasetmd.VALUE_TYPE_BYTE_ARRAY,
datasetmd.ENCODING_TYPE_PLAIN,
@ -27,132 +20,12 @@ func init() {
)
}
// A plainStringEncoder encodes string values to an [streamio.Writer].
type plainStringEncoder struct {
w streamio.Writer
}
var _ valueEncoder = (*plainStringEncoder)(nil)
// newPlainEncoder creates a plainEncoder that writes encoded strings to w.
func newPlainStringEncoder(w streamio.Writer) *plainStringEncoder {
return &plainStringEncoder{w: w}
}
// ValueType returns [datasetmd.VALUE_TYPE_STRING].
func (enc *plainStringEncoder) ValueType() datasetmd.ValueType {
return datasetmd.VALUE_TYPE_STRING
}
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
func (enc *plainStringEncoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_PLAIN
}
// Encode encodes an individual string value.
func (enc *plainStringEncoder) Encode(v Value) error {
if v.Type() != datasetmd.VALUE_TYPE_STRING {
return fmt.Errorf("plain: invalid value type %v", v.Type())
}
sv := v.String()
if err := streamio.WriteUvarint(enc.w, uint64(len(sv))); err != nil {
return err
}
// This saves a few allocations by avoiding a copy of the string.
// Implementations of io.Writer are not supposed to modify the slice passed
// to Write, so this is generally safe.
n, err := enc.w.Write(unsafe.Slice(unsafe.StringData(sv), len(sv)))
if n != len(sv) {
return fmt.Errorf("short write; expected %d bytes, wrote %d", len(sv), n)
}
return err
}
// Flush implements [valueEncoder]. It is a no-op for plainEncoder.
func (enc *plainStringEncoder) Flush() error {
return nil
}
// Reset implements [valueEncoder]. It resets the encoder to write to w.
func (enc *plainStringEncoder) Reset(w streamio.Writer) {
enc.w = w
}
// plainStringDecoder decodes strings from an [streamio.Reader].
type plainStringDecoder struct {
r streamio.Reader
}
var _ valueDecoder = (*plainStringDecoder)(nil)
// newPlainStringDecoder creates a plainDecoder that reads encoded strings from r.
func newPlainStringDecoder(r streamio.Reader) *plainStringDecoder {
return &plainStringDecoder{r: r}
}
// ValueType returns [datasetmd.VALUE_TYPE_BYTE_ARRAY].
func (dec *plainStringDecoder) ValueType() datasetmd.ValueType {
return datasetmd.VALUE_TYPE_STRING
}
// EncodingType returns [datasetmd.ENCODING_TYPE_PLAIN].
func (dec *plainStringDecoder) EncodingType() datasetmd.EncodingType {
return datasetmd.ENCODING_TYPE_PLAIN
}
// Decode decodes up to len(s) values, storing the results into s. The
// number of decoded values is returned, followed by an error (if any).
// At the end of the stream, Decode returns 0, [io.EOF].
func (dec *plainStringDecoder) Decode(s []Value) (int, error) {
if len(s) == 0 {
return 0, nil
}
var err error
var v Value
for i := range s {
v, err = dec.decode()
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
}
return i, nil
} else if err != nil {
return i, err
}
s[i] = v
}
return len(s), nil
}
// decode decodes a string.
func (dec *plainStringDecoder) decode() (Value, error) {
sz, err := binary.ReadUvarint(dec.r)
if err != nil {
return StringValue(""), err
}
dst := make([]byte, int(sz))
if _, err := io.ReadFull(dec.r, dst); err != nil {
return StringValue(""), err
}
return StringValue(string(dst)), nil
}
// Reset implements [valueDecoder]. It resets the decoder to read from r.
func (dec *plainStringDecoder) Reset(r streamio.Reader) {
dec.r = r
}
// A plainBytesEncoder encodes byte array values to an [streamio.Writer].
type plainBytesEncoder struct {
w streamio.Writer
}
var _ valueEncoder = (*plainStringEncoder)(nil)
var _ valueEncoder = (*plainBytesEncoder)(nil)
// newPlainBytesEncoder creates a plainBytesEncoder that writes encoded byte arrays to w.
func newPlainBytesEncoder(w streamio.Writer) *plainBytesEncoder {
@ -228,10 +101,9 @@ func (dec *plainBytesDecoder) Decode(s []Value) (int, error) {
}
var err error
var v Value
for i := range s {
v, err = dec.decode()
err = dec.decode(&s[i])
if errors.Is(err, io.EOF) {
if i == 0 {
return 0, io.EOF
@ -240,23 +112,25 @@ func (dec *plainBytesDecoder) Decode(s []Value) (int, error) {
} else if err != nil {
return i, err
}
s[i] = v
}
return len(s), nil
}
// decode decodes a single byte array value into v.
func (dec *plainBytesDecoder) decode() (Value, error) {
func (dec *plainBytesDecoder) decode(v *Value) error {
sz, err := binary.ReadUvarint(dec.r)
if err != nil {
return ByteArrayValue([]byte{}), err
return err
}
dst := make([]byte, int(sz))
dst := v.Buffer(int(sz))
dst = dst[:sz]
if _, err := io.ReadFull(dec.r, dst); err != nil {
return ByteArrayValue([]byte{}), err
return err
}
return ByteArrayValue(dst), nil
v.SetByteArrayValue(dst)
return nil
}
// Reset implements [valueDecoder]. It resets the decoder to read from r.

@ -21,47 +21,17 @@ var testStrings = []string{
var batchSize = 64
func Test_plainStringEncoder(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainStringEncoder(&buf)
dec = newPlainStringDecoder(&buf)
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
require.NoError(t, enc.Encode(StringValue(v)))
}
var out []string
for {
n, err := dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
} else if err != nil {
t.Fatal(err)
}
for _, v := range decBuf[:n] {
out = append(out, v.String())
}
}
require.Equal(t, testStrings, out)
}
func Test_plainStringEncoder_partialRead(t *testing.T) {
func Test_plainBytesEncoder(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainStringEncoder(&buf)
dec = newPlainStringDecoder(&oneByteReader{&buf})
enc = newPlainBytesEncoder(&buf)
dec = newPlainBytesDecoder(&buf)
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
require.NoError(t, enc.Encode(StringValue(v)))
require.NoError(t, enc.Encode(ByteArrayValue([]byte(v))))
}
var out []string
@ -74,56 +44,19 @@ func Test_plainStringEncoder_partialRead(t *testing.T) {
t.Fatal(err)
}
for _, v := range decBuf[:n] {
out = append(out, v.String())
out = append(out, string(v.ByteArray()))
}
}
require.Equal(t, testStrings, out)
}
func Benchmark_plainStringEncoder_Append(b *testing.B) {
enc := newPlainStringEncoder(streamio.Discard)
for i := 0; i < b.N; i++ {
for _, v := range testStrings {
_ = enc.Encode(StringValue(v))
}
}
}
func Benchmark_plainStringDecoder_Decode(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0, 1024)) // Large enough to avoid reallocations.
var (
enc = newPlainStringEncoder(buf)
dec = newPlainStringDecoder(buf)
decBuf = make([]Value, batchSize)
)
for _, v := range testStrings {
require.NoError(b, enc.Encode(ByteArrayValue([]byte(v))))
}
var err error
b.ResetTimer()
for i := 0; i < b.N; i++ {
for {
_, err = dec.Decode(decBuf[:batchSize])
if errors.Is(err, io.EOF) {
break
} else if err != nil {
b.Fatal(err)
}
}
}
}
func Test_plainBytesEncoder(t *testing.T) {
func Test_plainBytesEncoder_partialRead(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainBytesEncoder(&buf)
dec = newPlainBytesDecoder(&buf)
dec = newPlainBytesDecoder(&oneByteReader{&buf})
decBuf = make([]Value, batchSize)
)
@ -148,12 +81,12 @@ func Test_plainBytesEncoder(t *testing.T) {
require.Equal(t, testStrings, out)
}
func Test_plainBytesEncoder_partialRead(t *testing.T) {
func Test_plainBytesEncoder_reusingValues(t *testing.T) {
var buf bytes.Buffer
var (
enc = newPlainBytesEncoder(&buf)
dec = newPlainBytesDecoder(&oneByteReader{&buf})
dec = newPlainBytesDecoder(&buf)
decBuf = make([]Value, batchSize)
)
@ -161,6 +94,10 @@ func Test_plainBytesEncoder_partialRead(t *testing.T) {
require.NoError(t, enc.Encode(ByteArrayValue([]byte(v))))
}
for i := range decBuf {
decBuf[i] = ByteArrayValue(make([]byte, 64))
}
var out []string
for {

@ -48,31 +48,31 @@ func TestValue_MarshalBinary(t *testing.T) {
require.Equal(t, expect.Uint64(), actual.Uint64())
})
t.Run("StringValue", func(t *testing.T) {
t.Run("ByteArrayValue", func(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
expect := dataset.StringValue("")
require.Equal(t, datasetmd.VALUE_TYPE_STRING, expect.Type())
expect := dataset.ByteArrayValue([]byte{})
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, expect.Type())
b, err := expect.MarshalBinary()
require.NoError(t, err)
var actual dataset.Value
require.NoError(t, actual.UnmarshalBinary(b))
require.Equal(t, datasetmd.VALUE_TYPE_STRING, actual.Type())
require.Equal(t, expect.String(), actual.String())
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, actual.Type())
require.Equal(t, expect.ByteArray(), actual.ByteArray())
})
t.Run("Non-empty", func(t *testing.T) {
expect := dataset.StringValue("hello, world!")
require.Equal(t, datasetmd.VALUE_TYPE_STRING, expect.Type())
expect := dataset.ByteArrayValue([]byte("hello, world!"))
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, expect.Type())
b, err := expect.MarshalBinary()
require.NoError(t, err)
var actual dataset.Value
require.NoError(t, actual.UnmarshalBinary(b))
require.Equal(t, datasetmd.VALUE_TYPE_STRING, actual.Type())
require.Equal(t, expect.String(), actual.String())
require.Equal(t, datasetmd.VALUE_TYPE_BYTE_ARRAY, actual.Type())
require.Equal(t, expect.ByteArray(), actual.ByteArray())
})
})
}

@ -37,22 +37,22 @@ func TestStreams(t *testing.T) {
t.Run("Encode", func(t *testing.T) {
nameBuilder, err := dataset.NewColumnBuilder("name", dataset.BuilderOptions{
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
require.NoError(t, err)
capitalBuilder, err := dataset.NewColumnBuilder("capital", dataset.BuilderOptions{
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
require.NoError(t, err)
for i, c := range countries {
require.NoError(t, nameBuilder.Append(i, dataset.StringValue(c.Name)))
require.NoError(t, capitalBuilder.Append(i, dataset.StringValue(c.Capital)))
require.NoError(t, nameBuilder.Append(i, dataset.ByteArrayValue([]byte(c.Name))))
require.NoError(t, capitalBuilder.Append(i, dataset.ByteArrayValue([]byte(c.Capital))))
}
nameColumn, err := nameBuilder.Flush()
@ -123,8 +123,8 @@ func TestStreams(t *testing.T) {
require.Equal(t, len(actual), row.Index)
actual = append(actual, Country{
Name: row.Values[0].String(),
Capital: row.Values[1].String(),
Name: string(row.Values[0].ByteArray()),
Capital: string(row.Values[1].ByteArray()),
})
}
}

@ -36,8 +36,6 @@ const (
VALUE_TYPE_INT64 ValueType = 1
// VALUE_TYPE_UINT64 is a column containing 64-bit unsigned integer values.
VALUE_TYPE_UINT64 ValueType = 2
// VALUE_TYPE_STRING is a column containing string values.
VALUE_TYPE_STRING ValueType = 3
// VALUE_TYPE_BYTE_ARRAY is a column containing bytes with no specific type.
VALUE_TYPE_BYTE_ARRAY ValueType = 4
)
@ -46,7 +44,6 @@ var ValueType_name = map[int32]string{
0: "VALUE_TYPE_UNSPECIFIED",
1: "VALUE_TYPE_INT64",
2: "VALUE_TYPE_UINT64",
3: "VALUE_TYPE_STRING",
4: "VALUE_TYPE_BYTE_ARRAY",
}
@ -54,7 +51,6 @@ var ValueType_value = map[string]int32{
"VALUE_TYPE_UNSPECIFIED": 0,
"VALUE_TYPE_INT64": 1,
"VALUE_TYPE_UINT64": 2,
"VALUE_TYPE_STRING": 3,
"VALUE_TYPE_BYTE_ARRAY": 4,
}
@ -464,53 +460,53 @@ func init() {
}
var fileDescriptor_7ab9d5b21b743868 = []byte{
// 731 bytes of a gzipped FileDescriptorProto
// 728 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0x4f, 0x53, 0xda, 0x5e,
0x14, 0xe5, 0x01, 0xfa, 0x23, 0x17, 0xd4, 0xf8, 0x7e, 0x5a, 0x63, 0xa9, 0x29, 0xb5, 0x33, 0x95,
0x6a, 0x07, 0xa6, 0xd8, 0x69, 0xd7, 0xfc, 0x49, 0x9d, 0xcc, 0x68, 0xc8, 0x24, 0xd1, 0x19, 0xdc,
0x64, 0x62, 0x08, 0x34, 0x95, 0x24, 0x0c, 0x09, 0x54, 0x5c, 0x75, 0xe5, 0xba, 0x1f, 0xa3, 0x1f,
0x6a, 0x07, 0xa6, 0xd8, 0x69, 0xd7, 0xfc, 0x49, 0x9d, 0xcc, 0x68, 0xc8, 0x04, 0x74, 0x06, 0x37,
0x99, 0x67, 0x08, 0x34, 0x95, 0x24, 0x0c, 0x09, 0x54, 0x5c, 0x75, 0xe5, 0xba, 0x1f, 0xa3, 0x1f,
0xa5, 0x4b, 0x97, 0x2e, 0x2b, 0xce, 0x74, 0xba, 0xf4, 0x23, 0x74, 0x78, 0xe1, 0x4f, 0x04, 0xca,
0xb8, 0xe8, 0xee, 0x71, 0xce, 0xb9, 0xef, 0x5e, 0xee, 0x39, 0x99, 0x07, 0x1f, 0x9a, 0xe7, 0xf5,
0x6c, 0x55, 0xf3, 0x34, 0xe7, 0xec, 0x73, 0xd6, 0xb4, 0x3d, 0xa3, 0x65, 0x6b, 0x8d, 0xac, 0x65,
0x78, 0x5a, 0x1f, 0x24, 0x8c, 0x6b, 0x78, 0x56, 0x75, 0x7c, 0xca, 0x34, 0x5b, 0x8e, 0xe7, 0xe0,
0xe4, 0xa0, 0x28, 0x33, 0xd4, 0x66, 0x06, 0x8a, 0x4c, 0xe7, 0xed, 0xf6, 0xaf, 0x08, 0x40, 0xd1,
0x69, 0xb4, 0x2d, 0x9b, 0xb7, 0x6b, 0x0e, 0xc6, 0x10, 0xb5, 0x35, 0xcb, 0x60, 0x50, 0x0a, 0xa5,
0x29, 0x89, 0x9c, 0x31, 0x07, 0xd0, 0xd1, 0x1a, 0x6d, 0x43, 0xf5, 0xba, 0x4d, 0x83, 0x09, 0xa7,
0x50, 0x7a, 0x39, 0xf7, 0x2a, 0x33, 0xe7, 0xd2, 0xcc, 0x49, 0x5f, 0xae, 0x74, 0x9b, 0x86, 0x44,
0x75, 0x86, 0x47, 0xbc, 0x05, 0xd0, 0x72, 0xbe, 0xb8, 0xaa, 0xee, 0xb4, 0x6d, 0x8f, 0x89, 0xa4,
0x50, 0x3a, 0x2a, 0x51, 0x7d, 0xa4, 0xd8, 0x07, 0xb0, 0x00, 0x71, 0xdd, 0xb1, 0x9a, 0x2d, 0xc3,
0x75, 0x4d, 0xc7, 0x66, 0xa2, 0xa4, 0xcd, 0x9b, 0xb9, 0x6d, 0x8a, 0x63, 0x3d, 0x69, 0x16, 0xbc,
0x00, 0xef, 0xc1, 0x6a, 0xdb, 0x1e, 0x02, 0x46, 0x55, 0x75, 0xcd, 0x4b, 0x83, 0x59, 0x20, 0x5d,
0xe9, 0x20, 0x21, 0x9b, 0x97, 0x06, 0xde, 0x81, 0x95, 0x49, 0xe9, 0x22, 0x91, 0x2e, 0x4f, 0x0b,
0x87, 0x93, 0xa8, 0x4e, 0xad, 0xe6, 0x1a, 0x1e, 0xf3, 0x9f, 0x2f, 0x1c, 0xc2, 0x65, 0x82, 0xe2,
0x97, 0xb0, 0x34, 0x12, 0x92, 0xfb, 0x62, 0x44, 0x96, 0x18, 0x82, 0xe4, 0xb6, 0x03, 0x00, 0xd7,
0xd3, 0x3c, 0xd3, 0xf5, 0x4c, 0xdd, 0x65, 0xa8, 0x14, 0x4a, 0xc7, 0x73, 0x3b, 0x73, 0xff, 0xb2,
0x3c, 0x92, 0x4b, 0x81, 0x52, 0xfc, 0x02, 0x12, 0x64, 0xd1, 0xc3, 0xed, 0x02, 0x69, 0x16, 0xf7,
0x31, 0xb2, 0xdf, 0x6d, 0x17, 0x60, 0x5c, 0x8c, 0x93, 0x40, 0x59, 0xa6, 0xad, 0x12, 0x01, 0x31,
0x3b, 0x21, 0xc5, 0x2c, 0xd3, 0x26, 0xc6, 0x11, 0x52, 0xbb, 0x18, 0x90, 0xe1, 0x01, 0xa9, 0x5d,
0xf8, 0xe4, 0x1e, 0xac, 0xea, 0x5a, 0xab, 0x6a, 0xda, 0x5a, 0xc3, 0xf4, 0xba, 0x0f, 0xdc, 0xa4,
0x03, 0x84, 0xdf, 0xf4, 0x2a, 0x02, 0x31, 0x51, 0xab, 0x1b, 0x24, 0x5b, 0x33, 0x1d, 0x41, 0x8f,
0x77, 0x24, 0x3c, 0xd3, 0x91, 0x35, 0x58, 0xd0, 0x5b, 0xfa, 0x7e, 0x8e, 0xcc, 0xb0, 0x24, 0xf9,
0x3f, 0x26, 0xc2, 0x16, 0x9d, 0x0c, 0x1b, 0x07, 0x31, 0xc3, 0xd6, 0x9d, 0xaa, 0x69, 0xd7, 0x49,
0x26, 0x96, 0x73, 0xaf, 0xe7, 0xae, 0x9d, 0x1b, 0x88, 0x49, 0xcc, 0x46, 0xa5, 0xf8, 0x39, 0xc4,
0x83, 0x49, 0xf0, 0x23, 0x03, 0x81, 0x14, 0x24, 0x81, 0x1a, 0x27, 0xc0, 0x0f, 0x4a, 0xec, 0x2f,
0xee, 0xc7, 0xfe, 0x9d, 0xfb, 0xd4, 0x94, 0xfb, 0xbb, 0x57, 0x08, 0xa8, 0xd1, 0x57, 0x89, 0x9f,
0xc2, 0x93, 0x93, 0xfc, 0xe1, 0x31, 0xa7, 0x2a, 0x15, 0x91, 0x53, 0x8f, 0x05, 0x59, 0xe4, 0x8a,
0xfc, 0x47, 0x9e, 0x2b, 0xd1, 0x21, 0xbc, 0x06, 0x74, 0x80, 0xe3, 0x05, 0xe5, 0xfd, 0x3b, 0x1a,
0xe1, 0x75, 0x58, 0x0d, 0x56, 0xf8, 0x70, 0x78, 0x02, 0x96, 0x15, 0x89, 0x17, 0x0e, 0xe8, 0x08,
0xde, 0x84, 0xf5, 0x00, 0x5c, 0xa8, 0x28, 0x9c, 0x9a, 0x97, 0xa4, 0x7c, 0x85, 0x8e, 0xf6, 0x07,
0x59, 0x99, 0xf8, 0x6e, 0x71, 0x0a, 0x9e, 0x15, 0xcb, 0x47, 0xa2, 0xc4, 0xc9, 0x32, 0x5f, 0x16,
0x66, 0x0d, 0xb5, 0x09, 0xeb, 0x53, 0x0a, 0xa1, 0x2c, 0x70, 0x34, 0xc2, 0x49, 0xd8, 0x98, 0xa2,
0x64, 0x21, 0x2f, 0x8a, 0x15, 0x3a, 0x3c, 0xb3, 0xee, 0x54, 0x56, 0x4a, 0x74, 0x64, 0xb7, 0x0b,
0x89, 0xa0, 0xab, 0x78, 0x0b, 0x36, 0x39, 0xa1, 0x58, 0x2e, 0xf1, 0xc2, 0xc1, 0xac, 0x09, 0x36,
0xe0, 0xff, 0x87, 0xb4, 0x78, 0x98, 0xe7, 0x05, 0x1a, 0x4d, 0x13, 0x25, 0xee, 0x50, 0xc9, 0xd3,
0x61, 0xcc, 0xc0, 0xda, 0x43, 0xa2, 0xc0, 0x2b, 0x47, 0x79, 0x91, 0x8e, 0x14, 0x2e, 0xae, 0x6f,
0xd9, 0xd0, 0xcd, 0x2d, 0x1b, 0xba, 0xbf, 0x65, 0xd1, 0xd7, 0x1e, 0x8b, 0xbe, 0xf7, 0x58, 0xf4,
0xa3, 0xc7, 0xa2, 0xeb, 0x1e, 0x8b, 0x7e, 0xf6, 0x58, 0xf4, 0xbb, 0xc7, 0x86, 0xee, 0x7b, 0x2c,
0xfa, 0x76, 0xc7, 0x86, 0xae, 0xef, 0xd8, 0xd0, 0xcd, 0x1d, 0x1b, 0x3a, 0x2d, 0xd4, 0x4d, 0xef,
0x53, 0xfb, 0x2c, 0xa3, 0x3b, 0x56, 0xb6, 0xde, 0xd2, 0x6a, 0x9a, 0xad, 0x65, 0x1b, 0xce, 0xb9,
0x99, 0xed, 0xec, 0x67, 0x1f, 0xf9, 0x34, 0x9c, 0x2d, 0x92, 0x17, 0x61, 0xff, 0x4f, 0x00, 0x00,
0x00, 0xff, 0xff, 0x41, 0x87, 0xb6, 0x7e, 0x4c, 0x06, 0x00, 0x00,
0xb8, 0xe8, 0xee, 0x71, 0xce, 0xb9, 0xef, 0x5e, 0xee, 0x39, 0x0f, 0xe0, 0x43, 0xf3, 0xbc, 0x9e,
0xae, 0x12, 0x97, 0xd8, 0x67, 0x9f, 0xd3, 0x86, 0xe5, 0xea, 0x2d, 0x8b, 0x34, 0xd2, 0xa6, 0xee,
0x92, 0x3e, 0x48, 0x19, 0x47, 0x77, 0xcd, 0xea, 0xf8, 0x94, 0x6a, 0xb6, 0x6c, 0xd7, 0xc6, 0xf1,
0x41, 0x51, 0x6a, 0xa8, 0x4d, 0x0d, 0x14, 0xa9, 0xce, 0xdb, 0xed, 0x5f, 0x21, 0x80, 0xbc, 0xdd,
0x68, 0x9b, 0x96, 0x68, 0xd5, 0x6c, 0x8c, 0x21, 0x6c, 0x11, 0x53, 0xe7, 0x50, 0x02, 0x25, 0x19,
0x85, 0x9e, 0xb1, 0x00, 0xd0, 0x21, 0x8d, 0xb6, 0xae, 0xba, 0xdd, 0xa6, 0xce, 0x05, 0x13, 0x28,
0xb9, 0x9c, 0x79, 0x95, 0x9a, 0x73, 0x69, 0xea, 0xa4, 0x2f, 0x2f, 0x77, 0x9b, 0xba, 0xc2, 0x74,
0x86, 0x47, 0xbc, 0x05, 0xd0, 0xb2, 0xbf, 0x38, 0xaa, 0x66, 0xb7, 0x2d, 0x97, 0x0b, 0x25, 0x50,
0x32, 0xac, 0x30, 0x7d, 0x24, 0xdf, 0x07, 0xb0, 0x04, 0x51, 0xcd, 0x36, 0x9b, 0x2d, 0xdd, 0x71,
0x0c, 0xdb, 0xe2, 0xc2, 0xb4, 0xcd, 0x9b, 0xb9, 0x6d, 0xf2, 0x63, 0x3d, 0x6d, 0xe6, 0xbf, 0x00,
0xef, 0xc1, 0x6a, 0xdb, 0x1a, 0x02, 0x7a, 0x55, 0x75, 0x8c, 0x4b, 0x9d, 0x5b, 0xa0, 0x5d, 0x59,
0x3f, 0x51, 0x32, 0x2e, 0x75, 0xbc, 0x03, 0x2b, 0x93, 0xd2, 0x45, 0x2a, 0x5d, 0x9e, 0x16, 0x0e,
0x27, 0x51, 0xed, 0x5a, 0xcd, 0xd1, 0x5d, 0xee, 0x3f, 0x4f, 0x38, 0x84, 0x8b, 0x14, 0xc5, 0x2f,
0x61, 0x69, 0x24, 0xa4, 0xf7, 0x45, 0xa8, 0x2c, 0x36, 0x04, 0xe9, 0x6d, 0x07, 0x00, 0x8e, 0x4b,
0x5c, 0xc3, 0x71, 0x0d, 0xcd, 0xe1, 0x98, 0x04, 0x4a, 0x46, 0x33, 0x3b, 0x73, 0xbf, 0x72, 0x69,
0x24, 0x57, 0x7c, 0xa5, 0xf8, 0x05, 0xc4, 0xe8, 0xa2, 0x87, 0xdb, 0x05, 0xda, 0x2c, 0xea, 0x61,
0x74, 0xbf, 0xdb, 0x0e, 0xc0, 0xb8, 0x18, 0xc7, 0x81, 0x31, 0x0d, 0x4b, 0xa5, 0x02, 0x6a, 0x76,
0x4c, 0x89, 0x98, 0x86, 0x45, 0x8d, 0xa3, 0x24, 0xb9, 0x18, 0x90, 0xc1, 0x01, 0x49, 0x2e, 0x3c,
0x72, 0x0f, 0x56, 0x35, 0xd2, 0xaa, 0x1a, 0x16, 0x69, 0x18, 0x6e, 0xf7, 0x81, 0x9b, 0xac, 0x8f,
0xf0, 0x9a, 0x5e, 0x85, 0x20, 0x22, 0x93, 0xba, 0x4e, 0xb3, 0x35, 0xd3, 0x11, 0xf4, 0x78, 0x47,
0x82, 0x33, 0x1d, 0x59, 0x83, 0x05, 0xad, 0xa5, 0xed, 0x67, 0xe8, 0x0c, 0x4b, 0x8a, 0xf7, 0x61,
0x22, 0x6c, 0xe1, 0xc9, 0xb0, 0x09, 0x10, 0xd1, 0x2d, 0xcd, 0xae, 0x1a, 0x56, 0x9d, 0x66, 0x62,
0x39, 0xf3, 0x7a, 0xee, 0xda, 0x85, 0x81, 0x98, 0xc6, 0x6c, 0x54, 0x8a, 0x9f, 0x43, 0xd4, 0x9f,
0x04, 0x2f, 0x32, 0xe0, 0x4b, 0x41, 0x1c, 0x98, 0x71, 0x02, 0xbc, 0xa0, 0x44, 0xfe, 0xe2, 0x7e,
0xe4, 0xdf, 0xb9, 0xcf, 0x4c, 0xb9, 0xbf, 0xdb, 0x06, 0x66, 0xf4, 0x28, 0xf1, 0x53, 0x78, 0x72,
0x92, 0x3d, 0x3c, 0x16, 0xd4, 0x72, 0x45, 0x16, 0xd4, 0x63, 0xa9, 0x24, 0x0b, 0x79, 0xf1, 0xa3,
0x28, 0x14, 0xd8, 0x00, 0x5e, 0x03, 0xd6, 0xc7, 0x89, 0x52, 0xf9, 0xfd, 0x3b, 0x16, 0xe1, 0x75,
0x58, 0xf5, 0x57, 0x78, 0x70, 0x10, 0x6f, 0xc2, 0xba, 0x0f, 0xce, 0x55, 0xca, 0x82, 0x9a, 0x55,
0x94, 0x6c, 0x85, 0x0d, 0x6f, 0x87, 0x23, 0x21, 0x36, 0xb4, 0x7b, 0x85, 0x60, 0x65, 0xe2, 0x95,
0xe2, 0x04, 0x3c, 0xcb, 0x17, 0x8f, 0x64, 0x45, 0x28, 0x95, 0xc4, 0xa2, 0x34, 0x6b, 0x86, 0x4d,
0x58, 0x9f, 0x52, 0x48, 0x45, 0x49, 0x60, 0x11, 0x8e, 0xc3, 0xc6, 0x14, 0x55, 0x92, 0xb2, 0xb2,
0x5c, 0xf1, 0xc6, 0x99, 0x22, 0x4f, 0x4b, 0xe5, 0x02, 0x1b, 0xda, 0xed, 0x42, 0xcc, 0xef, 0x21,
0xde, 0x82, 0x4d, 0x41, 0xca, 0x17, 0x0b, 0xa2, 0x74, 0x30, 0x6b, 0x82, 0x0d, 0xf8, 0xff, 0x21,
0x2d, 0x1f, 0x66, 0x45, 0x89, 0x45, 0xd3, 0x44, 0x41, 0x38, 0x2c, 0x67, 0xd9, 0x20, 0xe6, 0x60,
0xed, 0x21, 0x91, 0x13, 0xcb, 0x47, 0x59, 0x99, 0x0d, 0xe5, 0x2e, 0xae, 0x6f, 0xf9, 0xc0, 0xcd,
0x2d, 0x1f, 0xb8, 0xbf, 0xe5, 0xd1, 0xd7, 0x1e, 0x8f, 0xbe, 0xf7, 0x78, 0xf4, 0xa3, 0xc7, 0xa3,
0xeb, 0x1e, 0x8f, 0x7e, 0xf6, 0x78, 0xf4, 0xbb, 0xc7, 0x07, 0xee, 0x7b, 0x3c, 0xfa, 0x76, 0xc7,
0x07, 0xae, 0xef, 0xf8, 0xc0, 0xcd, 0x1d, 0x1f, 0x38, 0xcd, 0xd5, 0x0d, 0xf7, 0x53, 0xfb, 0x2c,
0xa5, 0xd9, 0x66, 0xba, 0xde, 0x22, 0x35, 0x62, 0x91, 0x74, 0xc3, 0x3e, 0x37, 0xd2, 0x9d, 0xfd,
0xf4, 0x23, 0xff, 0x08, 0xce, 0x16, 0xe9, 0xef, 0xff, 0xfe, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff,
0xaa, 0x6b, 0x9a, 0xcf, 0x3a, 0x06, 0x00, 0x00,
}
func (x ValueType) String() string {

@ -50,11 +50,11 @@ enum ValueType {
// VALUE_TYPE_UINT64 is a column containing 64-bit unsigned integer values.
VALUE_TYPE_UINT64 = 2;
// VALUE_TYPE_STRING is a column containing string values.
VALUE_TYPE_STRING = 3;
// VALUE_TYPE_BYTE_ARRAY is a column containing bytes with no specific type.
VALUE_TYPE_BYTE_ARRAY = 4;
// Field 3 was VALUE_TYPE_STRING which was discontinued for performance reasons in favour of VALUE_TYPE_BYTE_ARRAY.
reserved 3;
}
// CompressionType represents the valid compression types that can be used for

@ -1,26 +1,27 @@
package logs
import (
"cmp"
"bytes"
"context"
"errors"
"fmt"
"io"
"slices"
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
// Iter iterates over records in the provided decoder. All logs sections are
// iterated over in order.
// Record objects passed to yield may be reused between iterations; callers must copy them via DeepCopy() before retaining them.
func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] {
return result.Iter(func(yield func(Record) bool) error {
sections, err := dec.Sections(ctx)
@ -72,6 +73,7 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
defer r.Close()
var rows [1]dataset.Row
var record Record
for {
n, err := r.Read(ctx, rows[:])
if err != nil && !errors.Is(err, io.EOF) {
@ -79,9 +81,8 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
} else if n == 0 && errors.Is(err, io.EOF) {
return nil
}
for _, row := range rows[:n] {
record, err := Decode(streamsColumns, row)
err := Decode(streamsColumns, row, &record)
if err != nil || !yield(record) {
return err
}
@ -93,12 +94,11 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
// Decode decodes a record from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
record := Record{
// Preallocate metadata to exact number of metadata columns to avoid
// oversizing.
Metadata: make(labels.Labels, 0, metadataColumns(columns)),
}
func Decode(columns []*logsmd.ColumnDesc, row dataset.Row, record *Record) error {
metadataColumns := metadataColumns(columns)
record.Metadata = slicegrow.GrowToCap(record.Metadata, metadataColumns)
record.Metadata = record.Metadata[:metadataColumns]
nextMetadataIdx := 0
for columnIndex, columnValue := range row.Values {
if columnValue.IsNil() || columnValue.IsZero() {
@ -109,44 +109,51 @@ func Decode(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
switch column.Type {
case logsmd.COLUMN_TYPE_STREAM_ID:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
record.StreamID = columnValue.Int64()
case logsmd.COLUMN_TYPE_TIMESTAMP:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
record.Timestamp = time.Unix(0, columnValue.Int64())
case logsmd.COLUMN_TYPE_METADATA:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_BYTE_ARRAY {
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
record.Metadata = append(record.Metadata, labels.Label{
Name: column.Info.Name,
Value: columnValue.String(),
})
// Copy the value into the reused metadata buffer, growing it if necessary.
target := slicegrow.Copy(record.Metadata[nextMetadataIdx].Value, columnValue.ByteArray())
record.Metadata[nextMetadataIdx].Name = column.Info.Name
record.Metadata[nextMetadataIdx].Value = target
nextMetadataIdx++
case logsmd.COLUMN_TYPE_MESSAGE:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_BYTE_ARRAY {
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
record.Line = columnValue.ByteArray()
line := columnValue.ByteArray()
record.Line = slicegrow.Copy(record.Line, line)
}
}
// Truncate the metadata slice to the number of metadata columns we found.
record.Metadata = record.Metadata[:nextMetadataIdx]
// Metadata arrives in received order; we sort it by key per record so that
// the ordering of keys is deterministic for callers.
slices.SortFunc(record.Metadata, func(a, b labels.Label) int {
if res := cmp.Compare(a.Name, b.Name); res != 0 {
slices.SortFunc(record.Metadata, func(a, b RecordMetadata) int {
if res := strings.Compare(a.Name, b.Name); res != 0 {
return res
}
return cmp.Compare(a.Value, b.Value)
return bytes.Compare(a.Value, b.Value)
})
return record, nil
return nil
}
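
The new signature moves the allocation decision to the caller. A sketch of the resulting contract (decodeLines is hypothetical; import paths as in this repository):

package example

import (
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)

// decodeLines shows the caller-side contract after this change: one Record
// is reused across Decode calls, so retained fields must be copied out.
// rows and columns stand in for whatever the surrounding reader produced.
func decodeLines(columns []*logsmd.ColumnDesc, rows []dataset.Row) ([][]byte, error) {
	var record logs.Record // reused for every row
	var lines [][]byte
	for _, row := range rows {
		if err := logs.Decode(columns, row, &record); err != nil {
			return nil, err
		}
		// record.Line points at memory the next Decode will overwrite;
		// copy it before retaining.
		lines = append(lines, append([]byte(nil), record.Line...))
	}
	return lines, nil
}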
func metadataColumns(columns []*logsmd.ColumnDesc) int {

@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
@ -33,15 +32,15 @@ func TestDecode(t *testing.T) {
Values: []dataset.Value{
dataset.Int64Value(123),
dataset.Int64Value(1234567890000000000),
dataset.StringValue("test-app"),
dataset.StringValue("prod"),
dataset.ByteArrayValue([]byte("test-app")),
dataset.ByteArrayValue([]byte("prod")),
dataset.ByteArrayValue([]byte("test message")),
},
},
expected: Record{
StreamID: 123,
Timestamp: time.Unix(0, 1234567890000000000),
Metadata: labels.FromStrings("app", "test-app", "env", "prod"),
Metadata: []RecordMetadata{{Name: "app", Value: []byte("test-app")}, {Name: "env", Value: []byte("prod")}},
Line: []byte("test message"),
},
},
@ -64,7 +63,7 @@ func TestDecode(t *testing.T) {
expected: Record{
StreamID: 123,
Timestamp: time.Unix(0, 1234567890000000000),
Metadata: labels.FromStrings(),
Metadata: []RecordMetadata{},
Line: []byte("test message"),
},
},
@ -75,7 +74,7 @@ func TestDecode(t *testing.T) {
},
row: dataset.Row{
Values: []dataset.Value{
dataset.StringValue("invalid"),
dataset.ByteArrayValue([]byte("invalid")),
},
},
wantErr: true,
@ -87,7 +86,7 @@ func TestDecode(t *testing.T) {
},
row: dataset.Row{
Values: []dataset.Value{
dataset.StringValue("invalid"),
dataset.ByteArrayValue([]byte("invalid")),
},
},
wantErr: true,
@ -120,7 +119,8 @@ func TestDecode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
record, err := Decode(tt.columns, tt.row)
record := Record{}
err := Decode(tt.columns, tt.row, &record)
if tt.wantErr {
require.Error(t, err)
return

@ -11,8 +11,6 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
@ -23,10 +21,16 @@ import (
type Record struct {
StreamID int64
Timestamp time.Time
Metadata labels.Labels
Metadata []RecordMetadata
Line []byte
}
// RecordMetadata is a labels.Label-like pair that holds a byte buffer
// instead of a string for its value, so decoders can reuse the memory.
type RecordMetadata struct {
Name string
Value []byte
}
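
Callers that still need Prometheus labels must now convert, and copy, explicitly. A hypothetical helper (not part of this diff, and assuming the default slice-backed labels.Labels):

// toLabels materializes labels.Labels from the reused byte buffers. The
// string(...) conversion copies, which is exactly the per-record cost
// RecordMetadata avoids on the hot path.
func toLabels(md []RecordMetadata) labels.Labels {
	out := make(labels.Labels, 0, len(md))
	for _, m := range md {
		out = append(out, labels.Label{Name: m.Name, Value: string(m.Value)})
	}
	return out
}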
// Options configures the behavior of the logs section.
type Options struct {
// PageSizeHint is the size of pages to use when encoding the logs section.

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
@ -24,13 +23,13 @@ func Test(t *testing.T) {
{
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: labels.FromStrings("cluster", "test", "app", "bar"),
Metadata: []logs.RecordMetadata{{Name: "cluster", Value: []byte("test")}, {Name: "app", Value: []byte("bar")}},
Line: []byte("goodbye world"),
},
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: labels.FromStrings("cluster", "test", "app", "foo"),
Metadata: []logs.RecordMetadata{{Name: "cluster", Value: []byte("test")}, {Name: "app", Value: []byte("foo")}},
Line: []byte("foo bar"),
},
}
@ -56,36 +55,32 @@ func Test(t *testing.T) {
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: labels.FromStrings(
"app", "foo",
"cluster", "test",
),
Line: []byte("foo bar"),
Metadata: []logs.RecordMetadata{{Name: "app", Value: []byte("foo")}, {Name: "cluster", Value: []byte("test")}},
Line: []byte("foo bar"),
},
{
StreamID: 1,
Timestamp: time.Unix(10, 0),
Metadata: labels.FromStrings(),
Metadata: []logs.RecordMetadata{},
Line: []byte("hello world"),
},
{
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: labels.FromStrings("app", "bar", "cluster", "test"),
Metadata: []logs.RecordMetadata{{Name: "app", Value: []byte("bar")}, {Name: "cluster", Value: []byte("test")}},
Line: []byte("goodbye world"),
},
}
dec := encoding.ReaderAtDecoder(bytes.NewReader(buf), int64(len(buf)))
var actual []logs.Record
i := 0
for result := range logs.Iter(context.Background(), dec) {
record, err := result.Value()
require.NoError(t, err)
actual = append(actual, record)
require.Equal(t, expect[i], record)
i++
}
require.Equal(t, expect, actual)
}
func buildObject(lt *logs.Logs) ([]byte, error) {

@ -177,7 +177,7 @@ func (b *tableBuffer) Metadata(key string, pageSize int, compressionOpts dataset
col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
CompressionOptions: compressionOpts,

@ -31,7 +31,7 @@ func buildTable(buf *tableBuffer, pageSize int, compressionOpts dataset.Compress
for _, md := range record.Metadata {
metadataBuilder := buf.Metadata(md.Name, pageSize, compressionOpts)
_ = metadataBuilder.Append(i, dataset.StringValue(md.Value))
_ = metadataBuilder.Append(i, dataset.ByteArrayValue(md.Value))
}
}

@ -6,8 +6,7 @@ import (
"fmt"
"io"
"time"
"github.com/prometheus/prometheus/model/labels"
"unsafe"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
@ -15,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)
// Iter iterates over streams in the provided decoder. All streams sections are
@ -78,11 +78,15 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
return nil
}
var stream Stream
for _, row := range rows[:n] {
stream, err := Decode(streamsColumns, row)
if err != nil || !yield(stream) {
if err := Decode(streamsColumns, row, &stream); err != nil {
return err
}
if !yield(stream) {
return nil
}
}
}
})
@ -91,8 +95,13 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
// Decode decodes a stream from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
var stream Stream
func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row, stream *Stream) error {
labelColumns := labelColumns(columns)
stream.Labels = slicegrow.GrowToCap(stream.Labels, labelColumns)
stream.Labels = stream.Labels[:labelColumns]
stream.LbValueCaps = slicegrow.GrowToCap(stream.LbValueCaps, labelColumns)
stream.LbValueCaps = stream.LbValueCaps[:labelColumns]
nextLabelIdx := 0
for columnIndex, columnValue := range row.Values {
if columnValue.IsNil() || columnValue.IsZero() {
@ -103,42 +112,47 @@ func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
switch column.Type {
case streamsmd.COLUMN_TYPE_STREAM_ID:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.ID = columnValue.Int64()
case streamsmd.COLUMN_TYPE_MIN_TIMESTAMP:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.MinTimestamp = time.Unix(0, columnValue.Int64())
case streamsmd.COLUMN_TYPE_MAX_TIMESTAMP:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.MaxTimestamp = time.Unix(0, columnValue.Int64())
case streamsmd.COLUMN_TYPE_ROWS:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.Rows = int(columnValue.Int64())
case streamsmd.COLUMN_TYPE_UNCOMPRESSED_SIZE:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.UncompressedSize = columnValue.Int64()
case streamsmd.COLUMN_TYPE_LABEL:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_BYTE_ARRAY {
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.Labels = append(stream.Labels, labels.Label{
Name: column.Info.Name,
Value: columnValue.String(),
})
// Reinterpret the existing label value string as its backing byte slice (using its remembered capacity) and grow it if necessary.
target := unsafeSlice(stream.Labels[nextLabelIdx].Value, stream.LbValueCaps[nextLabelIdx])
target = slicegrow.Copy(target, columnValue.ByteArray())
stream.LbValueCaps[nextLabelIdx] = cap(target)
stream.Labels[nextLabelIdx].Name = column.Info.Name
stream.Labels[nextLabelIdx].Value = unsafeString(target)
nextLabelIdx++
default:
// TODO(rfratto): We probably don't want to return an error on unexpected
@ -147,5 +161,28 @@ func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
}
}
return stream, nil
stream.Labels = stream.Labels[:nextLabelIdx]
return nil
}
func labelColumns(columns []*streamsmd.ColumnDesc) int {
var count int
for _, column := range columns {
if column.Type == streamsmd.COLUMN_TYPE_LABEL {
count++
}
}
return count
}
func unsafeSlice(data string, capacity int) []byte {
if capacity <= 0 {
capacity = len(data)
}
return unsafe.Slice(unsafe.StringData(data), capacity)
}
func unsafeString(data []byte) string {
return unsafe.String(unsafe.SliceData(data), len(data))
}

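The out-parameter form of Decode exists so one Stream can absorb every row; a hedged sketch of the intended call pattern follows (processRows and yield are illustrative names, mirroring IterSection above). Because label values are written into Stream-owned buffers via unsafeSlice/unsafeString, each Decode call overwrites the previous labels:

```go
package example

import (
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)

// processRows decodes rows into a single reused Stream. Consumers must
// deep-copy stream.Labels (names and values) before retaining a stream
// past the yield call, since the next Decode reuses the same buffers.
func processRows(columns []*streamsmd.ColumnDesc, rows []dataset.Row, yield func(streams.Stream) bool) error {
	var stream streams.Stream
	for _, row := range rows {
		if err := streams.Decode(columns, row, &stream); err != nil {
			return err
		}
		if !yield(stream) {
			return nil
		}
	}
	return nil
}
```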
@ -33,6 +33,8 @@ type Stream struct {
MaxTimestamp time.Time // Maximum timestamp in the stream.
UncompressedSize int64 // Uncompressed size of the log lines and structured metadata values in the stream.
Rows int // Number of rows in the stream.
LbValueCaps []int // Capacities for each label value's byte array
}
// Reset zeroes all values in the stream struct so it can be reused.
@ -261,7 +263,7 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{
PageSizeHint: s.pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Value: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
Statistics: dataset.StatisticsOptions{
@ -291,7 +293,7 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder) error {
if err != nil {
return fmt.Errorf("getting label column: %w", err)
}
_ = builder.Append(i, dataset.StringValue(label.Value))
_ = builder.Append(i, dataset.ByteArrayValue([]byte(label.Value)))
}
}

@ -3,6 +3,7 @@ package streams_test
import (
"bytes"
"context"
"strings"
"testing"
"time"
@ -60,12 +61,25 @@ func Test(t *testing.T) {
for result := range streams.Iter(context.Background(), dec) {
stream, err := result.Value()
require.NoError(t, err)
stream.Labels = copyLabels(stream.Labels)
stream.LbValueCaps = nil
actual = append(actual, stream)
}
require.Equal(t, expect, actual)
}
func copyLabels(in labels.Labels) labels.Labels {
lb := make(labels.Labels, len(in))
for i, label := range in {
lb[i] = labels.Label{
Name: strings.Clone(label.Name),
Value: strings.Clone(label.Value),
}
}
return lb
}
func buildObject(st *streams.Streams) ([]byte, error) {
var buf bytes.Buffer
enc := encoding.NewEncoder(&buf)

@ -0,0 +1,27 @@
package slicegrow
import "slices"
// GrowToCap grows the slice to a total capacity of at least n elements.
// It is an alternative to slices.Grow, which guarantees room for n additional appends rather than a total capacity of n.
// This is useful when the required total capacity is known up front, including when the slice is still nil.
func GrowToCap[Slice ~[]E, E any](s Slice, n int) Slice {
if s == nil {
return make(Slice, 0, n)
}
return slices.Grow(s, max(0, n-len(s)))
}
func Copy[Slice ~[]E, E any](dst Slice, src Slice) Slice {
dst = GrowToCap(dst, len(src))
dst = dst[:len(src)]
copy(dst, src)
return dst
}
func CopyString[Slice ~[]byte](dst Slice, src string) Slice {
dst = GrowToCap(dst, len(src))
dst = dst[:len(src)]
copy(dst, src)
return dst
}

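A brief usage sketch for the new helpers (values are illustrative; note slicegrow is internal to the Loki module): GrowToCap reserves total capacity without changing the length, and Copy/CopyString reuse the destination's backing array whenever it is already large enough.

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
)

func main() {
	// Reserve capacity for 8 elements up front; length stays 0.
	buf := slicegrow.GrowToCap([]int(nil), 8)
	fmt.Println(len(buf), cap(buf) >= 8) // 0 true

	// line's backing array is reused across iterations once it is big enough.
	var line []byte
	for _, s := range []string{"hello world", "hi"} {
		line = slicegrow.CopyString(line, s)
		fmt.Println(string(line))
	}
}
```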
@ -0,0 +1,60 @@
// Package symbolizer provides a string interning mechanism to reduce memory usage
// by reusing identical strings.
//
// The Symbolizer maintains a cache of strings and returns the same instance
// when the same string is requested multiple times. This reduces memory usage
// when dealing with repeated strings, such as label names or values. It is not
// thread safe.
//
// When the cache exceeds the maximum size, a small percentage of entries are
// randomly discarded to keep memory usage under control.
package symbolizer
import (
"strings"
)
// New creates a new Symbolizer with the given initial capacity and maximum size.
func New(initialCapacity int, maxSize int) *Symbolizer {
return &Symbolizer{
symbols: make(map[string]string, initialCapacity),
maxSize: maxSize,
}
}
type Symbolizer struct {
symbols map[string]string
maxSize int
}
// Get returns a string from the symbolizer. If the string is not in the cache,
// a clone is inserted into the cache and returned.
//
// Get may delete some values from the cache prior to inserting a new value if
// the maximum size is exceeded.
func (s *Symbolizer) Get(name string) string {
if value, ok := s.symbols[name]; ok {
return value
}
// Control maximum memory use by discarding roughly 1% of symbols if the map grows too big.
// We rely on Go's unspecified map iteration order to pick which entries to discard.
if len(s.symbols) > s.maxSize {
i := 0
for k := range s.symbols {
if i > s.maxSize/100 {
break
}
delete(s.symbols, k)
i++
}
}
newString := strings.Clone(name)
s.symbols[newString] = newString
return newString
}
// Reset clears the cache and resets the Symbolizer to its initial state,
// maintaining the existing maxSize.
func (s *Symbolizer) Reset() {
clear(s.symbols)
}

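A short sketch of the interning behavior (capacity and max size are illustrative): repeated names resolve to one backing string, and because Get clones on first insertion, it is safe to pass strings that alias reusable byte buffers, as the readers below do via unsafeString.

```go
package main

import (
	"fmt"
	"unsafe"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
)

func main() {
	syms := symbolizer.New(16, 1024)

	a := syms.Get("trace_id")
	b := syms.Get("trace_id")
	// Both lookups return the same interned instance: the backing data
	// pointers match, so repeated names share a single allocation.
	fmt.Println(unsafe.StringData(a) == unsafe.StringData(b)) // true

	syms.Reset() // clears the cache; maxSize is kept
}
```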
@ -1,16 +1,17 @@
package dataobj
import (
"cmp"
"context"
"errors"
"fmt"
"io"
"iter"
"maps"
"slices"
"sort"
"strconv"
"time"
"unsafe"
"github.com/prometheus/prometheus/model/labels"
@ -23,6 +24,8 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
)
// A Record is an individual log record in a data object.
@ -42,11 +45,14 @@ type LogsReader struct {
matchIDs map[int64]struct{}
predicate LogsPredicate
buf []dataset.Row
buf []dataset.Row
record logs.Record
reader *dataset.Reader
columns []dataset.Column
columnDesc []*logsmd.ColumnDesc
symbols *symbolizer.Symbolizer
}
// NewLogsReader creates a new LogsReader that reads from the logs section of
@ -109,7 +115,7 @@ func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
}
}
r.buf = slices.Grow(r.buf, len(s))
r.buf = slicegrow.GrowToCap(r.buf, len(s))
r.buf = r.buf[:len(s)]
n, err := r.reader.Read(ctx, r.buf)
@ -120,22 +126,37 @@ func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
}
for i := range r.buf[:n] {
readRecord, err := logs.Decode(r.columnDesc, r.buf[i])
err := logs.Decode(r.columnDesc, r.buf[i], &r.record)
if err != nil {
return i, fmt.Errorf("decoding record: %w", err)
}
s[i] = Record{
StreamID: readRecord.StreamID,
Timestamp: readRecord.Timestamp,
Metadata: readRecord.Metadata,
Line: readRecord.Line,
// Copy record data into the pre-allocated output buffer
s[i].StreamID = r.record.StreamID
s[i].Timestamp = r.record.Timestamp
s[i].Metadata = slicegrow.GrowToCap(s[i].Metadata, len(r.record.Metadata))
s[i].Metadata = s[i].Metadata[:len(r.record.Metadata)]
for j := range r.record.Metadata {
s[i].Metadata[j].Name = r.symbols.Get(r.record.Metadata[j].Name)
s[i].Metadata[j].Value = r.symbols.Get(unsafeString(r.record.Metadata[j].Value))
}
s[i].Line = slicegrow.Copy(s[i].Line, r.record.Line)
}
return n, nil
}
func unsafeSlice(data string, capacity int) []byte {
if capacity <= 0 {
capacity = len(data)
}
return unsafe.Slice(unsafe.StringData(data), capacity)
}
func unsafeString(data []byte) string {
return unsafe.String(unsafe.SliceData(data), len(data))
}
func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()
sec, err := r.findSection(ctx)
@ -178,6 +199,12 @@ func (r *LogsReader) initReader(ctx context.Context) error {
r.reader.Reset(readerOpts)
}
if r.symbols == nil {
r.symbols = symbolizer.New(128, 100_000)
} else {
r.symbols.Reset()
}
r.columnDesc = columnDescs
r.columns = columns
r.ready = true
@ -204,12 +231,17 @@ func (r *LogsReader) findSection(ctx context.Context) (*filemd.SectionInfo, erro
return nil, fmt.Errorf("section index %d not found", r.idx)
}
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
func convertMetadata(md push.LabelsAdapter) []logs.RecordMetadata {
l := make([]logs.RecordMetadata, 0, len(md))
for _, label := range md {
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
l = append(l, logs.RecordMetadata{Name: label.Name, Value: unsafeSlice(label.Value, 0)})
}
sort.Sort(l)
sort.Slice(l, func(i, j int) bool {
if l[i].Name == l[j].Name {
return cmp.Compare(unsafeString(l[i].Value), unsafeString(l[j].Value)) < 0
}
return cmp.Compare(l[i].Name, l[j].Name) < 0
})
return l
}
@ -231,6 +263,10 @@ func (r *LogsReader) Reset(obj *Object, sectionIndex int) {
r.columns = nil
r.columnDesc = nil
if r.symbols != nil {
r.symbols.Reset()
}
// We leave r.reader as-is to avoid reallocating; it'll be reset on the first
// call to Read.
}
@ -310,9 +346,9 @@ func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDes
return dataset.FuncPredicate{
Column: messageColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
if value.Type() == datasetmd.VALUE_TYPE_STRING {
if value.Type() == datasetmd.VALUE_TYPE_BYTE_ARRAY {
// Handle older dataobjs that still use the string type for the message column. This can be removed in the future.
return p.Keep([]byte(value.String()))
return p.Keep(value.ByteArray())
}
return p.Keep(value.ByteArray())
@ -328,7 +364,7 @@ func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDes
}
return dataset.EqualPredicate{
Column: metadataColumn,
Value: dataset.StringValue(p.Value),
Value: dataset.ByteArrayValue(unsafeSlice(p.Value, 0)),
}
case MetadataFilterPredicate:
@ -402,8 +438,8 @@ func valueToString(value dataset.Value) string {
return strconv.FormatInt(value.Int64(), 10)
case datasetmd.VALUE_TYPE_UINT64:
return strconv.FormatUint(value.Uint64(), 10)
case datasetmd.VALUE_TYPE_STRING:
return value.String()
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
return unsafeString(value.ByteArray())
default:
panic(fmt.Sprintf("unsupported value type %s", value.Type()))
}

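Caller-side implication, as a hedged sketch (readAll is an illustrative name): Read now copies each decoded record into buffers owned by the caller's slice, so one fixed batch can be reused across calls; the per-record Line buffers are overwritten by the next Read, while interned Metadata strings remain valid until the reader is reset.

```go
package example

import (
	"context"
	"errors"
	"io"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// readAll drains r with a single reusable batch; Read grows the Record
// buffers in recs in place instead of allocating fresh ones per call.
func readAll(ctx context.Context, r *dataobj.LogsReader, handle func(dataobj.Record)) error {
	recs := make([]dataobj.Record, 128)
	for {
		n, err := r.Read(ctx, recs)
		for _, rec := range recs[:n] {
			handle(rec) // rec.Line is only valid until the next Read call
		}
		if errors.Is(err, io.EOF) {
			return nil
		} else if err != nil {
			return err
		}
	}
}
```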
@ -23,11 +23,11 @@ import (
var recordsTestdata = []logs.Record{
{StreamID: 1, Timestamp: unixTime(10), Metadata: nil, Line: []byte("hello")},
{StreamID: 1, Timestamp: unixTime(15), Metadata: labels.FromStrings("trace_id", "123"), Line: []byte("world")},
{StreamID: 1, Timestamp: unixTime(15), Metadata: []logs.RecordMetadata{{Name: "trace_id", Value: []byte("123")}}, Line: []byte("world")},
{StreamID: 2, Timestamp: unixTime(5), Metadata: nil, Line: []byte("hello again")},
{StreamID: 2, Timestamp: unixTime(20), Metadata: labels.FromStrings("user", "12"), Line: []byte("world again")},
{StreamID: 3, Timestamp: unixTime(25), Metadata: labels.FromStrings("user", "14"), Line: []byte("hello one more time")},
{StreamID: 3, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123"), Line: []byte("world one more time")},
{StreamID: 2, Timestamp: unixTime(20), Metadata: []logs.RecordMetadata{{Name: "user", Value: []byte("12")}}, Line: []byte("world again")},
{StreamID: 3, Timestamp: unixTime(25), Metadata: []logs.RecordMetadata{{Name: "user", Value: []byte("14")}}, Line: []byte("hello one more time")},
{StreamID: 3, Timestamp: unixTime(30), Metadata: []logs.RecordMetadata{{Name: "trace_id", Value: []byte("123")}}, Line: []byte("world one more time")},
}
func metadata(kvps ...string) push.LabelsAdapter {
@ -173,7 +173,7 @@ func buildLogsObject(t *testing.T, opts logs.Options) *dataobj.Object {
func readAllRecords(ctx context.Context, r *dataobj.LogsReader) ([]dataobj.Record, error) {
var (
res []dataobj.Record
buf = make([]dataobj.Record, 128)
buf = make([]dataobj.Record, 4)
)
for {
@ -214,7 +214,7 @@ func BenchmarkLogsReader(b *testing.B) {
Timestamp: time.Now().Add(time.Duration(i) * time.Second),
Line: "hello world " + strconv.Itoa(i),
StructuredMetadata: push.LabelsAdapter{
{Name: "trace_id", Value: "123"},
{Name: "trace_id", Value: strconv.Itoa(i % 100)},
{Name: "pod", Value: "pod-abcd"},
},
},
@ -234,7 +234,6 @@ func BenchmarkLogsReader(b *testing.B) {
require.Equal(b, 1, md.LogsSections)
r := dataobj.NewLogsReader(obj, 0)
var (
recs = make([]dataobj.Record, 128)
ctx = context.Background()

@ -178,6 +178,7 @@ func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, object
}
// ProcessParallel processes series from multiple readers in parallel
// dataobj.Stream objects returned to onNewStream may be reused and must be deep copied for further use, including the stream.Labels keys and values.
func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, dataobj.Stream)) error {
readers, err := shardStreamReaders(ctx, sp.objects, sp.shard)
if err != nil {

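Given that contract, a hedged sketch of a compliant onNewStream callback (retainStream is an illustrative name; the cloning mirrors copyLabels in the streams tests above):

```go
package example

import (
	"strings"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// retainStream returns a callback that deep-copies the reusable stream,
// including every label name and value, before storing it in dst.
func retainStream(dst *[]dataobj.Stream) func(uint64, dataobj.Stream) {
	return func(_ uint64, s dataobj.Stream) {
		cloned := make(labels.Labels, len(s.Labels))
		for i, l := range s.Labels {
			cloned[i] = labels.Label{Name: strings.Clone(l.Name), Value: strings.Clone(l.Value)}
		}
		s.Labels = cloned
		*dst = append(*dst, s)
	}
}
```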
@ -125,17 +125,17 @@ func TestStore_SelectSamples(t *testing.T) {
end: now.Add(time.Hour),
shards: []string{"0_of_2"},
want: []sampleWithLabels{
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
{
@ -145,13 +145,13 @@ func TestStore_SelectSamples(t *testing.T) {
end: now.Add(time.Hour),
shards: []string{"1_of_2"},
want: []sampleWithLabels{
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}},
{Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}},
},
},
{
@ -285,17 +285,17 @@ func TestStore_SelectLogs(t *testing.T) {
limit: 100,
direction: logproto.FORWARD,
want: []entryWithLabels{
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}},
{Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}},
},
},
{
@ -307,13 +307,13 @@ func TestStore_SelectLogs(t *testing.T) {
limit: 100,
direction: logproto.FORWARD,
want: []entryWithLabels{
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}},
{Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}},
{Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}},
{Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}},
{Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}},
},
},
{

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"slices"
"time"
"github.com/prometheus/prometheus/model/labels"
@ -16,6 +15,8 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
)
// A Stream is an individual stream in a data object.
@ -43,11 +44,14 @@ type StreamsReader struct {
predicate StreamsPredicate
buf []dataset.Row
buf []dataset.Row
stream streams.Stream
reader *dataset.Reader
columns []dataset.Column
columnDesc []*streamsmd.ColumnDesc
symbols *symbolizer.Symbolizer
}
// NewStreamsReader creates a new StreamsReader that reads from the streams
@ -92,9 +96,8 @@ func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) {
}
}
r.buf = slices.Grow(r.buf, len(s))
r.buf = slicegrow.GrowToCap(r.buf, len(s))
r.buf = r.buf[:len(s)]
n, err := r.reader.Read(ctx, r.buf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("reading rows: %w", err)
@ -103,17 +106,20 @@ func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) {
}
for i := range r.buf[:n] {
readStream, err := streams.Decode(r.columnDesc, r.buf[i])
if err != nil {
if err := streams.Decode(r.columnDesc, r.buf[i], &r.stream); err != nil {
return i, fmt.Errorf("decoding stream: %w", err)
}
s[i] = Stream{
ID: readStream.ID,
MinTime: readStream.MinTimestamp,
MaxTime: readStream.MaxTimestamp,
UncompressedSize: readStream.UncompressedSize,
Labels: readStream.Labels,
// Copy stream data into the pre-allocated output buffer
s[i].ID = r.stream.ID
s[i].MinTime = r.stream.MinTimestamp
s[i].MaxTime = r.stream.MaxTimestamp
s[i].UncompressedSize = r.stream.UncompressedSize
s[i].Labels = slicegrow.GrowToCap(s[i].Labels, len(r.stream.Labels))
s[i].Labels = s[i].Labels[:len(r.stream.Labels)]
for j := range r.stream.Labels {
s[i].Labels[j].Name = r.symbols.Get(r.stream.Labels[j].Name)
s[i].Labels[j].Value = r.symbols.Get(r.stream.Labels[j].Value)
}
}
@ -152,6 +158,12 @@ func (r *StreamsReader) initReader(ctx context.Context) error {
r.reader.Reset(readerOpts)
}
if r.symbols == nil {
r.symbols = symbolizer.New(128, 100_000)
} else {
r.symbols.Reset()
}
r.columnDesc = columnDescs
r.columns = columns
r.ready = true
@ -193,6 +205,10 @@ func (r *StreamsReader) Reset(obj *Object, sectionIndex int) {
r.columns = nil
r.columnDesc = nil
if r.symbols != nil {
r.symbols.Reset()
}
// We leave r.reader as-is to avoid reallocating; it'll be reset on the first
// call to Read.
}
@ -250,7 +266,7 @@ func translateStreamsPredicate(p StreamsPredicate, columns []dataset.Column, col
}
return dataset.EqualPredicate{
Column: metadataColumn,
Value: dataset.StringValue(p.Value),
Value: dataset.ByteArrayValue(unsafeSlice(p.Value, 0)),
}
case LabelFilterPredicate:

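Both readers build predicate values with unsafeSlice(p.Value, 0), a zero-copy reinterpretation of the predicate's string as bytes; a minimal sketch of the trick and its contract (byteView is an illustrative name):

```go
package example

import "unsafe"

// byteView returns a []byte aliasing s without copying. The result must
// never be mutated and must not outlive s; it is safe for predicate
// values because they are read-only for the lifetime of the query.
func byteView(s string) []byte {
	return unsafe.Slice(unsafe.StringData(s), len(s))
}
```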