loki/pkg/engine/executor/pipeline.go

package executor

import (
	"context"
	"errors"
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
)

type Transport uint8

const (
	_ Transport = iota
	Local
	Remote
)

// Pipeline represents a data processing pipeline that can read Arrow records.
// It provides methods to read data, access the current record, and close resources.
type Pipeline interface {
	// Read reads the next value into its state.
	// It returns an error if reading fails, or EOF once the pipeline is exhausted.
	// The implementation must retain the returned error in its state and return it with subsequent Value() calls.
	Read(context.Context) error
	// Value returns the current value in state.
	Value() (arrow.Record, error)
	// Close closes the resources of the pipeline.
	// The implementation must close all of the pipeline's inputs.
	Close()
	// Inputs returns the inputs of the pipeline.
	Inputs() []Pipeline
	// Transport returns the type of transport of the implementation.
	Transport() Transport
}

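// Illustrative sketch (not part of the original file): the canonical loop for
// draining a Pipeline, assuming a pipeline p obtained elsewhere. Read advances
// the state, Value exposes it, and EOF signals exhaustion rather than failure:
//
//	func drain(ctx context.Context, p Pipeline) error {
//		defer p.Close()
//		for {
//			if err := p.Read(ctx); err != nil {
//				if errors.Is(err, EOF) {
//					return nil // exhausted, not an error
//				}
//				return err
//			}
//			rec, err := p.Value()
//			if err != nil {
//				return err
//			}
//			_ = rec // process the arrow.Record here
//		}
//	}
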
var (
	errNotImplemented = errors.New("pipeline not implemented")
	EOF               = errors.New("pipeline exhausted") // nolint:revive
	Exhausted         = failureState(EOF)
	Canceled          = failureState(context.Canceled)
)

type state struct {
	batch arrow.Record
	err   error
}

func (s state) Value() (arrow.Record, error) {
	return s.batch, s.err
}

func failureState(err error) state {
	return state{err: err}
}

func successState(batch arrow.Record) state {
	return state{batch: batch}
}

func newState(batch arrow.Record, err error) state {
	return state{batch: batch, err: err}
}

type GenericPipeline struct {
	t      Transport
	inputs []Pipeline
	state  state
	read   func(context.Context, []Pipeline) state
}

func newGenericPipeline(t Transport, read func(context.Context, []Pipeline) state, inputs ...Pipeline) *GenericPipeline {
	return &GenericPipeline{
		t:      t,
		read:   read,
		inputs: inputs,
	}
}

var _ Pipeline = (*GenericPipeline)(nil)

// Inputs implements Pipeline.
func (p *GenericPipeline) Inputs() []Pipeline {
	return p.inputs
}

// Value implements Pipeline.
func (p *GenericPipeline) Value() (arrow.Record, error) {
	return p.state.Value()
}

// Read implements Pipeline.
func (p *GenericPipeline) Read(ctx context.Context) error {
	if p.read == nil {
		return EOF
	}
	p.state = p.read(ctx, p.inputs)
	return p.state.err
}

// Close implements Pipeline.
func (p *GenericPipeline) Close() {
	for _, inp := range p.inputs {
		inp.Close()
	}
}

// Transport implements Pipeline.
func (p *GenericPipeline) Transport() Transport {
	return p.t
}

func errorPipeline(err error) Pipeline {
	return newGenericPipeline(Local, func(_ context.Context, _ []Pipeline) state {
		return state{err: fmt.Errorf("failed to execute pipeline: %w", err)}
	})
}

func emptyPipeline() Pipeline {
	return newGenericPipeline(Local, func(_ context.Context, _ []Pipeline) state {
		return Exhausted
	})
}

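// Illustrative sketch (not part of the original file): a hypothetical
// single-shot pipeline built the same way as errorPipeline and emptyPipeline.
// The read closure yields one pre-built record, then reports Exhausted:
//
//	func singleRecordPipeline(record arrow.Record) Pipeline {
//		done := false
//		return newGenericPipeline(Local, func(_ context.Context, _ []Pipeline) state {
//			if done {
//				return Exhausted
//			}
//			done = true
//			return successState(record)
//		})
//	}
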
// prefetchWrapper wraps a [Pipeline] with pre-fetching capability,
// reading data in a separate goroutine to enable concurrent processing.
type prefetchWrapper struct {
	Pipeline // the pipeline that is wrapped

	initialized bool                    // internal state to indicate whether the pre-fetching goroutine is running
	ch          chan state              // the results channel for pre-fetched items
	state       state                   // internal state representing the last pre-fetched item
	cancel      context.CancelCauseFunc // cancellation function for the context
}

var _ Pipeline = (*prefetchWrapper)(nil)

// newPrefetchingPipeline creates a new prefetching pipeline wrapper that reads data from the underlying pipeline
// in a separate goroutine. This allows for concurrent processing where data can be fetched ahead of time
// while the consumer is processing the current batch.
//
// Batches are handed over on an unbuffered channel: the goroutine reads the next batch ahead of time and
// blocks on the send until the consumer takes it, enabling pipeline parallelism and potentially improving
// throughput.
//
// The function accepts a [Pipeline] `p`, which is the underlying pipeline to wrap with pre-fetching capability.
//
// Returns a [prefetchWrapper] that implements the [Pipeline] interface.
func newPrefetchingPipeline(p Pipeline) *prefetchWrapper {
	return &prefetchWrapper{
		Pipeline: p,
		ch:       make(chan state),
	}
}

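// Illustrative sketch (not part of the original file): wrapping a hypothetical
// upstream pipeline `inner`, with `ctx` a caller-provided context, so that the
// next batch is fetched while the current one is being processed:
//
//	wrapped := newPrefetchingPipeline(inner)
//	defer wrapped.Close()
//	for {
//		if err := wrapped.Read(ctx); err != nil {
//			break // EOF on exhaustion, or a failure; inspect with errors.Is(err, EOF)
//		}
//		rec, _ := wrapped.Value()
//		_ = rec // process rec while the next batch is prefetched
//	}
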
// Read implements [Pipeline].
func (p *prefetchWrapper) Read(ctx context.Context) error {
	p.init(ctx)
	return p.read(ctx)
}

func (p *prefetchWrapper) init(ctx context.Context) {
	if p.initialized {
		return
	}
	p.initialized = true

	ctx, p.cancel = context.WithCancelCause(ctx)
	go p.prefetch(ctx) // nolint:errcheck
}

func (p prefetchWrapper) prefetch(ctx context.Context) error {
	// Close channel on exit
	defer close(p.ch)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			var s state
			s.err = p.Pipeline.Read(ctx)
			if s.err != nil {
				// Deliver the failure state, but abort if the context is
				// cancelled while waiting to send, so that Close never leaks
				// this goroutine.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case p.ch <- s:
				}
				return s.err
			}
			s.batch, s.err = p.Pipeline.Value()
			s.batch.Retain()

			// Sending to channel will block until the batch is read by the parent pipeline.
			// If the context is cancelled while waiting to send, we return.
			select {
			case <-ctx.Done():
				s.batch.Release()
				return ctx.Err()
			case p.ch <- s:
			}
		}
	}
}

func (p *prefetchWrapper) read(_ context.Context) error {
	// Release previously retained batch
	if p.state.batch != nil {
		p.state.batch.Release()
	}

	p.state = <-p.ch

	// Reading from a channel that is closed while waiting yields a zero-value.
	// In that case, the pipeline should produce an error state.
	if p.state.err == nil && p.state.batch == nil {
		p.state = Canceled
	}
	return p.state.err
}

// Value implements [Pipeline].
func (p *prefetchWrapper) Value() (arrow.Record, error) {
	return p.state.batch, p.state.err
}

// Close implements [Pipeline].
func (p *prefetchWrapper) Close() {
	// NOTE(rfratto): We don't need to drain p.ch because all writes to p.ch are
	// guaranteed to abort if the context is canceled.
	//
	// Attempting to drain p.ch here anyway can cause a deadlock if
	// [prefetchWrapper.Close] is called before [prefetchWrapper.init].
	if p.cancel != nil {
		p.cancel(errors.New("pipeline is closed"))
	}

	if p.state.batch != nil {
		p.state.batch.Release()
		p.state = state{}
	}

	p.Pipeline.Close()
}