Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/worker/stream_source.go

90 lines
1.9 KiB

package worker
import (
"context"
"errors"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/engine/internal/scheduler/wire"
)
// streamSource handles incoming data for a stream, forwarding it to a bound
// [nodeSource] for processing.
type streamSource struct {
// stateMut ensures that we don't call Bind and Close concurrently.
stateMut sync.Mutex
nodeSource *nodeSource // Node source to forward data to.
closed chan struct{}
bound chan struct{}
initOnce sync.Once
closeOnce sync.Once
}
// Write forwards a record to the bound [nodeSource]. Write blocks until a
// nodeSource is bound and accepts the write, or the provided context is
// canceled.
func (src *streamSource) Write(ctx context.Context, rec arrow.RecordBatch) error {
src.lazyInit()
select {
case <-ctx.Done():
return ctx.Err()
case <-src.closed:
return wire.ErrConnClosed
case <-src.bound:
}
return src.nodeSource.Write(ctx, rec)
}
func (src *streamSource) lazyInit() {
src.initOnce.Do(func() {
src.closed = make(chan struct{})
src.bound = make(chan struct{})
})
}
// Bind binds the streamSource to a nodeSource. Calls to Bind after the first
// will return an error.
func (src *streamSource) Bind(nodeSource *nodeSource) error {
src.lazyInit()
src.stateMut.Lock()
defer src.stateMut.Unlock()
// If the stream source was closed, don't permit binding.
select {
case <-src.closed:
return wire.ErrConnClosed
default:
}
if src.nodeSource != nil {
return errors.New("stream already bound")
}
nodeSource.Add(1)
src.nodeSource = nodeSource
close(src.bound)
return nil
}
// Close closes the source. All future Reads and Write calls will return
// [executor.EOF].
func (src *streamSource) Close() {
src.lazyInit()
src.stateMut.Lock()
defer src.stateMut.Unlock()
src.closeOnce.Do(func() {
if src.nodeSource != nil {
src.nodeSource.Add(-1) // Remove ourselves from the nodeSource.
}
close(src.closed)
})
}