Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/limit.go

60 lines
1.5 KiB

package executor
import (
"context"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/xcap"
)
func NewLimitPipeline(input Pipeline, skip, fetch uint32, region *xcap.Region) *GenericPipeline {
// We gradually reduce offsetRemaining and limitRemaining as we process more records, as the
// offsetRemaining and limitRemaining may cross record boundaries.
var (
offsetRemaining = int64(skip)
limitRemaining = int64(fetch)
)
return newGenericPipelineWithRegion(func(ctx context.Context, inputs []Pipeline) (arrow.RecordBatch, error) {
var length int64
var start, end int64
var batch arrow.RecordBatch
var err error
// We skip yielding zero-length batches while offsetRemainig > 0
for length == 0 {
// Stop once we reached the limit
if limitRemaining <= 0 {
return nil, EOF
}
// Pull the next item from input
input := inputs[0]
batch, err = input.Read(ctx)
if err != nil {
return nil, err
}
// We want to slice batch so it only contains the rows we're looking for
// accounting for both the limit and offset.
// We constrain the start and end to be within the bounds of the record.
start = min(offsetRemaining, batch.NumRows())
end = min(start+limitRemaining, batch.NumRows())
length = end - start
offsetRemaining -= start
limitRemaining -= length
}
if length <= 0 && offsetRemaining <= 0 {
return nil, EOF
}
if batch.NumRows() == 0 {
return batch, nil
}
return batch.NewSlice(start, end), nil
}, region, input)
}