Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/ingester/tailer.go

245 lines
5.0 KiB

package ingester
import (
"encoding/binary"
"fmt"
"hash/fnv"
"sync"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
// bufferSizeForTailResponse is the capacity of each tailer's sendChan, i.e. the
// number of streams that can be queued for a client before send starts
// dropping new streams instead of enqueueing them.
const bufferSizeForTailResponse = 5
// tailer forwards streams that match a query to a single tail client
// connection. Streams are handed over with send, queued on sendChan and
// written to the connection by loop; streams that cannot be queued are
// recorded as dropped and reported alongside the next successful response.
type tailer struct {
// id identifies this tailer instance; newTailer derives it from generateUniqueID.
id uint32
// orgID is the organisation/tenant ID passed to newTailer.
orgID string
// matchers are the label matchers of the parsed query, used by isWatchingLabels.
matchers []*labels.Matcher
// expr is the parsed query, wrapped in a regex filter expression when
// newTailer was given a non-empty regex. It is evaluated by
// filterEntriesInStream to filter entries.
expr logql.Expr
// sendChan is the buffered queue (capacity bufferSizeForTailResponse) between
// send, which enqueues without blocking, and loop, which dequeues.
sendChan chan *logproto.Stream
// done is closed by close to signal that the tailer has been closed.
done chan struct{}
// closeMtx serialises close so the channels are closed at most once.
closeMtx sync.Mutex
// blockedAt is the time the first stream was dropped since the last
// popDroppedStreams call, or nil when nothing is pending. Guarded by blockedMtx.
blockedAt *time.Time
// blockedMtx guards blockedAt and droppedStreams.
blockedMtx sync.RWMutex
// droppedStreams holds metadata about dropped streams that has not yet been
// sent to the client. Guarded by blockedMtx.
droppedStreams []*logproto.DroppedStream
// conn is the tail server stream that responses are written to.
conn logproto.Querier_TailServer
}
// newTailer builds a tailer for the given tenant, query and connection.
//
// The query is parsed with logql.ParseExpr; a parse failure is returned as-is
// together with a nil tailer. The label matchers are taken from the parsed
// query. When regex is non-empty the expression used for filtering entries is
// the parsed query wrapped in a regexp filter expression.
func newTailer(orgID, query, regex string, conn logproto.Querier_TailServer) (*tailer, error) {
	parsed, err := logql.ParseExpr(query)
	if err != nil {
		return nil, err
	}

	t := &tailer{
		id:             generateUniqueID(orgID, query, regex),
		orgID:          orgID,
		matchers:       parsed.Matchers(),
		expr:           parsed,
		sendChan:       make(chan *logproto.Stream, bufferSizeForTailResponse),
		done:           make(chan struct{}),
		droppedStreams: []*logproto.DroppedStream{},
		conn:           conn,
	}

	// Only layer the line filter on top when the caller actually asked for one.
	if regex != "" {
		t.expr = logql.NewFilterExpr(parsed, labels.MatchRegexp, regex)
	}
	return t, nil
}
// loop writes streams queued on sendChan to the tail client until the tailer
// is closed.
//
// It returns when:
//   - the done channel is closed,
//   - sendChan is closed,
//   - the connection's context reports an error (checked every 3 seconds), or
//   - writing a response to the connection fails.
//
// In the last two cases the tailer is closed before returning.
func (t *tailer) loop() {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Periodically check the connection's context so the tailer is
			// closed even when no streams are flowing.
			if t.conn.Context().Err() != nil {
				t.close()
				return
			}
		case <-t.done:
			return
		case stream, ok := <-t.sendChan:
			if !ok {
				return
			}
			if stream == nil {
				continue
			}

			// While sending a new stream, pop the queued dropped-stream
			// metadata so it is delivered to the querier in the same response.
			tailResponse := logproto.TailResponse{Stream: stream, DroppedStreams: t.popDroppedStreams()}
			if err := t.conn.Send(&tailResponse); err != nil {
				// The logger takes alternating key/value pairs, so the message
				// and the error each get their own key.
				level.Error(util.Logger).Log("msg", "Error writing to tail client", "err", fmt.Sprintf("%v", err))
				t.close()
				return
			}
		}
	}
}
// send offers a stream to the tail client without blocking the caller.
//
// Nothing happens if the tailer is closed. If streams are already being
// dropped, the new stream is dropped as well; once that state has lasted more
// than 15 seconds the tailer is closed instead. Otherwise the stream's entries
// are filtered, and a stream with remaining entries is queued on sendChan or,
// if the queue is full, recorded as dropped.
func (t *tailer) send(stream logproto.Stream) {
	if t.isClosed() {
		return
	}

	// Already dropping because the connection is blocked: skip the filtering
	// work and drop right away, or give up on the client after 15 seconds.
	if since := t.blockedSince(); since != nil {
		cutoff := time.Now().Add(-time.Second * 15)
		if cutoff.After(*since) {
			t.close()
		} else {
			t.dropStream(stream)
		}
		return
	}

	if err := t.filterEntriesInStream(&stream); err != nil {
		t.dropStream(stream)
		return
	}

	if len(stream.Entries) > 0 {
		select {
		case t.sendChan <- &stream:
			// Queued for loop to deliver.
		default:
			// Queue is full; record the stream as dropped rather than block.
			t.dropStream(stream)
		}
	}
}
// filterEntriesInStream evaluates the tailer's expression against stream,
// replacing stream.Entries with the entries accepted by the filter that the
// expression hands to the querier. A nil filter accepts every entry. The error
// from evaluating the expression, if any, is returned.
func (t *tailer) filterEntriesInStream(stream *logproto.Stream) error {
	keepMatching := func(_ []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) {
		var kept []logproto.Entry
		for i := range stream.Entries {
			entry := stream.Entries[i]
			if filter != nil && !filter([]byte(entry.Line)) {
				continue
			}
			kept = append(kept, entry)
		}
		stream.Entries = kept
		// The result is delivered by mutating stream, so no iterator is needed.
		return nil, nil
	}

	_, err := t.expr.Eval(logql.QuerierFunc(keepMatching))
	return err
}
// isWatchingLabels reports whether every matcher of the tailer accepts the
// corresponding label value in metric. A tailer without matchers accepts any
// label set.
func (t *tailer) isWatchingLabels(metric model.Metric) bool {
	for i := range t.matchers {
		m := t.matchers[i]
		value := metric[model.LabelName(m.Name)]
		if !m.Matches(string(value)) {
			return false
		}
	}
	return true
}
// isClosed reports, without blocking, whether the done channel has been closed.
func (t *tailer) isClosed() bool {
	select {
	case <-t.done:
		return true
	default:
	}
	return false
}
// close marks the tailer as closed by closing the done channel. Calls are
// serialised by closeMtx, so only the first call has any effect.
//
// sendChan is intentionally NOT closed here. send checks isClosed and only
// later writes to sendChan; if close ran in between (for example from loop on
// another goroutine), closing sendChan would make that write panic with
// "send on closed channel". Leaving it open is safe: loop returns as soon as
// done is closed and send returns early once isClosed reports true.
func (t *tailer) close() {
	t.closeMtx.Lock()
	defer t.closeMtx.Unlock()

	if t.isClosed() {
		return
	}
	close(t.done)
}
// blockedSince returns the current blockedAt value, read under the read lock.
// It is nil when no dropped streams are pending.
func (t *tailer) blockedSince() *time.Time {
	t.blockedMtx.RLock()
	since := t.blockedAt
	t.blockedMtx.RUnlock()
	return since
}
// dropStream records that stream was not delivered. Streams without entries
// are ignored. Otherwise the time of the first drop is remembered in blockedAt
// (if not already set) and a DroppedStream covering the timestamps of the
// first and last entry is appended to droppedStreams.
func (t *tailer) dropStream(stream logproto.Stream) {
	entries := stream.Entries
	if len(entries) == 0 {
		return
	}

	t.blockedMtx.Lock()
	defer t.blockedMtx.Unlock()

	if t.blockedAt == nil {
		now := time.Now()
		t.blockedAt = &now
	}

	first, last := entries[0], entries[len(entries)-1]
	t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{
		From:   first.Timestamp,
		To:     last.Timestamp,
		Labels: stream.Labels,
	})
}
// popDroppedStreams returns the dropped-stream metadata collected so far and
// resets the tailer to the "not blocked" state. It returns nil when blockedAt
// is unset, i.e. nothing has been dropped since the previous call.
func (t *tailer) popDroppedStreams() []*logproto.DroppedStream {
	t.blockedMtx.Lock()
	defer t.blockedMtx.Unlock()

	if t.blockedAt == nil {
		return nil
	}

	pending := t.droppedStreams
	t.blockedAt, t.droppedStreams = nil, []*logproto.DroppedStream{}
	return pending
}
// getID returns the identifier stored in the tailer's id field.
func (t *tailer) getID() uint32 {
return t.id
}
// generateUniqueID returns a 32-bit FNV-1 hash over orgID, query, regex and
// the current time in nanoseconds (little-endian), giving each tailer an id
// that is useful for managing tailer instances.
func generateUniqueID(orgID, query, regex string) uint32 {
	hasher := fnv.New32()
	for _, part := range [...]string{orgID, query, regex} {
		_, _ = hasher.Write([]byte(part))
	}

	var nowBytes [8]byte
	binary.LittleEndian.PutUint64(nowBytes[:], uint64(time.Now().UnixNano()))
	_, _ = hasher.Write(nowBytes[:])

	return hasher.Sum32()
}