chore(dataobj): fix regression in allocations of row readers (#17784)

pull/17756/head
Robert Fratto 12 months ago committed by GitHub
parent 708c8e4983
commit 36bcb5aa2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 20
      pkg/dataobj/consumer/logsobj/builder.go
  2. 20
      pkg/dataobj/querier/iter.go
  3. 9
      pkg/dataobj/sections/logs/builder.go
  4. 11
      pkg/dataobj/sections/logs/builder_test.go
  5. 26
      pkg/dataobj/sections/logs/iter.go
  6. 7
      pkg/dataobj/sections/logs/iter_test.go
  7. 29
      pkg/dataobj/sections/logs/row_reader.go
  8. 4
      pkg/dataobj/sections/logs/table_build.go
  9. 2
      pkg/dataobj/sections/streams/builder.go
  10. 1
      pkg/dataobj/sections/streams/builder_test.go
  11. 23
      pkg/dataobj/sections/streams/iter.go
  12. 18
      pkg/dataobj/sections/streams/row_reader.go
  13. 12
      pkg/dataobj/sections/streams/row_reader_test.go
  14. 11
      pkg/engine/executor/dataobjscan.go

@ -10,7 +10,6 @@ import (
"fmt"
"sort"
"time"
"unsafe"
"github.com/grafana/dskit/flagext"
lru "github.com/hashicorp/golang-lru/v2"
@ -272,31 +271,20 @@ func streamSizeEstimate(stream logproto.Stream) int {
return size
}
func convertMetadata(md push.LabelsAdapter) []logs.RecordMetadata {
l := make([]logs.RecordMetadata, 0, len(md))
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
for _, label := range md {
l = append(l, logs.RecordMetadata{Name: label.Name, Value: unsafeSlice(label.Value, 0)})
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
}
sort.Slice(l, func(i, j int) bool {
if l[i].Name == l[j].Name {
return cmp.Compare(unsafeString(l[i].Value), unsafeString(l[j].Value)) < 0
return cmp.Compare(l[i].Value, l[j].Value) < 0
}
return cmp.Compare(l[i].Name, l[j].Name) < 0
})
return l
}
// unsafeSlice returns a byte slice that aliases the memory backing data,
// without copying it.
//
// The returned slice has both its length and capacity set to capacity. When
// capacity is zero or negative, len(data) is used instead, so the slice covers
// exactly the bytes of the string.
//
// The result shares storage with an immutable Go string and must be treated as
// read-only by callers.
func unsafeSlice(data string, capacity int) []byte {
	size := capacity
	if size <= 0 {
		// No explicit size requested: expose exactly the string's bytes.
		size = len(data)
	}

	start := unsafe.StringData(data)
	return unsafe.Slice(start, size)
}
// unsafeString reinterprets data as a string without copying it.
//
// The returned string aliases the bytes of data, so data must not be modified
// for as long as the string is in use.
func unsafeString(data []byte) string {
	first := unsafe.SliceData(data)
	return unsafe.String(first, len(data))
}
func (b *Builder) estimatedSize() int {
var size int
size += b.streams.EstimatedSize()

@ -7,8 +7,6 @@ import (
"slices"
"sync"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/iter"
@ -103,7 +101,7 @@ func newEntryIterator(ctx context.Context,
}
timestamp := record.Timestamp.UnixNano()
line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, metadataToLabels(record.Metadata))
line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata)
if !ok {
continue
}
@ -125,20 +123,6 @@ func newEntryIterator(ctx context.Context,
return heapIterator(&top), nil
}
// metadataToLabels converts record metadata into a labels.Labels value of the
// same length and order, copying each metadata value's bytes into a new
// string.
func metadataToLabels(md []logs.RecordMetadata) labels.Labels {
	// TODO(rfratto): The conversion here undoes some of the memory optimization
	// work performed a few weeks ago; we need to revisit how to avoid the
	// conversion here while still reusing memory where we can.
	out := make(labels.Labels, 0, len(md))
	for idx := range md {
		out = append(out, labels.Label{
			Name:  md[idx].Name,
			Value: string(md[idx].Value),
		})
	}
	return out
}
func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool {
switch direction {
case logproto.FORWARD:
@ -271,7 +255,7 @@ func newSampleIterator(ctx context.Context,
// In the case of multi-variant expressions, the only difference between the multiple extractors should be the final value, with all
// other filters and processing already done.
statistics.AddDecompressedLines(1)
samples, ok := streamExtractor.Process(timestamp, record.Line, metadataToLabels(record.Metadata)...)
samples, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...)
if !ok {
continue
}

@ -8,6 +8,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
@ -19,16 +20,10 @@ import (
type Record struct {
StreamID int64
Timestamp time.Time
Metadata []RecordMetadata
Metadata labels.Labels
Line []byte
}
// RecordMetadata is a single name/value metadata pair attached to a Record.
// It is a Labels-like type that holds byte buffers instead of strings.
type RecordMetadata struct {
	// Name is the metadata key.
	Name string
	// Value is the metadata value, held as a byte buffer rather than a string.
	Value []byte
}
// BuilderOptions configures the behavior of the logs section.
type BuilderOptions struct {
// PageSizeHint is the size of pages to use when encoding the logs section.

@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
@ -23,13 +24,13 @@ func Test(t *testing.T) {
{
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: []logs.RecordMetadata{{Name: "cluster", Value: []byte("test")}, {Name: "app", Value: []byte("bar")}},
Metadata: []labels.Label{{Name: "cluster", Value: "test"}, {Name: "app", Value: "bar"}},
Line: []byte("goodbye world"),
},
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: []logs.RecordMetadata{{Name: "cluster", Value: []byte("test")}, {Name: "app", Value: []byte("foo")}},
Metadata: []labels.Label{{Name: "cluster", Value: "test"}, {Name: "app", Value: "foo"}},
Line: []byte("foo bar"),
},
}
@ -54,19 +55,19 @@ func Test(t *testing.T) {
{
StreamID: 1,
Timestamp: time.Unix(5, 0),
Metadata: []logs.RecordMetadata{{Name: "app", Value: []byte("foo")}, {Name: "cluster", Value: []byte("test")}},
Metadata: []labels.Label{{Name: "app", Value: "foo"}, {Name: "cluster", Value: "test"}},
Line: []byte("foo bar"),
},
{
StreamID: 1,
Timestamp: time.Unix(10, 0),
Metadata: []logs.RecordMetadata{},
Metadata: []labels.Label{},
Line: []byte("hello world"),
},
{
StreamID: 2,
Timestamp: time.Unix(100, 0),
Metadata: []logs.RecordMetadata{{Name: "app", Value: []byte("bar")}, {Name: "cluster", Value: []byte("test")}},
Metadata: []labels.Label{{Name: "app", Value: "bar"}, {Name: "cluster", Value: "test"}},
Line: []byte("goodbye world"),
},
}

@ -1,7 +1,6 @@
package logs
import (
"bytes"
"context"
"errors"
"fmt"
@ -10,12 +9,15 @@ import (
"strings"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
)
// Iter iterates over records in the provided decoder. All logs sections are
@ -77,7 +79,7 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Record] {
return nil
}
for _, row := range rows[:n] {
err := decodeRow(streamsColumns, row, &record)
err := decodeRow(streamsColumns, row, &record, nil)
if err != nil || !yield(record) {
return err
}
@ -89,7 +91,10 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Record] {
// decodeRow decodes a record from a [dataset.Row], using the provided columns
// to determine the column type. The list of columns must match the columns
// used to create the row.
func decodeRow(columns []*logsmd.ColumnDesc, row dataset.Row, record *Record) error {
//
// The sym argument is used for reusing metadata strings between calls to
// decodeRow. If sym is nil, metadata strings are always allocated.
func decodeRow(columns []*logsmd.ColumnDesc, row dataset.Row, record *Record, sym *symbolizer.Symbolizer) error {
metadataColumns := metadataColumns(columns)
record.Metadata = slicegrow.GrowToCap(record.Metadata, metadataColumns)
record.Metadata = record.Metadata[:metadataColumns]
@ -119,11 +124,14 @@ func decodeRow(columns []*logsmd.ColumnDesc, row dataset.Row, record *Record) er
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
// Convert the target pointer to a byte slice and grow it if necessary.
target := slicegrow.Copy(record.Metadata[nextMetadataIdx].Value, columnValue.ByteArray())
record.Metadata[nextMetadataIdx].Name = column.Info.Name
record.Metadata[nextMetadataIdx].Value = target
if sym != nil {
record.Metadata[nextMetadataIdx].Value = sym.Get(unsafeString(columnValue.ByteArray()))
} else {
record.Metadata[nextMetadataIdx].Value = string(columnValue.ByteArray())
}
nextMetadataIdx++
case logsmd.COLUMN_TYPE_MESSAGE:
@ -141,11 +149,11 @@ func decodeRow(columns []*logsmd.ColumnDesc, row dataset.Row, record *Record) er
// Metadata is originally sorted in received order; we sort it by key
// per-record since it might not be obvious why keys appear in a certain
// order.
slices.SortFunc(record.Metadata, func(a, b RecordMetadata) int {
slices.SortFunc(record.Metadata, func(a, b labels.Label) int {
if res := strings.Compare(a.Name, b.Name); res != 0 {
return res
}
return bytes.Compare(a.Value, b.Value)
return strings.Compare(a.Value, b.Value)
})
return nil

@ -4,6 +4,7 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
@ -40,7 +41,7 @@ func TestDecode(t *testing.T) {
expected: Record{
StreamID: 123,
Timestamp: time.Unix(0, 1234567890000000000),
Metadata: []RecordMetadata{{Name: "app", Value: []byte("test-app")}, {Name: "env", Value: []byte("prod")}},
Metadata: []labels.Label{{Name: "app", Value: "test-app"}, {Name: "env", Value: "prod"}},
Line: []byte("test message"),
},
},
@ -63,7 +64,7 @@ func TestDecode(t *testing.T) {
expected: Record{
StreamID: 123,
Timestamp: time.Unix(0, 1234567890000000000),
Metadata: []RecordMetadata{},
Metadata: []labels.Label{},
Line: []byte("test message"),
},
},
@ -120,7 +121,7 @@ func TestDecode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
record := Record{}
err := decodeRow(tt.columns, tt.row, &record)
err := decodeRow(tt.columns, tt.row, &record, nil)
if tt.wantErr {
require.Error(t, err)
return

@ -12,6 +12,8 @@ import (
"strconv"
"unsafe"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
@ -30,8 +32,7 @@ type RowReader struct {
matchIDs map[int64]struct{}
predicates []RowPredicate
buf []dataset.Row
record Record
buf []dataset.Row
reader *dataset.Reader
columns []dataset.Column
@ -108,24 +109,10 @@ func (r *RowReader) Read(ctx context.Context, s []Record) (int, error) {
}
for i := range r.buf[:n] {
err := decodeRow(r.columnDesc, r.buf[i], &r.record)
err := decodeRow(r.columnDesc, r.buf[i], &s[i], r.symbols)
if err != nil {
return i, fmt.Errorf("decoding record: %w", err)
}
// Copy record data into pre-allocated output buffer
s[i].StreamID = r.record.StreamID
s[i].Timestamp = r.record.Timestamp
s[i].Metadata = slicegrow.GrowToCap(s[i].Metadata, len(r.record.Metadata))
s[i].Metadata = s[i].Metadata[:len(r.record.Metadata)]
for j := range r.record.Metadata {
// TODO(rfratto): the slice conversion is likely to undo some of the
// memory optimization work; we need to double check how we should reuse
// memory if there's no inner type to translate from.
s[i].Metadata[j].Name = r.symbols.Get(r.record.Metadata[j].Name)
s[i].Metadata[j].Value = []byte(r.symbols.Get(unsafeString(r.record.Metadata[j].Value)))
}
s[i].Line = slicegrow.Copy(s[i].Line, r.record.Line)
}
return n, nil
@ -195,14 +182,14 @@ func (r *RowReader) initReader(ctx context.Context) error {
return nil
}
func convertMetadata(md push.LabelsAdapter) []RecordMetadata {
l := make([]RecordMetadata, 0, len(md))
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
for _, label := range md {
l = append(l, RecordMetadata{Name: label.Name, Value: unsafeSlice(label.Value, 0)})
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
}
sort.Slice(l, func(i, j int) bool {
if l[i].Name == l[j].Name {
return cmp.Compare(unsafeString(l[i].Value), unsafeString(l[j].Value)) < 0
return cmp.Compare(l[i].Value, l[j].Value) < 0
}
return cmp.Compare(l[i].Name, l[j].Name) < 0
})

@ -30,8 +30,10 @@ func buildTable(buf *tableBuffer, pageSize int, compressionOpts dataset.Compress
_ = messageBuilder.Append(i, dataset.ByteArrayValue(record.Line))
for _, md := range record.Metadata {
// Passing around md.Value as an unsafe slice is safe here: appending
// values is always read-only and the byte slice will never be mutated.
metadataBuilder := buf.Metadata(md.Name, pageSize, compressionOpts)
_ = metadataBuilder.Append(i, dataset.ByteArrayValue(md.Value))
_ = metadataBuilder.Append(i, dataset.ByteArrayValue(unsafeSlice(md.Value, 0)))
}
}

@ -38,8 +38,6 @@ type Stream struct {
// Total number of log records in the stream.
Rows int
LbValueCaps []int // Capacities for each label value's byte array
}
// Reset zeroes all values in the stream struct so it can be reused.

@ -63,7 +63,6 @@ func Test(t *testing.T) {
stream, err := result.Value()
require.NoError(t, err)
stream.Labels = copyLabels(stream.Labels)
stream.LbValueCaps = nil
actual = append(actual, stream)
}

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow"
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/symbolizer"
)
// Iter iterates over streams in the provided decoder. All streams sections are
@ -75,7 +76,7 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Stream] {
var stream Stream
for _, row := range rows[:n] {
if err := decodeRow(streamsColumns, row, &stream); err != nil {
if err := decodeRow(streamsColumns, row, &stream, nil); err != nil {
return err
}
@ -90,12 +91,13 @@ func IterSection(ctx context.Context, section *Section) result.Seq[Stream] {
// decodeRow decodes a stream from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row, stream *Stream) error {
//
// The sym argument is used for reusing label values between calls to
// decodeRow. If sym is nil, label value strings are always allocated.
func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row, stream *Stream, sym *symbolizer.Symbolizer) error {
labelColumns := labelColumns(columns)
stream.Labels = slicegrow.GrowToCap(stream.Labels, labelColumns)
stream.Labels = stream.Labels[:labelColumns]
stream.LbValueCaps = slicegrow.GrowToCap(stream.LbValueCaps, labelColumns)
stream.LbValueCaps = stream.LbValueCaps[:labelColumns]
nextLabelIdx := 0
for columnIndex, columnValue := range row.Values {
@ -140,13 +142,14 @@ func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row, stream *Stream)
return fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
// Convert the target pointer to a byte slice and grow it if necessary.
target := unsafeSlice(stream.Labels[nextLabelIdx].Value, stream.LbValueCaps[nextLabelIdx])
target = slicegrow.Copy(target, columnValue.ByteArray())
stream.LbValueCaps[nextLabelIdx] = cap(target)
stream.Labels[nextLabelIdx].Name = column.Info.Name
stream.Labels[nextLabelIdx].Value = unsafeString(target)
if sym != nil {
stream.Labels[nextLabelIdx].Value = sym.Get(unsafeString(columnValue.ByteArray()))
} else {
stream.Labels[nextLabelIdx].Value = string(columnValue.ByteArray())
}
nextLabelIdx++
default:

@ -22,8 +22,7 @@ type RowReader struct {
predicate RowPredicate
buf []dataset.Row
stream Stream
buf []dataset.Row
reader *dataset.Reader
columns []dataset.Column
@ -82,22 +81,9 @@ func (r *RowReader) Read(ctx context.Context, s []Stream) (int, error) {
}
for i := range r.buf[:n] {
if err := decodeRow(r.columnDesc, r.buf[i], &r.stream); err != nil {
if err := decodeRow(r.columnDesc, r.buf[i], &s[i], r.symbols); err != nil {
return i, fmt.Errorf("decoding stream: %w", err)
}
// Copy record data into pre-allocated output buffer
s[i].ID = r.stream.ID
s[i].MinTimestamp = r.stream.MinTimestamp
s[i].MaxTimestamp = r.stream.MaxTimestamp
s[i].UncompressedSize = r.stream.UncompressedSize
s[i].Labels = slicegrow.GrowToCap(s[i].Labels, len(r.stream.Labels))
s[i].Labels = s[i].Labels[:len(r.stream.Labels)]
for j := range r.stream.Labels {
s[i].Labels[j].Name = r.symbols.Get(r.stream.Labels[j].Name)
s[i].Labels[j].Value = r.symbols.Get(r.stream.Labels[j].Value)
}
s[i].Rows = r.stream.Rows
}
return n, nil

@ -31,9 +31,9 @@ var streamsTestdata = []struct {
func TestRowReader(t *testing.T) {
expect := []streams.Stream{
{1, unixTime(10), unixTime(15), 25, labels.FromStrings("cluster", "test", "app", "foo"), 2, nil},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2, nil},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz"), 2, nil},
{1, unixTime(10), unixTime(15), 25, labels.FromStrings("cluster", "test", "app", "foo"), 2},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz"), 2},
}
dec := buildStreamsDecoder(t, 1) // Many pages
@ -45,7 +45,7 @@ func TestRowReader(t *testing.T) {
func TestRowReader_AddLabelMatcher(t *testing.T) {
expect := []streams.Stream{
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2, nil},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2},
}
dec := buildStreamsDecoder(t, 1) // Many pages
@ -59,8 +59,8 @@ func TestRowReader_AddLabelMatcher(t *testing.T) {
func TestRowReader_AddLabelFilter(t *testing.T) {
expect := []streams.Stream{
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2, nil},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz"), 2, nil},
{2, unixTime(5), unixTime(20), 45, labels.FromStrings("cluster", "test", "app", "bar"), 2},
{3, unixTime(25), unixTime(30), 35, labels.FromStrings("cluster", "test", "app", "baz"), 2},
}
dec := buildStreamsDecoder(t, 1) // Many pages

@ -526,7 +526,7 @@ func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field,
}
case types.ColumnTypeMetadata.String():
val := getMetadataValue(record.Metadata, field.Name)
val := record.Metadata.Get(field.Name)
if val == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
@ -551,15 +551,6 @@ func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field,
}
}
// getMetadataValue looks up name in md and returns the value of the first
// matching entry as a string. It returns the empty string when no entry has
// that name.
func getMetadataValue(md []logs.RecordMetadata, name string) string {
	for idx := range md {
		if md[idx].Name != name {
			continue
		}
		return string(md[idx].Value)
	}
	return ""
}
// Value reports the result of the previous call to [dataobjScan.Read]: the
// current [arrow.Record] batch, or the error recorded if that record could
// not be read.
func (s *dataobjScan) Value() (arrow.Record, error) {
	return s.state.batch, s.state.err
}

Loading…
Cancel
Save