@ -28,7 +28,8 @@ type tailer struct {
id uint32
orgID string
matchers [ ] * labels . Matcher
expr syntax . LogSelectorExpr
pipeline syntax . Pipeline
expr syntax . Expr
pipelineMtx sync . Mutex
sendChan chan * logproto . Stream
@ -51,9 +52,7 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
if err != nil {
return nil , err
}
// Make sure we can build a pipeline. The stream processing code doesn't have a place to handle
// this error so make sure we handle it here.
_ , err = expr . Pipeline ( )
pipeline , err := expr . Pipeline ( )
if err != nil {
return nil , err
}
@ -62,6 +61,7 @@ func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*ta
return & tailer {
orgID : orgID ,
matchers : matchers ,
pipeline : pipeline ,
sendChan : make ( chan * logproto . Stream , bufferSizeForTailResponse ) ,
conn : conn ,
droppedStreams : make ( [ ] * logproto . DroppedStream , 0 , maxDroppedStreams ) ,
@ -121,44 +121,53 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
return
}
processed := t . processStream ( stream , lbs )
select {
case t . sendChan <- processed :
default :
t . dropStream ( stream )
streams := t . processStream ( stream , lbs )
if len ( streams ) == 0 {
return
}
for _ , s := range streams {
select {
case t . sendChan <- s :
default :
t . dropStream ( * s )
}
}
}
func ( t * tailer ) processStream ( stream logproto . Stream , lbs labels . Labels ) * logproto . Stream {
// Build a new pipeline for each call because the pipeline builds a cache of labels
// and if we don't start with a new pipeline that cache will grow unbounded.
// The error is ignored because it would be handled in the constructor of the tailer.
pipeline , _ := t . expr . Pipeline ( )
func ( t * tailer ) processStream ( stream logproto . Stream , lbs labels . Labels ) [ ] * logproto . Stream {
// Optimization: skip filtering entirely, if no filter is set
if log . IsNoopPipeline ( pipeline ) {
return & stream
if log . IsNoopPipeline ( t . pipeline ) {
return [ ] * logproto . Stream { & stream }
}
// pipeline are not thread safe and tailer can process multiple stream at once.
t . pipelineMtx . Lock ( )
defer t . pipelineMtx . Unlock ( )
responseStream := & logproto . Stream {
Labels : stream . Labels ,
Entries : make ( [ ] logproto . Entry , 0 , len ( stream . Entries ) ) ,
}
sp := pipeline . ForStream ( lbs )
streams := map [ uint64 ] * logproto . Stream { }
sp := t . pipeline . ForStream ( lbs )
for _ , e := range stream . Entries {
newLine , _ , ok := sp . ProcessString ( e . Timestamp . UnixNano ( ) , e . Line )
newLine , parsedLbs , ok := sp . ProcessString ( e . Timestamp . UnixNano ( ) , e . Line )
if ! ok {
continue
}
responseStream . Entries = append ( responseStream . Entries , logproto . Entry {
var stream * logproto . Stream
if stream , ok = streams [ parsedLbs . Hash ( ) ] ; ! ok {
stream = & logproto . Stream {
Labels : parsedLbs . String ( ) ,
}
streams [ parsedLbs . Hash ( ) ] = stream
}
stream . Entries = append ( stream . Entries , logproto . Entry {
Timestamp : e . Timestamp ,
Line : newLine ,
} )
}
return responseStream
streamsResult := make ( [ ] * logproto . Stream , 0 , len ( streams ) )
for _ , stream := range streams {
streamsResult = append ( streamsResult , stream )
}
return streamsResult
}
// isMatching returns true if lbs matches all matchers.