diff --git a/pkg/dataobj/sections/logs/reader.go b/pkg/dataobj/sections/logs/reader.go index 5c76e3f910..0f7ce549a8 100644 --- a/pkg/dataobj/sections/logs/reader.go +++ b/pkg/dataobj/sections/logs/reader.go @@ -166,6 +166,12 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.RecordBatch, er builder := array.NewRecordBuilder(r.opts.Allocator, r.schema) n, readErr := r.inner.Read(ctx, r.buf) + + // Preallocate all builders + for _, columnBuilder := range builder.Fields() { + columnBuilder.Reserve(n) + } + for rowIndex := range n { row := r.buf[rowIndex] diff --git a/pkg/engine/internal/executor/aggregator.go b/pkg/engine/internal/executor/aggregator.go index cc59363c30..83e1211e4c 100644 --- a/pkg/engine/internal/executor/aggregator.go +++ b/pkg/engine/internal/executor/aggregator.go @@ -36,17 +36,19 @@ const ( // aggregator is used to aggregate sample values by a set of grouping keys for each point in time. type aggregator struct { - points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series - digest *xxhash.Digest // used to compute key for each group - operation aggregationOperation // aggregation type - labels []arrow.Field // combined list of all label fields for all sample values + points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series + digest *xxhash.Digest // used to compute key for each group + operation aggregationOperation // aggregation type + labels []arrow.Field // combined list of all label fields for all sample values + clonedLabelValues map[string]string // cache of cloned strings to reduce allocations for repeated values } // newAggregator creates a new aggregator with the specified grouping. func newAggregator(pointsSizeHint int, operation aggregationOperation) *aggregator { a := aggregator{ - digest: xxhash.New(), - operation: operation, + digest: xxhash.New(), + operation: operation, + clonedLabelValues: make(map[string]string), } if pointsSizeHint > 0 { @@ -135,7 +137,12 @@ func (a *aggregator) Add(ts time.Time, value float64, labels []arrow.Field, labe // copy the value as this is backed by the arrow array data buffer. // We could retain the record to avoid this copy, but that would hold // all other columns in memory for as long as the query is evaluated. - labelValuesCopy[i] = strings.Clone(v) + cloned, ok := a.clonedLabelValues[v] + if !ok { + cloned = strings.Clone(v) + a.clonedLabelValues[v] = cloned + } + labelValuesCopy[i] = cloned } // TODO: add limits on number of groups @@ -167,7 +174,20 @@ func (a *aggregator) BuildRecord() (arrow.RecordBatch, error) { rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema) // emit aggregated results in sorted order of timestamp - for _, ts := range a.getSortedTimestamps() { + sortedTimestamps := a.getSortedTimestamps() + + // preallocate all builders to the total amount of rows + total := 0 + for _, ts := range sortedTimestamps { + total += len(a.points[ts]) + } + rb.Field(0).Reserve(total) + rb.Field(1).Reserve(total) + for i := range a.labels { + rb.Field(2 + i).Reserve(total) + } + + for _, ts := range sortedTimestamps { tsValue, _ := arrow.TimestampFromTime(ts, arrow.Nanosecond) for _, entry := range a.points[ts] { diff --git a/pkg/engine/internal/executor/column.go b/pkg/engine/internal/executor/column.go index b3b66381ac..577f490686 100644 --- a/pkg/engine/internal/executor/column.go +++ b/pkg/engine/internal/executor/column.go @@ -13,12 +13,11 @@ import ( func NewScalar(value types.Literal, rows int) arrow.Array { builder := array.NewBuilder(memory.DefaultAllocator, value.Type().ArrowType()) + builder.Reserve(rows) switch builder := builder.(type) { case *array.NullBuilder: - for range rows { - builder.AppendNull() - } + builder.AppendNulls(rows) case *array.BooleanBuilder: value := value.Any().(bool) for range rows { @@ -85,6 +84,7 @@ func NewCoalesce(columns []*columnWithType) arrow.Array { // Only string columns are supported builder := array.NewBuilder(memory.DefaultAllocator, columns[0].col.DataType()).(*array.StringBuilder) + builder.Reserve(columns[0].col.Len()) for i := 0; i < columns[0].col.Len(); i++ { val, isNull := firstNotNullValue(i, columns) if isNull { diff --git a/pkg/engine/internal/executor/functions.go b/pkg/engine/internal/executor/functions.go index 7b666496fd..a4341a1cfb 100644 --- a/pkg/engine/internal/executor/functions.go +++ b/pkg/engine/internal/executor/functions.go @@ -250,6 +250,8 @@ func (f *regexpFunction) Evaluate(lhs arrow.Array, rhs arrow.Array, _, rhsIsScal return builder.NewArray(), nil } + builder.Reserve(lhsArr.Len()) + var ( re *regexp.Regexp err error @@ -306,6 +308,7 @@ func (f *genericBoolFunction[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array, _ } builder := array.NewBooleanBuilder(memory.DefaultAllocator) + builder.Reserve(lhsArr.Len()) for i := range lhsArr.Len() { if lhsArr.IsNull(i) || rhsArr.IsNull(i) { @@ -345,6 +348,7 @@ func (f *genericFloat64Function[E, T]) Evaluate(lhs arrow.Array, rhs arrow.Array } builder := array.NewFloat64Builder(memory.DefaultAllocator) + builder.Reserve(lhsArr.Len()) for i := range lhsArr.Len() { if lhsArr.IsNull(i) || rhsArr.IsNull(i) { diff --git a/pkg/engine/internal/executor/range_aggregation.go b/pkg/engine/internal/executor/range_aggregation.go index 09cdae3e91..2e34ad0618 100644 --- a/pkg/engine/internal/executor/range_aggregation.go +++ b/pkg/engine/internal/executor/range_aggregation.go @@ -239,6 +239,8 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch, valArr = valVec.(*array.Float64) } + labelsCache := newLabelsCache() + for row := range int(record.NumRows()) { var value float64 if r.opts.operation != types.RangeAggregationTypeCount { @@ -254,15 +256,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch, continue // out of range, skip this row } - labelValues := make([]string, 0, len(arrays)) - labels := make([]arrow.Field, 0, len(arrays)) - for i, arr := range arrays { - val := arr.Value(row) - if val != "" { - labelValues = append(labelValues, val) - labels = append(labels, fields[i]) - } - } + labels, labelValues := labelsCache.getLabels(arrays, fields, row) for _, w := range windows { r.aggregator.Add(w.end, value, labels, labelValues) @@ -344,7 +338,7 @@ func (f *matcherFactory) createExactMatcher(windows []window) timestampMatchingW if len(windows) == 0 { return nil } - return []window{windows[0]} + return windows[0:1] } } @@ -367,7 +361,7 @@ func (f *matcherFactory) createAlignedMatcher(windows []window) timestampMatchin tNs := t.UnixNano() // valid timestamps for window i: t > startNs + (i-1) * intervalNs && t <= startNs + i * intervalNs windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes - return []window{windows[windowIndex]} + return windows[windowIndex : windowIndex+1] } } @@ -391,11 +385,10 @@ func (f *matcherFactory) createGappedMatcher(windows []window) timestampMatching tNs := t.UnixNano() // For gapped windows, window i covers: (start + i*step - interval, start + i*step] windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes - matchingWindow := windows[windowIndex] // Verify the timestamp is within the window (not in a gap) - if tNs > matchingWindow.start.UnixNano() { - return []window{matchingWindow} + if tNs > windows[windowIndex].start.UnixNano() { + return windows[windowIndex : windowIndex+1] } return nil // timestamp is in a gap diff --git a/pkg/engine/internal/executor/stream_injector.go b/pkg/engine/internal/executor/stream_injector.go index 81a49590e5..2b73406947 100644 --- a/pkg/engine/internal/executor/stream_injector.go +++ b/pkg/engine/internal/executor/stream_injector.go @@ -61,19 +61,18 @@ func (si *streamInjector) Inject(ctx context.Context, in arrow.RecordBatch) (arr ) getColumn := func(name string) *labelColumn { - ident := semconv.NewIdentifier(name, types.ColumnTypeLabel, types.Loki.String) - - if col, ok := labelLookup[ident.FQN()]; ok { + if col, ok := labelLookup[name]; ok { return col } + ident := semconv.NewIdentifier(name, types.ColumnTypeLabel, types.Loki.String) col := &labelColumn{ Field: semconv.FieldFromIdent(ident, true), // labels are nullable Builder: array.NewStringBuilder(memory.DefaultAllocator), } labels = append(labels, col) - labelLookup[ident.FQN()] = col + labelLookup[name] = col return col } @@ -83,12 +82,12 @@ func (si *streamInjector) Inject(ctx context.Context, in arrow.RecordBatch) (arr // TODO(rfratto): this flips the processing of stream IDs into row-based // processing. It may be more efficient to vectorize this by building each // label column all at once. - it, err := si.view.Labels(ctx, findID) + lbs, err := si.view.Labels(ctx, findID) if err != nil { return nil, err } - for label := range it { + for _, label := range lbs { col := getColumn(label.Name) // Backfill any missing NULLs in the column if needed. diff --git a/pkg/engine/internal/executor/streams_view.go b/pkg/engine/internal/executor/streams_view.go index 53fde30cdf..879c740e4d 100644 --- a/pkg/engine/internal/executor/streams_view.go +++ b/pkg/engine/internal/executor/streams_view.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "iter" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" @@ -31,6 +30,7 @@ type streamsView struct { initialized bool streams arrow.Table idRowMapping map[int64]int // Mapping of stream ID to absolute row index in the streams table. + streamLabels map[int64][]labels.Label } type streamsViewOptions struct { @@ -80,6 +80,7 @@ func newStreamsView(sec *streams.Section, opts *streamsViewOptions) *streamsView idColumn: streamsIDColumn, searchColumns: append([]*streams.Column{streamsIDColumn}, cols...), batchSize: opts.BatchSize, + streamLabels: make(map[int64][]labels.Label), } } @@ -88,56 +89,63 @@ func (v *streamsView) NumLabels() int { return len(v.searchColumns) - 1 } -// Labels iterates over all of the non-null labels of a stream with the given +// Labels returns all of the non-null labels of a stream with the given // id. If [streamsViewOptions] included a subset of labels, only those labels // are returned. -func (v *streamsView) Labels(ctx context.Context, id int64) (iter.Seq[labels.Label], error) { +func (v *streamsView) Labels(ctx context.Context, id int64) ([]labels.Label, error) { if err := v.init(ctx); err != nil { return nil, err } + lbs, ok := v.streamLabels[id] + if ok { + return lbs, nil + } + rowColumnIndex, ok := v.idRowMapping[id] if !ok { return nil, fmt.Errorf("stream ID %d not found in section or not included in filter", id) } - return func(yield func(labels.Label) bool) { - for colIndex := range int(v.streams.NumCols()) { - // Skip any column which isn't a label column. - // - // This is safe because [streams.Reader] guarantees that the order of - // columns in the Arrow schema matches the order of [streams.Column]s - // provided to the reader. - if v.searchColumns[colIndex].Type != streams.ColumnTypeLabel { - continue - } + lbs = make([]labels.Label, 0) - // Find the array where our row is. While columnChunkedRow can return nil - // if the row isn't found, we know we're giving it a valid row index so - // we can bypass the check here. - arr, rowArrayIndex := columnChunkedRow(v.streams.Column(colIndex), rowColumnIndex) - if arr.IsNull(rowArrayIndex) { - continue - } + for colIndex := range int(v.streams.NumCols()) { + // Skip any column which isn't a label column. + // + // This is safe because [streams.Reader] guarantees that the order of + // columns in the Arrow schema matches the order of [streams.Column]s + // provided to the reader. + if v.searchColumns[colIndex].Type != streams.ColumnTypeLabel { + continue + } - label := labels.Label{ - Name: v.searchColumns[colIndex].Name, - } + // Find the array where our row is. While columnChunkedRow can return nil + // if the row isn't found, we know we're giving it a valid row index so + // we can bypass the check here. + arr, rowArrayIndex := columnChunkedRow(v.streams.Column(colIndex), rowColumnIndex) + if arr.IsNull(rowArrayIndex) { + continue + } - switch colValues := arr.(type) { - case *array.String: - label.Value = colValues.Value(rowArrayIndex) - case *array.Binary: - label.Value = string(colValues.Value(rowArrayIndex)) - default: - panic(fmt.Sprintf("unexpected column type %T for labels", colValues)) - } + label := labels.Label{ + Name: v.searchColumns[colIndex].Name, + } - if !yield(label) { - return - } + switch colValues := arr.(type) { + case *array.String: + label.Value = colValues.Value(rowArrayIndex) + case *array.Binary: + label.Value = string(colValues.Value(rowArrayIndex)) + default: + panic(fmt.Sprintf("unexpected column type %T for labels", colValues)) } - }, nil + + lbs = append(lbs, label) + } + + v.streamLabels[id] = lbs + + return lbs, nil } func (v *streamsView) init(ctx context.Context) (err error) { diff --git a/pkg/engine/internal/executor/streams_view_test.go b/pkg/engine/internal/executor/streams_view_test.go index 21766109dc..ee8a3d88ab 100644 --- a/pkg/engine/internal/executor/streams_view_test.go +++ b/pkg/engine/internal/executor/streams_view_test.go @@ -1,7 +1,6 @@ package executor import ( - "iter" "slices" "testing" "time" @@ -30,9 +29,9 @@ func Test_streamsView(t *testing.T) { var actual []labels.Labels for id := 1; id <= 3; id++ { - it, err := view.Labels(t.Context(), int64(id)) - require.NoError(t, err, "failed to get labels iterator") - actual = append(actual, collectLabels(it)) + lbs, err := view.Labels(t.Context(), int64(id)) + require.NoError(t, err, "failed to get labels") + actual = append(actual, labels.New(lbs...)) } require.Equal(t, inputStreams, actual, "expected all streams to be returned") @@ -46,9 +45,9 @@ func Test_streamsView(t *testing.T) { var actual []labels.Labels - it, err := view.Labels(t.Context(), int64(2)) - require.NoError(t, err, "failed to get labels iterator") - actual = append(actual, collectLabels(it)) + lbs, err := view.Labels(t.Context(), int64(2)) + require.NoError(t, err, "failed to get labels") + actual = append(actual, labels.New(lbs...)) expected := []labels.Labels{ inputStreams[1], // Stream ID 2 @@ -65,9 +64,9 @@ func Test_streamsView(t *testing.T) { var actual []labels.Labels for _, id := range []int{2, 3} { - it, err := view.Labels(t.Context(), int64(id)) - require.NoError(t, err, "failed to get labels iterator") - actual = append(actual, collectLabels(it)) + lbs, err := view.Labels(t.Context(), int64(id)) + require.NoError(t, err, "failed to get labels") + actual = append(actual, labels.New(lbs...)) } expected := []labels.Labels{ @@ -96,9 +95,9 @@ func Test_streamsView(t *testing.T) { var actual []labels.Labels for id := 1; id <= 3; id++ { - it, err := view.Labels(t.Context(), int64(id)) - require.NoError(t, err, "failed to get labels iterator") - actual = append(actual, collectLabels(it)) + lbs, err := view.Labels(t.Context(), int64(id)) + require.NoError(t, err, "failed to get labels") + actual = append(actual, labels.New(lbs...)) } require.Equal(t, expect, actual, "expected all streams to be returned with the proper labels") @@ -129,9 +128,9 @@ func Test_streamsView(t *testing.T) { var actual []labels.Labels for id := 1; id <= 3; id++ { - it, err := view.Labels(t.Context(), int64(id)) - require.NoError(t, err, "failed to get labels iterator") - actual = append(actual, collectLabels(it)) + lbs, err := view.Labels(t.Context(), int64(id)) + require.NoError(t, err, "failed to get labels") + actual = append(actual, labels.New(lbs...)) } require.Equal(t, expect, actual, "expected all streams to be returned with the proper labels") @@ -155,11 +154,3 @@ func buildStreamsSection(t *testing.T, streamLabels []labels.Labels) *streams.Se require.NoError(t, err, "failed to open streams section") return sec } - -func collectLabels(it iter.Seq[labels.Label]) labels.Labels { - var ls []labels.Label - for l := range it { - ls = append(ls, l) - } - return labels.New(ls...) -} diff --git a/pkg/engine/internal/executor/util.go b/pkg/engine/internal/executor/util.go index cc8a174ebc..4788bc554a 100644 --- a/pkg/engine/internal/executor/util.go +++ b/pkg/engine/internal/executor/util.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/cespare/xxhash/v2" "github.com/grafana/loki/v3/pkg/engine/internal/semconv" ) @@ -27,3 +29,61 @@ func columnForFQN(fqn string, batch arrow.RecordBatch) (arrow.Array, int, error) return batch.Column(indices[0]), indices[0], nil } + +// labelsCache returns labels and label values for a given row in range and vector aggregators, but cache them in order +// to reduce object allocations for repeated label sets. It first scans the row for non-empty labels and computes xxhash. +// In case of a cache miss it scans the row again and allocates arrays for label names and label values. +type labelsCache struct { + digest *xxhash.Digest + cache map[uint64]struct { + labels []arrow.Field + labelValues []string + } +} + +func newLabelsCache() *labelsCache { + return &labelsCache{ + digest: xxhash.New(), + cache: make(map[uint64]struct { + labels []arrow.Field + labelValues []string + }), + } +} + +func (c *labelsCache) getLabels(arrays []*array.String, fields []arrow.Field, row int) ([]arrow.Field, []string) { + c.digest.Reset() + + for i, arr := range arrays { + val := arr.Value(row) + if val != "" { + _, _ = c.digest.Write([]byte{0}) + _, _ = c.digest.WriteString(fields[i].Name) + _, _ = c.digest.Write([]byte("=")) + _, _ = c.digest.WriteString(val) + + } + } + key := c.digest.Sum64() + + l, ok := c.cache[key] + if !ok { + l = struct { + labels []arrow.Field + labelValues []string + }{ + labels: make([]arrow.Field, 0), + labelValues: make([]string, 0), + } + for i, arr := range arrays { + val := arr.Value(row) + if val != "" { + l.labelValues = append(l.labelValues, val) + l.labels = append(l.labels, fields[i]) + } + } + c.cache[key] = l + } + + return l.labels, l.labelValues +} diff --git a/pkg/engine/internal/executor/vector_aggregate.go b/pkg/engine/internal/executor/vector_aggregate.go index 585112db35..a9cc7ec411 100644 --- a/pkg/engine/internal/executor/vector_aggregate.go +++ b/pkg/engine/internal/executor/vector_aggregate.go @@ -179,20 +179,14 @@ func (v *vectorAggregationPipeline) read(ctx context.Context) (arrow.RecordBatch v.aggregator.AddLabels(fields) + labelsCache := newLabelsCache() + for row := range int(record.NumRows()) { if valueArr.IsNull(row) { continue } - labelValues := make([]string, 0, len(arrays)) - labels := make([]arrow.Field, 0, len(arrays)) - for i, arr := range arrays { - val := arr.Value(row) - if val != "" { - labelValues = append(labelValues, val) - labels = append(labels, fields[i]) - } - } + labels, labelValues := labelsCache.getLabels(arrays, fields, row) v.aggregator.Add(tsCol.Value(row).ToTime(arrow.Nanosecond), valueArr.Value(row), labels, labelValues) } diff --git a/pkg/util/sample.go b/pkg/util/sample.go index 1ede6a37cd..01b226254f 100644 --- a/pkg/util/sample.go +++ b/pkg/util/sample.go @@ -5,7 +5,7 @@ import ( ) func UniqueSampleHash(lblString string, line []byte) uint64 { - uniqueID := make([]byte, 0, len(lblString)+len(line)) + uniqueID := make([]byte, 0, len(lblString)+len(line)+1) uniqueID = append(uniqueID, lblString...) uniqueID = append(uniqueID, ':') uniqueID = append(uniqueID, line...)