Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/merge.go

132 lines
3.4 KiB

package executor
import (
"context"
"errors"
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/xcap"
)
// Merge is a pipeline that takes N inputs and sequentially consumes each one of them.
// It completely exhausts an input before moving to the next one.
type Merge struct {
	inputs      []Pipeline   // inputs to drain in order; wrapped as *prefetchWrapper by newMergePipeline
	maxPrefetch int          // how many upcoming inputs are prefetched while the current one is consumed
	initialized bool         // one-time guard for init; NOTE(review): not goroutine-safe — assumes Read is called from a single goroutine, confirm
	currInput   int          // index of the currently processed input
	region      *xcap.Region // capture region ended on Close; may be nil
}

// Compile-time assertion that *Merge satisfies the Pipeline interface.
var _ Pipeline = (*Merge)(nil)
// newMergePipeline creates a new merge pipeline that merges N inputs into a single output.
//
// The argument maxPrefetch controls how many inputs are prefetched simultaneously while the current one is consumed.
// Set maxPrefetch to 0 to disable prefetching of the next input.
// Set maxPrefetch to 1 to prefetch only the next input, and so on.
// Set maxPrefetch to -1 to prefetch all inputs at once.
//
// It returns an error if inputs is empty. Note that the caller-supplied inputs
// slice is mutated in place: each element is wrapped in a prefetching pipeline.
func newMergePipeline(inputs []Pipeline, maxPrefetch int, region *xcap.Region) (*Merge, error) {
	if len(inputs) == 0 {
		return nil, fmt.Errorf("merge pipeline: no inputs provided")
	}
	// Clamp to len(inputs)-1 (prefetch all remaining inputs) if maxPrefetch is
	// negative or exceeds the number of inputs.
	if maxPrefetch < 0 || maxPrefetch >= len(inputs) {
		maxPrefetch = len(inputs) - 1
	}
	// Wrap inputs into prefetching pipeline.
	for i := range inputs {
		// Only wrap input, but do not call init() on it, as it would start prefetching.
		// Prefetching is started in the [Merge.init] function.
		inputs[i] = newPrefetchingPipeline(inputs[i])
	}
	return &Merge{
		inputs:      inputs,
		maxPrefetch: maxPrefetch,
		region:      region,
	}, nil
}
// init lazily starts prefetching on the first maxPrefetch+1 inputs, exactly
// once. The first/current input is always among them.
func (m *Merge) init(ctx context.Context) {
	if m.initialized {
		return
	}
	m.initialized = true
	// Kick off prefetching for inputs 0..maxPrefetch; indexes past the end of
	// the input list are ignored by startPrefetchingInputAtIndex.
	for i := 0; i <= m.maxPrefetch; i++ {
		m.startPrefetchingInputAtIndex(ctx, i)
	}
}
// startPrefetchingInputAtIndex starts prefetching on the input at index i by
// calling init on it. Indexes past the end of the input list, and inputs that
// are not of type [prefetchWrapper], are silently ignored.
func (m *Merge) startPrefetchingInputAtIndex(ctx context.Context, i int) {
	if i >= len(m.inputs) {
		return
	}
	if wrapped, ok := m.inputs[i].(*prefetchWrapper); ok {
		wrapped.init(ctx)
	}
}
// Read reads the next batch from the pipeline, lazily starting prefetching on
// first use. It returns an error if reading fails, or EOF once every input is
// exhausted.
func (m *Merge) Read(ctx context.Context) (arrow.RecordBatch, error) {
	m.init(ctx)
	return m.read(ctx)
}
// read consumes the current input until it reports EOF, then closes it and
// advances to the next one, returning the first record batch obtained. Once
// every input has been drained it returns EOF.
func (m *Merge) read(ctx context.Context) (arrow.RecordBatch, error) {
	for m.currInput < len(m.inputs) {
		current := m.inputs[m.currInput]
		batch, err := current.Read(ctx)
		switch {
		case err == nil:
			return batch, nil
		case errors.Is(err, EOF):
			// The current input is exhausted: release it and move on.
			current.Close()
			m.currInput++
			// Slide the prefetch window forward so one more input starts
			// prefetching while the next one is consumed.
			m.startPrefetchingInputAtIndex(ctx, m.currInput+m.maxPrefetch)
		default:
			return nil, err
		}
	}
	// Every input has been consumed and is exhausted.
	return nil, EOF
}
// Close implements Pipeline. It ends the capture region (if any) and closes
// every input that has not yet been exhausted; inputs drained during read are
// closed as soon as they report EOF.
func (m *Merge) Close() {
	if m.region != nil {
		m.region.End()
	}
	// Inputs before currInput were already closed when they reported EOF.
	for i := m.currInput; i < len(m.inputs); i++ {
		m.inputs[i].Close()
	}
}
// Region implements RegionProvider. It returns the capture region associated
// with this pipeline; the result may be nil.
func (m *Merge) Region() *xcap.Region {
	return m.region
}