chore(engine): Implement execution pipeline for the limit operator (#17264)

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Christian Haudum 1 year ago committed by GitHub
parent cff0df63b3
commit e828c48702
Changed files:

  50  pkg/engine/executor/executor.go
   7  pkg/engine/executor/executor_test.go
   1  pkg/engine/executor/limit.go
  65  pkg/engine/executor/limit_test.go
  77  pkg/engine/executor/util_test.go

pkg/engine/executor/executor.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/apache/arrow-go/v18/arrow"
+
 	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
 )
@@ -68,7 +70,7 @@ func (c *Context) executeSortMerge(_ context.Context, _ *physical.SortMerge, inputs []Pipeline) Pipeline {
 	return errorPipeline(errNotImplemented)
 }
 
-func (c *Context) executeLimit(_ context.Context, _ *physical.Limit, inputs []Pipeline) Pipeline {
+func (c *Context) executeLimit(_ context.Context, limit *physical.Limit, inputs []Pipeline) Pipeline {
 	if len(inputs) == 0 {
 		return emptyPipeline()
 	}
@@ -77,7 +79,51 @@ func (c *Context) executeLimit(_ context.Context, _ *physical.Limit, inputs []Pipeline) Pipeline {
 		return errorPipeline(fmt.Errorf("limit expects exactly one input, got %d", len(inputs)))
 	}
 
-	return errorPipeline(errNotImplemented)
+	// We gradually reduce offsetRemaining and limitRemaining as we process
+	// records, since both the offset and the limit may cross record boundaries.
+	var (
+		offsetRemaining = int64(limit.Skip)
+		limitRemaining  = int64(limit.Fetch)
+	)
+
+	return newGenericPipeline(Local, func(inputs []Pipeline) state {
+		var length int64
+		var start, end int64
+		var batch arrow.Record
+
+		// We skip yielding zero-length batches while offsetRemaining > 0.
+		for length == 0 {
+			// Stop once we have reached the limit.
+			if limitRemaining <= 0 {
+				return Exhausted
+			}
+
+			// Pull the next record from the input pipeline.
+			input := inputs[0]
+			err := input.Read()
+			if err != nil {
+				return newState(input.Value())
+			}
+			batch, _ = input.Value()
+
+			// Slice the batch so it contains only the rows we are looking for,
+			// accounting for both the limit and the offset. Start and end are
+			// constrained to the bounds of the record.
+			start = min(offsetRemaining, batch.NumRows())
+			end = min(start+limitRemaining, batch.NumRows())
+			length = end - start
+
+			offsetRemaining -= start
+			limitRemaining -= length
+		}
+
+		if length <= 0 && offsetRemaining <= 0 {
+			return Exhausted
+		}
+
+		rec := batch.NewSlice(start, end)
+		return successState(rec)
+	}, inputs...)
 }
 
 func (c *Context) executeFilter(_ context.Context, _ *physical.Filter, inputs []Pipeline) Pipeline {
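
For intuition, the skip/fetch bookkeeping above can be run in isolation. Below is a minimal standalone sketch of the same arithmetic over plain int slices instead of Arrow records; applyLimit and its input batches are illustrative only, not part of the engine:

package main

import "fmt"

// applyLimit mimics the pipeline's slicing logic: it walks a stream of
// batches, skips `skip` rows, then yields at most `fetch` rows, clamping
// start/end to each batch's bounds just like executeLimit does.
func applyLimit(batches [][]int, skip, fetch int) [][]int {
	offsetRemaining, limitRemaining := skip, fetch
	var out [][]int
	for _, batch := range batches {
		if limitRemaining <= 0 {
			break
		}
		start := min(offsetRemaining, len(batch))
		end := min(start+limitRemaining, len(batch))
		offsetRemaining -= start
		limitRemaining -= end - start
		// Batches fully consumed by the offset produce an empty slice,
		// which is skipped rather than emitted.
		if end > start {
			out = append(out, batch[start:end])
		}
	}
	return out
}

func main() {
	batches := [][]int{{0, 1}, {2, 3}, {4, 5}, {6, 7}, {8, 9}, {10, 11}}
	// Skip 5, fetch 6: the first two batches are swallowed by the offset,
	// the third contributes one row, and the limit runs out mid-batch.
	fmt.Println(applyLimit(batches, 5, 6)) // [[5] [6 7] [8 9] [10]]
}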

pkg/engine/executor/executor_test.go
@@ -57,13 +57,6 @@ func TestExecutor_Limit(t *testing.T) {
 		require.ErrorContains(t, err, EOF.Error())
 	})
 
-	t.Run("is not implemented", func(t *testing.T) {
-		c := &Context{}
-		pipeline := c.executeLimit(context.TODO(), &physical.Limit{}, []Pipeline{emptyPipeline()})
-		err := pipeline.Read()
-		require.ErrorContains(t, err, errNotImplemented.Error())
-	})
-
 	t.Run("multiple inputs result in error", func(t *testing.T) {
 		c := &Context{}
 		pipeline := c.executeLimit(context.TODO(), &physical.Limit{}, []Pipeline{emptyPipeline(), emptyPipeline()})

pkg/engine/executor/limit.go
@@ -0,0 +1 @@
+package executor

pkg/engine/executor/limit_test.go
@@ -0,0 +1,65 @@
+package executor
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
+)
+
+func TestLimit(t *testing.T) {
+	for _, tt := range []struct {
+		name            string
+		offset          uint32
+		limit           uint32
+		batchSize       int64
+		expectedBatches int64
+		expectedRows    int64
+	}{
+		{
+			name:            "without offset",
+			offset:          0,
+			limit:           5,
+			batchSize:       3,
+			expectedBatches: 2,
+			expectedRows:    5,
+		},
+		{
+			name:            "with offset",
+			offset:          3,
+			limit:           5,
+			batchSize:       4,
+			expectedBatches: 2,
+			expectedRows:    5,
+		},
+		{
+			name:            "with offset greater than batch size",
+			offset:          5,
+			limit:           6,
+			batchSize:       2,
+			expectedBatches: 4,
+			expectedRows:    6,
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			c := &Context{
+				batchSize: tt.batchSize,
+			}
+			limit := &physical.Limit{
+				Skip:  tt.offset,
+				Fetch: tt.limit,
+			}
+			inputs := []Pipeline{
+				incrementingIntPipeline.Pipeline(tt.batchSize, 1000),
+			}
+
+			pipeline := c.executeLimit(context.Background(), limit, inputs)
+			batches, rows := collect(t, pipeline)
+
+			require.Equal(t, tt.expectedBatches, batches)
+			require.Equal(t, tt.expectedRows, rows)
+		})
+	}
+}
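
To see where the expected counts in the last case come from: with batchSize 2, skip 5, and fetch 6, the first two input records (rows 0-3) are consumed entirely by the offset, the third record contributes one row (row 5), and the following records contribute two, two, and one rows before the limit is exhausted, for a total of 4 emitted batches and 6 rows.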

pkg/engine/executor/util_test.go
@@ -0,0 +1,77 @@
+package executor
+
+import (
+	"testing"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/apache/arrow-go/v18/arrow/array"
+	"github.com/apache/arrow-go/v18/arrow/memory"
+)
+
+var (
+	incrementingIntPipeline = newRecordGenerator(
+		arrow.NewSchema([]arrow.Field{
+			{Name: "id", Type: arrow.PrimitiveTypes.Int64},
+		}, nil),
+		func(offset, sz int64, schema *arrow.Schema) arrow.Record {
+			builder := array.NewInt64Builder(memory.DefaultAllocator)
+			defer builder.Release()
+
+			for i := int64(0); i < sz; i++ {
+				builder.Append(offset + i)
+			}
+
+			data := builder.NewArray()
+			defer data.Release()
+
+			columns := []arrow.Array{data}
+			return array.NewRecord(schema, columns, sz)
+		},
+	)
+)
+
+type recordGenerator struct {
+	schema *arrow.Schema
+	batch  func(offset, sz int64, schema *arrow.Schema) arrow.Record
+}
+
+func newRecordGenerator(schema *arrow.Schema, batch func(offset, sz int64, schema *arrow.Schema) arrow.Record) *recordGenerator {
+	return &recordGenerator{
+		schema: schema,
+		batch:  batch,
+	}
+}
+
+func (p *recordGenerator) Pipeline(batchSize int64, rows int64) Pipeline {
+	var pos int64
+	return newGenericPipeline(
+		Local,
+		func(_ []Pipeline) state {
+			if pos >= rows {
+				return Exhausted
+			}
+			batch := p.batch(pos, batchSize, p.schema)
+			pos += batch.NumRows()
+			return successState(batch)
+		},
+		nil,
+	)
+}
+
+// collect reads all data from the pipeline until it is exhausted or returns an error.
+func collect(t *testing.T, pipeline Pipeline) (batches int64, rows int64) {
+	for {
+		err := pipeline.Read()
+		if err == EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("did not expect error, got %s", err.Error())
+		}
+		batch, _ := pipeline.Value()
+		t.Log("batch", batch, "err", err)
+		batches++
+		rows += batch.NumRows()
+	}
+	return batches, rows
+}
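
As a usage sketch, a hypothetical extra test (not part of this commit, same package, reusing the testify require import) would wire the generator straight into collect. Note that Pipeline advances pos by whole batches, so the final batch overshoots when batchSize does not divide rows; the values here divide evenly:

func TestIncrementingIntPipeline(t *testing.T) {
	// 1000 rows emitted in batches of 100 -> 10 full batches.
	pipeline := incrementingIntPipeline.Pipeline(100, 1000)

	batches, rows := collect(t, pipeline)

	require.Equal(t, int64(10), batches)
	require.Equal(t, int64(1000), rows)
}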