loki/pkg/engine/internal/executor/util_test.go

package executor

import (
	"context"
	"errors"
	"math"
	"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

var (
	incrementingIntPipeline = newRecordGenerator(
		arrow.NewSchema([]arrow.Field{
			semconv.FieldFromFQN("int64.builtin.id", false),
		}, nil),
		func(offset, maxRows, batchSize int64, schema *arrow.Schema) arrow.RecordBatch {
			builder := array.NewInt64Builder(memory.DefaultAllocator)

			rows := int64(0)
			for ; rows < batchSize && offset+rows < maxRows; rows++ {
				builder.Append(offset + rows)
			}

			data := builder.NewArray()
			columns := []arrow.Array{data}
			return array.NewRecordBatch(schema, columns, rows)
		},
	)
)

func ascendingTimestampPipeline(start time.Time) *recordGenerator {
	return timestampPipeline(start, ascending)
}

func descendingTimestampPipeline(start time.Time) *recordGenerator {
	return timestampPipeline(start, descending)
}

// ascending and descending act as a ±1 sign for the Duration arithmetic in
// timestampPipeline: multiplying two Durations multiplies their underlying
// nanosecond counts, so descending*d == -d.
const (
	ascending  = time.Duration(1)
	descending = time.Duration(-1)
)

func timestampPipeline(start time.Time, order time.Duration) *recordGenerator {
	return newRecordGenerator(
		arrow.NewSchema([]arrow.Field{
			semconv.FieldFromFQN("int64.builtin.id", false),
			semconv.FieldFromFQN("timestamp_ns.builtin.timestamp", false),
		}, nil),
		func(offset, maxRows, batchSize int64, schema *arrow.Schema) arrow.RecordBatch {
			idColBuilder := array.NewInt64Builder(memory.DefaultAllocator)
			tsColBuilder := array.NewTimestampBuilder(memory.DefaultAllocator, arrow.FixedWidthTypes.Timestamp_ns.(*arrow.TimestampType))
			rows := int64(0)
			for ; rows < batchSize && offset+rows < maxRows; rows++ {
				idColBuilder.Append(offset + rows)
				tsColBuilder.Append(arrow.Timestamp(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(rows)*time.Millisecond)).UnixNano()))
			}

			idData := idColBuilder.NewArray()
			tsData := tsColBuilder.NewArray()
			columns := []arrow.Array{idData, tsData}
			return array.NewRecordBatch(schema, columns, rows)
		},
	)
}
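
// A minimal usage sketch (hypothetical test, not part of the original file):
// within a single batch, ascendingTimestampPipeline advances one millisecond
// per row, so consecutive timestamps must strictly increase. The cast to
// *array.Timestamp assumes the arrow-go array type for timestamp columns.
func TestAscendingTimestampPipelineExample(t *testing.T) {
	pipeline := ascendingTimestampPipeline(time.Unix(0, 0)).Pipeline(4, 4)
	batch, err := pipeline.Read(t.Context())
	if err != nil {
		t.Fatalf("did not expect error, got %s", err.Error())
	}

	// Column 1 is the timestamp_ns.builtin.timestamp column from the schema above.
	ts := batch.Column(1).(*array.Timestamp)
	for i := 1; i < ts.Len(); i++ {
		if ts.Value(i) <= ts.Value(i-1) {
			t.Fatalf("timestamps not ascending at row %d", i)
		}
	}
}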

type batchFunc func(offset, maxRows, batchSize int64, schema *arrow.Schema) arrow.RecordBatch

type recordGenerator struct {
	schema *arrow.Schema
	batch  batchFunc
}

func newRecordGenerator(schema *arrow.Schema, batch batchFunc) *recordGenerator {
	return &recordGenerator{
		schema: schema,
		batch:  batch,
	}
}

// Pipeline returns a [Pipeline] that emits the generated records in batches
// of up to batchSize rows, until rows total rows have been produced.
func (p *recordGenerator) Pipeline(batchSize int64, rows int64) Pipeline {
	var pos int64
	return newGenericPipeline(
		func(_ context.Context, _ []Pipeline) (arrow.RecordBatch, error) {
			if pos >= rows {
				return nil, EOF
			}

			batch := p.batch(pos, rows, batchSize, p.schema)
			pos += batch.NumRows()
			return batch, nil
		},
		nil,
	)
}

// collect reads all data from the pipeline until it is exhausted or returns an error.
func collect(t *testing.T, pipeline Pipeline) (batches int64, rows int64) {
	ctx := t.Context()
	for {
		batch, err := pipeline.Read(ctx)
		if errors.Is(err, EOF) {
			break
		}
		if err != nil {
			t.Fatalf("did not expect error, got %s", err.Error())
		}
		t.Log("batch", batch, "err", err)
		batches++
		rows += batch.NumRows()
	}
	return batches, rows
}
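
// A minimal usage sketch (hypothetical test, not part of the original file):
// 10 rows emitted in batches of at most 3 should drain as 4 batches (3+3+3+1).
func TestCollectExample(t *testing.T) {
	pipeline := incrementingIntPipeline.Pipeline(3, 10)

	batches, rows := collect(t, pipeline)
	if batches != 4 || rows != 10 {
		t.Fatalf("expected 4 batches and 10 rows, got %d and %d", batches, rows)
	}
}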

// ArrowtestPipeline is a [Pipeline] that emits test data from a sequence of
// [arrowtest.Rows].
type ArrowtestPipeline struct {
	schema *arrow.Schema
	rows   []arrowtest.Rows
	cur    int
}

var _ Pipeline = (*ArrowtestPipeline)(nil)

// NewArrowtestPipeline creates a new ArrowtestPipeline that emits each
// [arrowtest.Rows] as its own record.
//
// If schema is non-nil, all rows are emitted using that schema. If schema is
// nil, the schema is derived from each element in rows as it is emitted.
func NewArrowtestPipeline(schema *arrow.Schema, rows ...arrowtest.Rows) *ArrowtestPipeline {
	return &ArrowtestPipeline{
		schema: schema,
		rows:   rows,
	}
}

// Read implements [Pipeline], converting the next [arrowtest.Rows] into an
// [arrow.RecordBatch] and returning it. Once every element of rows has been
// emitted, Read returns EOF.
func (p *ArrowtestPipeline) Read(_ context.Context) (arrow.RecordBatch, error) {
	if p.cur >= len(p.rows) {
		return nil, EOF
	}

	rows := p.rows[p.cur]
	schema := p.schema
	if schema == nil {
		schema = rows.Schema()
	}

	p.cur++
	return rows.Record(memory.DefaultAllocator, schema), nil
}

// Close implements [Pipeline], immediately exhausting the pipeline.
func (p *ArrowtestPipeline) Close() { p.cur = math.MaxInt64 }