Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/worker/node_source.go

119 lines
3.1 KiB

package worker
import (
"context"
"sync"
"time"
"github.com/apache/arrow-go/v18/arrow"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/xcap"
)
// nodeSource exposes data for a receiver of a stream as an [executor.Pipeline].
//
// Records are made available by a [nodeSource] calling [nodeSource.Write],
// after which each record can be read by the [nodeSource.Read] method.
type nodeSource struct {
initOnce sync.Once
closeOnce sync.Once
// streamCount is the number of streams that have been opened on this node
// source.
streamCount atomic.Int64
closed chan struct{}
records chan arrow.RecordBatch
// failErr is set by Fail before the source is closed. Read returns it
// instead of executor.EOF when the source was closed due to a failure.
failErr atomic.Pointer[error]
}
var _ executor.Pipeline = (*nodeSource)(nil)
func (src *nodeSource) Open(_ context.Context) error {
src.lazyInit()
return nil
}
// Read returns the next record of the node data. Blocks until results are
// available or until the provided ctx is canceled.
func (src *nodeSource) Read(ctx context.Context) (arrow.RecordBatch, error) {
region := xcap.RegionFromContext(ctx)
startRecv := time.Now()
defer func() {
region.Record(xcap.TaskRecvDuration.Observe(time.Since(startRecv).Seconds()))
}()
src.lazyInit()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-src.closed:
if ep := src.failErr.Load(); ep != nil {
return nil, *ep
}
return nil, executor.EOF
case rec := <-src.records:
return rec, nil
}
}
func (src *nodeSource) lazyInit() {
src.initOnce.Do(func() {
src.closed = make(chan struct{})
src.records = make(chan arrow.RecordBatch)
})
}
// Write writes a record to the read end of the node source. Write blocks until
// the record has been read or the context is canceled.
func (src *nodeSource) Write(ctx context.Context, rec arrow.RecordBatch) error {
src.lazyInit()
select {
case <-ctx.Done():
return ctx.Err()
case <-src.closed:
return executor.EOF
case src.records <- rec:
return nil
}
}
// Add adds a delta, which may be negative, to the node source's input stream
// counter. If the counter becomes zero, the source is automatically closed. If
// the counter goes negative, Add panics.
func (src *nodeSource) Add(delta int64) {
src.lazyInit()
newValue := src.streamCount.Add(delta)
if newValue == 0 {
src.Close()
} else if newValue < 0 {
panic("negative stream count")
}
}
// Fail marks the source as failed with err and closes it. Future Read calls
// will return err instead of [executor.EOF]. err must not be nil.
func (src *nodeSource) Fail(err error) {
if err == nil {
// This panic is required because if nil is passed, Read will return (nil, nil) which breaks the Read contract.
panic("nodeSource.Fail called with nil error")
}
src.failErr.Store(&err)
src.Close()
}
// Close closes the source. All future Reads and Write calls will return
// [executor.EOF].
func (src *nodeSource) Close() {
src.lazyInit()
src.closeOnce.Do(func() { close(src.closed) })
}