Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/engine/internal/worker/node_source.go

89 lines
2.1 KiB

package worker
import (
"context"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
)
// nodeSource adapts the receiving end of a stream into an
// [executor.Pipeline].
//
// Producers hand records over with [nodeSource.Write]; each record is then
// returned by a single call to [nodeSource.Read]. The channels backing the
// source are created lazily, on the first method call.
type nodeSource struct {
	// initOnce guards the lazy creation of closed and records; closeOnce
	// ensures closed is closed at most once.
	initOnce, closeOnce sync.Once

	// streamCount is the input stream counter maintained by Add. The source
	// is closed when Add brings it to zero.
	streamCount atomic.Int64

	// closed is closed by Close to signal that the source has ended.
	closed chan struct{}
	// records is an unbuffered channel that hands records from Write to Read.
	records chan arrow.RecordBatch
}

// Compile-time assertion that *nodeSource satisfies executor.Pipeline.
var _ executor.Pipeline = (*nodeSource)(nil)
// Read blocks until one of the following happens:
//
//   - a writer hands over a record, which is returned with a nil error;
//   - the source is closed, in which case [executor.EOF] is returned;
//   - ctx is done, in which case ctx.Err() is returned.
func (src *nodeSource) Read(ctx context.Context) (arrow.RecordBatch, error) {
	src.lazyInit()

	select {
	case batch := <-src.records:
		return batch, nil
	case <-src.closed:
		return nil, executor.EOF
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// lazyInit creates the closed and records channels on first use. Subsequent
// calls are no-ops.
func (src *nodeSource) lazyInit() {
	setup := func() {
		src.records = make(chan arrow.RecordBatch)
		src.closed = make(chan struct{})
	}
	src.initOnce.Do(setup)
}
// Write offers rec to the read end of the node source and blocks until one of
// the following happens:
//
//   - a reader receives rec, in which case nil is returned;
//   - the source is closed, in which case [executor.EOF] is returned;
//   - ctx is done, in which case ctx.Err() is returned.
func (src *nodeSource) Write(ctx context.Context, rec arrow.RecordBatch) error {
	src.lazyInit()

	select {
	case src.records <- rec:
		return nil
	case <-src.closed:
		return executor.EOF
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Add adjusts the node source's input stream counter by delta, which may be
// negative. When the adjusted counter is exactly zero the source is closed;
// when it drops below zero, Add panics.
func (src *nodeSource) Add(delta int64) {
	src.lazyInit()

	switch remaining := src.streamCount.Add(delta); {
	case remaining < 0:
		panic("negative stream count")
	case remaining == 0:
		src.Close()
	}
}
// Close closes the source. Once closed, Read and Write calls return
// [executor.EOF]. Calling Close more than once is safe; only the first call
// has an effect.
func (src *nodeSource) Close() {
	src.lazyInit()

	signalEOF := func() { close(src.closed) }
	src.closeOnce.Do(signalEOF)
}