|
|
|
@ -10,6 +10,7 @@ import ( |
|
|
|
|
"github.com/go-kit/kit/log/level" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels" |
|
|
|
|
"golang.org/x/net/context" |
|
|
|
|
|
|
|
|
|
"github.com/grafana/loki/pkg/logproto" |
|
|
|
|
"github.com/grafana/loki/pkg/logql" |
|
|
|
@ -19,12 +20,18 @@ import ( |
|
|
|
|
|
|
|
|
|
const bufferSizeForTailResponse = 5 |
|
|
|
|
|
|
|
|
|
type TailServer interface { |
|
|
|
|
Send(*logproto.TailResponse) error |
|
|
|
|
Context() context.Context |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type tailer struct { |
|
|
|
|
id uint32 |
|
|
|
|
orgID string |
|
|
|
|
matchers []*labels.Matcher |
|
|
|
|
pipeline logql.Pipeline |
|
|
|
|
expr logql.Expr |
|
|
|
|
id uint32 |
|
|
|
|
orgID string |
|
|
|
|
matchers []*labels.Matcher |
|
|
|
|
pipeline logql.Pipeline |
|
|
|
|
expr logql.Expr |
|
|
|
|
pipelineMtx sync.Mutex |
|
|
|
|
|
|
|
|
|
sendChan chan *logproto.Stream |
|
|
|
|
|
|
|
|
@ -37,10 +44,10 @@ type tailer struct { |
|
|
|
|
blockedMtx sync.RWMutex |
|
|
|
|
droppedStreams []*logproto.DroppedStream |
|
|
|
|
|
|
|
|
|
conn logproto.Querier_TailServer |
|
|
|
|
conn TailServer |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newTailer(orgID, query string, conn logproto.Querier_TailServer) (*tailer, error) { |
|
|
|
|
func newTailer(orgID, query string, conn TailServer) (*tailer, error) { |
|
|
|
|
expr, err := logql.ParseLogSelector(query) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
@ -141,6 +148,10 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error |
|
|
|
|
if log.IsNoopPipeline(t.pipeline) { |
|
|
|
|
return []logproto.Stream{stream}, nil |
|
|
|
|
} |
|
|
|
|
// pipeline are not thread safe and tailer can process multiple stream at once.
|
|
|
|
|
t.pipelineMtx.Lock() |
|
|
|
|
defer t.pipelineMtx.Unlock() |
|
|
|
|
|
|
|
|
|
streams := map[uint64]*logproto.Stream{} |
|
|
|
|
lbs, err := util.ParseLabels(stream.Labels) |
|
|
|
|
if err != nil { |
|
|
|
|