chore(engine/executor): reduce cost of executing DataObjScan plan node (#18679)

pull/18683/head
Robert Fratto 9 months ago committed by GitHub
parent a5c722cbed
commit 385d397404
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      pkg/engine/executor/executor.go
  2. 54
      pkg/engine/executor/pipeline.go
  3. 9
      pkg/engine/executor/streams_view.go

@ -68,7 +68,16 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
switch n := node.(type) {
case *physical.DataObjScan:
return tracePipeline("physical.DataObjScan", c.executeDataObjScan(ctx, n))
// DataObjScan reads from object storage to determine the full pipeline to
// construct, making it expensive to call during planning time.
//
// TODO(rfratto): find a way to remove the logic from executeDataObjScan
// which wraps the pipeline with a topk/limit without reintroducing
// planning cost for thousands of scan nodes.
return newLazyPipeline(func(ctx context.Context, _ []Pipeline) Pipeline {
return tracePipeline("physical.DataObjScan", c.executeDataObjScan(ctx, n))
}, inputs)
case *physical.SortMerge:
return tracePipeline("physical.SortMerge", c.executeSortMerge(ctx, n, inputs))
case *physical.Limit:

@ -283,3 +283,57 @@ func (p *tracedPipeline) Close() { p.inner.Close() }
// Inputs implements [Pipeline] by exposing the wrapped pipeline's inputs.
func (p *tracedPipeline) Inputs() []Pipeline {
	return p.inner.Inputs()
}
// Transport implements [Pipeline]; tracing wrappers always report Local.
func (p *tracedPipeline) Transport() Transport {
	return Local
}
// lazyPipeline wraps a pipeline whose construction is postponed until the
// first call to Read. See [newLazyPipeline].
type lazyPipeline struct {
	ctor   func(ctx context.Context, inputs []Pipeline) Pipeline // invoked once on the first Read to build the inner pipeline
	inputs []Pipeline                                            // child pipelines handed to ctor; also returned by Inputs
	built  Pipeline                                              // inner pipeline; nil until first Read and again after Close
}
// newLazyPipeline defers construction of a [Pipeline] from planning time to
// query execution time. This is useful for pipelines which are expensive to
// construct, or which depend on state only available during execution.
//
// ctor is invoked on the first call to [Pipeline.Read].
func newLazyPipeline(ctor func(ctx context.Context, inputs []Pipeline) Pipeline, inputs []Pipeline) *lazyPipeline {
	lp := &lazyPipeline{
		inputs: inputs,
		ctor:   ctor,
	}
	return lp
}
// Compile-time check that lazyPipeline satisfies Pipeline.
var _ Pipeline = (*lazyPipeline)(nil)

// Read returns the next value from the inner pipeline. On the first call,
// the inner pipeline is constructed by invoking ctor with ctx and the
// stored inputs.
func (lp *lazyPipeline) Read(ctx context.Context) error {
	built := lp.built
	if built == nil {
		built = lp.ctor(ctx, lp.inputs)
		lp.built = built
	}
	return built.Read(ctx)
}
// Value returns the current record from the lazily constructed pipeline.
// Calling Value before the pipeline has been built (i.e. before the first
// Read) is an error.
func (lp *lazyPipeline) Value() (arrow.Record, error) {
	if built := lp.built; built != nil {
		return built.Value()
	}
	return nil, fmt.Errorf("lazyPipeline not built yet")
}
// Close closes the inner pipeline, if one was ever constructed, and resets
// the lazy state so a later Read would rebuild it.
//
// NOTE(review): if Close is called before the first Read, lp.inputs are
// never closed here — confirm whether input ownership requires closing
// them in that case.
func (lp *lazyPipeline) Close() {
	if built := lp.built; built != nil {
		built.Close()
	}
	lp.built = nil
}
// Inputs implements [Pipeline] by returning the pipelines that will be
// handed to ctor when the inner pipeline is built.
func (lp *lazyPipeline) Inputs() []Pipeline {
	return lp.inputs
}
// Transport implements [Pipeline]; lazy pipelines always report Local.
func (lp *lazyPipeline) Transport() Transport {
	return Local
}

@ -26,6 +26,7 @@ type streamsView struct {
idColumn *streams.Column
searchColumns []*streams.Column // stream ID + labels
batchSize int
pageCacheSize int
initialized bool
streams arrow.Table
@ -45,6 +46,9 @@ type streamsViewOptions struct {
// Maximum number of stream records to read at once. Defaults to 128.
BatchSize int
// The size of the page cache to use for reading sections.
CacheSize int
}
// newStreamsView creates a new view of the given streams section. Only the
@ -149,8 +153,9 @@ func (v *streamsView) init(ctx context.Context) (err error) {
}
readerOptions := streams.ReaderOptions{
Columns: v.searchColumns,
Allocator: memory.DefaultAllocator,
Columns: v.searchColumns,
Allocator: memory.DefaultAllocator,
PageCacheSize: v.pageCacheSize,
}
var scalarIDs []scalar.Scalar

Loading…
Cancel
Save