commit b7cb85f92b (pull/12003/head^2)
parent 0660cfc9df
Salva Corts authored 1 year ago, committed by GitHub
Files changed:
  1. pkg/ingester/stream.go (38 changes)
  2. pkg/ingester/tailer.go (64 changes)
  3. pkg/ingester/tailer_test.go (50 changes)

pkg/ingester/stream.go

@@ -288,30 +288,28 @@ func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.E
 	hasTailers := len(s.tailers) != 0
 	s.tailerMtx.RUnlock()
 	if hasTailers {
-		go func() {
-			stream := logproto.Stream{Labels: s.labelsString, Entries: entries}
-			closedTailers := []uint32{}
-
-			s.tailerMtx.RLock()
-			for _, tailer := range s.tailers {
-				if tailer.isClosed() {
-					closedTailers = append(closedTailers, tailer.getID())
-					continue
-				}
-				tailer.send(stream, s.labels)
-			}
-			s.tailerMtx.RUnlock()
-
-			if len(closedTailers) != 0 {
-				s.tailerMtx.Lock()
-				defer s.tailerMtx.Unlock()
-
-				for _, closedTailerID := range closedTailers {
-					delete(s.tailers, closedTailerID)
-				}
-			}
-		}()
+		stream := logproto.Stream{Labels: s.labelsString, Entries: entries}
+		closedTailers := []uint32{}
+
+		s.tailerMtx.RLock()
+		for _, tailer := range s.tailers {
+			if tailer.isClosed() {
+				closedTailers = append(closedTailers, tailer.getID())
+				continue
+			}
+			tailer.send(stream, s.labels)
+		}
+		s.tailerMtx.RUnlock()
+
+		if len(closedTailers) != 0 {
+			s.tailerMtx.Lock()
+			defer s.tailerMtx.Unlock()
+
+			for _, closedTailerID := range closedTailers {
+				delete(s.tailers, closedTailerID)
+			}
+		}
 	}
 }
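The goroutine wrapper around the tailer fan-out goes away here, which lines up with tailer.send becoming a non-blocking enqueue in tailer.go below: pushing entries to tailers can no longer stall on a slow tail client. A minimal sketch of that non-blocking hand-off pattern, with illustrative names rather than Loki's:

package main

import "fmt"

// tryEnqueue hands an item to a bounded queue without ever blocking the caller:
// if the buffer is full, the item is dropped and false is returned.
func tryEnqueue[T any](queue chan T, item T) bool {
	select {
	case queue <- item:
		return true
	default:
		return false
	}
}

func main() {
	q := make(chan string, 2)
	for _, line := range []string{"a", "b", "c"} {
		fmt.Printf("%s accepted: %v\n", line, tryEnqueue(q, line))
	}
	// a and b fit in the buffer; c is dropped because the queue is full.
}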

pkg/ingester/tailer.go

@@ -17,13 +17,21 @@ import (
 	util_log "github.com/grafana/loki/pkg/util/log"
 )
 
-const bufferSizeForTailResponse = 5
+const (
+	bufferSizeForTailResponse = 5
+	bufferSizeForTailStream   = 100
+)
 
 type TailServer interface {
 	Send(*logproto.TailResponse) error
 	Context() context.Context
 }
 
+type tailRequest struct {
+	stream logproto.Stream
+	lbs    labels.Labels
+}
+
 type tailer struct {
 	id       uint32
 	orgID    string
@@ -31,6 +39,7 @@ type tailer struct {
 	pipeline    syntax.Pipeline
 	pipelineMtx sync.Mutex
 
+	queue    chan tailRequest
 	sendChan chan *logproto.Stream
 
 	// Signaling channel used to notify once the tailer gets closed
@@ -59,6 +68,7 @@ func newTailer(orgID string, expr syntax.LogSelectorExpr, conn TailServer, maxDr
 		orgID:             orgID,
 		matchers:          matchers,
 		sendChan:          make(chan *logproto.Stream, bufferSizeForTailResponse),
+		queue:             make(chan tailRequest, bufferSizeForTailStream),
 		conn:              conn,
 		droppedStreams:    make([]*logproto.DroppedStream, 0, maxDroppedStreams),
 		maxDroppedStreams: maxDroppedStreams,
@@ -73,6 +83,9 @@ func (t *tailer) loop() {
 	var err error
 	var ok bool
 
+	// Launch a go routine to receive streams sent with t.send
+	go t.receiveStreamsLoop()
+
 	for {
 		select {
 		case <-t.conn.Context().Done():
@@ -102,6 +115,37 @@ func (t *tailer) loop() {
 	}
 }
 
+func (t *tailer) receiveStreamsLoop() {
+	defer t.close()
+	for {
+		select {
+		case <-t.conn.Context().Done():
+			return
+		case <-t.closeChan:
+			return
+		case req, ok := <-t.queue:
+			if !ok {
+				return
+			}
+
+			streams := t.processStream(req.stream, req.lbs)
+			if len(streams) == 0 {
+				continue
+			}
+
+			for _, s := range streams {
+				select {
+				case t.sendChan <- s:
+				default:
+					t.dropStream(*s)
+				}
+			}
+		}
+	}
+}
+
+// send sends a stream to the tailer for processing and sending to the client.
+// It will drop the stream if the tailer is blocked or the queue is full.
 func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 	if t.isClosed() {
 		return
@@ -117,16 +161,16 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
 		return
 	}
 
-	streams := t.processStream(stream, lbs)
-	if len(streams) == 0 {
-		return
-	}
-
-	for _, s := range streams {
-		select {
-		case t.sendChan <- s:
-		default:
-			t.dropStream(*s)
-		}
+	// Send stream to queue for processing asynchronously
+	// If the queue is full, drop the stream
+	req := tailRequest{
+		stream: stream,
+		lbs:    lbs,
+	}
+
+	select {
+	case t.queue <- req:
+	default:
+		t.dropStream(stream)
 	}
 }
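Taken together, tailer.go now splits the tail path into two stages: send only enqueues the raw stream into queue (capacity bufferSizeForTailStream), while receiveStreamsLoop runs processStream and forwards the filtered streams to sendChan, dropping whenever either buffer is full. A self-contained sketch of that producer/consumer shape, with simplified stand-in types (the stream struct, the "keep" filter, and the capacities are illustrative, not Loki's):

package main

import (
	"fmt"
	"strings"
	"sync"
)

type stream struct{ lines []string }

// tailer mirrors the two-stage pipeline: a bounded intake queue drained by a
// processing goroutine, which forwards filtered results to a delivery channel.
type tailer struct {
	queue    chan stream   // intake, filled by send
	sendChan chan stream   // delivery, consumed by the connection loop (not shown)
	done     chan struct{} // closed to stop the processing goroutine
	wg       sync.WaitGroup
}

func newTailer() *tailer {
	t := &tailer{
		queue:    make(chan stream, 100),
		sendChan: make(chan stream, 5),
		done:     make(chan struct{}),
	}
	t.wg.Add(1)
	go t.receiveLoop()
	return t
}

// send never blocks: it enqueues the stream or drops it when the queue is full.
func (t *tailer) send(s stream) {
	select {
	case t.queue <- s:
	default:
		// dropped; the real tailer records this via dropStream
	}
}

func (t *tailer) receiveLoop() {
	defer t.wg.Done()
	for {
		select {
		case <-t.done:
			return
		case s := <-t.queue:
			// stand-in for processStream: keep only lines containing "keep"
			var kept []string
			for _, l := range s.lines {
				if strings.Contains(l, "keep") {
					kept = append(kept, l)
				}
			}
			if len(kept) == 0 {
				continue
			}
			select {
			case t.sendChan <- stream{lines: kept}:
			default: // delivery buffer full: drop
			}
		}
	}
}

func main() {
	t := newTailer()
	t.send(stream{lines: []string{"keep me", "drop me"}})
	fmt.Println(<-t.sendChan) // prints {[keep me]}
	close(t.done)
	t.wg.Wait()
}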

pkg/ingester/tailer_test.go

@@ -2,6 +2,7 @@ package ingester
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"sync"
 	"testing"
@@ -15,6 +16,55 @@ import (
 	"github.com/grafana/loki/pkg/logql/syntax"
 )
 
+func TestTailer_RoundTrip(t *testing.T) {
+	server := &fakeTailServer{}
+	lbs := makeRandomLabels()
+	expr, err := syntax.ParseLogSelector(lbs.String(), true)
+	require.NoError(t, err)
+	tail, err := newTailer("org-id", expr, server, 10)
+	require.NoError(t, err)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		tail.loop()
+		wg.Done()
+	}()
+
+	const numStreams = 1000
+	var entries []logproto.Entry
+	for i := 0; i < numStreams; i += 3 {
+		var iterEntries []logproto.Entry
+		for j := 0; j < 3; j++ {
+			iterEntries = append(iterEntries, logproto.Entry{Timestamp: time.Unix(0, int64(i+j)), Line: fmt.Sprintf("line %d", i+j)})
+		}
+		entries = append(entries, iterEntries...)
+
+		tail.send(logproto.Stream{
+			Labels:  lbs.String(),
+			Entries: iterEntries,
+		}, lbs)
+
+		// sleep a bit to allow the tailer to process the stream without dropping
+		// This should take about 5 seconds to process all the streams
+		time.Sleep(5 * time.Millisecond)
+	}
+
+	// Wait for the stream to be received by the server.
+	require.Eventually(t, func() bool {
+		return len(server.GetResponses()) > 0
+	}, 30*time.Second, 1*time.Second, "stream was not received")
+
+	var processedEntries []logproto.Entry
+	for _, response := range server.GetResponses() {
+		processedEntries = append(processedEntries, response.Stream.Entries...)
+	}
+	require.ElementsMatch(t, entries, processedEntries)
+
+	tail.close()
+	wg.Wait()
+}
+
 func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
 	runs := 100
 
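The new test drives the tailer end to end through fakeTailServer, a test double that is not part of this diff. Judging from the TailServer interface shown in tailer.go and the calls made in the test, it is presumably a small recorder along the lines below; the field names and the GetResponses signature are assumptions, and the snippet relies on the test file's existing context, sync, and logproto imports:

// Assumed shape of the existing test double; the real fakeTailServer may differ.
type fakeTailServer struct {
	mu        sync.Mutex
	responses []logproto.TailResponse
}

// Send records every response pushed by the tailer so the test can assert on it.
func (f *fakeTailServer) Send(resp *logproto.TailResponse) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.responses = append(f.responses, *resp)
	return nil
}

// Context returns a never-cancelled context, keeping the tailer loops running.
func (f *fakeTailServer) Context() context.Context { return context.Background() }

func (f *fakeTailServer) GetResponses() []logproto.TailResponse {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.responses
}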
