Improved Tailer loop (dropped entries, logic refactoring, introduced tests)

pull/886/head
Marco Pracucci 7 years ago committed by Sandeep Sukhani
parent 132989031f
commit b9185324f4
  1. pkg/canary/comparator/comparator_test.go (2 changes)
  2. pkg/querier/querier.go (8 changes)
  3. pkg/querier/querier_mock_test.go (45 changes)
  4. pkg/querier/querier_test.go (36 changes)
  5. pkg/querier/tail.go (161 changes)
  6. pkg/querier/tail_mock_test.go (10 changes)
  7. pkg/querier/tail_test.go (252 changes)

@@ -156,7 +156,7 @@ func TestEntryNeverReceived(t *testing.T) {
found := []time.Time{t1, t3, t4, t5}
mr := &mockReader{found}
maxWait := 5 * time.Millisecond
maxWait := 50 * time.Millisecond
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)
c.entrySent(t1)

@@ -20,6 +20,13 @@ import (
"github.com/grafana/loki/pkg/storage"
)
const (
// How long the Tailer should wait - once there are no entries to read from ingesters -
// before checking if a new entry is available (to avoid spinning the CPU in a continuous
// check loop)
tailerWaitEntryThrottle = time.Second / 2
)
var readinessProbeSuccess = []byte("Ready")
// Config for a querier.
@@ -308,6 +315,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return q.tailDisconnectedIngesters(tailCtx, req, connectedIngestersAddr)
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
), nil
}
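The new tailerWaitEntryThrottle constant, threaded into newTailer here, exists so the Tailer's read loop sleeps instead of spinning the CPU when no ingester has produced a new entry. A minimal, self-contained sketch of that polling pattern (the entries slice and deadline below are illustrative stand-ins, not Loki APIs):

```go
package main

import (
	"fmt"
	"time"
)

// waitEntryThrottle mirrors tailerWaitEntryThrottle: how long to sleep when no
// entry is available, so the read loop does not busy-spin on the CPU.
const waitEntryThrottle = time.Second / 2

func main() {
	entries := []string{"line 1", "line 2"} // pretend source of tailed entries
	deadline := time.Now().Add(2 * time.Second)

	for time.Now().Before(deadline) {
		if len(entries) == 0 {
			// Nothing to read: throttle before checking again.
			time.Sleep(waitEntryThrottle)
			continue
		}
		entry := entries[0]
		entries = entries[1:]
		fmt.Println("forwarding", entry)
	}
}
```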

@@ -3,6 +3,7 @@ package querier
import (
"context"
"errors"
"fmt"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
@@ -107,10 +108,13 @@ func (c *queryClientMock) RecvMsg(m interface{}) error {
type tailClientMock struct {
util.ExtendedMock
logproto.Querier_TailClient
recvTrigger chan time.Time
}
func newTailClientMock() *tailClientMock {
return &tailClientMock{}
return &tailClientMock{
recvTrigger: make(chan time.Time, 10),
}
}
func (c *tailClientMock) Recv() (*logproto.TailResponse, error) {
@@ -138,6 +142,20 @@ func (c *tailClientMock) RecvMsg(m interface{}) error {
return nil
}
func (c *tailClientMock) mockRecvWithTrigger(response *logproto.TailResponse) *tailClientMock {
c.On("Recv").WaitUntil(c.recvTrigger).Return(response, nil)
return c
}
// triggerRecv triggers the Recv() mock to return from the next invocation,
// or from the current invocation if it was already called and is waiting for
// the trigger. This method works if and only if Recv() has been mocked with
// mockRecvWithTrigger().
func (c *tailClientMock) triggerRecv() {
c.recvTrigger <- time.Now()
}
// storeMock is a mockable version of Loki's storage, used in querier unit tests
// to control the behaviour of the store without really hitting any storage backend
type storeMock struct {
@@ -227,3 +245,28 @@ func mockReadRingWithOneActiveIngester() *readRingMock {
{Addr: "test", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}},
})
}
// mockStreamIterator returns an iterator over 1 stream with quantity entries,
// where each entry's timestamp and line string are built from sequential numbers
// starting at from
func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}
// mockStream returns a stream with quantity entries, where each entry's timestamp
// and line string are built from sequential numbers starting at from
func mockStream(from int, quantity int) *logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)
for i := from; i < from+quantity; i++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("line %d", i),
})
}
return &logproto.Stream{
Entries: entries,
Labels: `{type="test"}`,
}
}
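The mockRecvWithTrigger()/triggerRecv() pair added above gates the mocked Recv() on a channel, so a test can decide exactly when the fake ingester yields a response. Here is a self-contained sketch of the same pattern built on testify's Call.WaitUntil (the readerMock type and its test are illustrative, not part of this change):

```go
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/mock"
)

// readerMock blocks its mocked Recv() until a value is sent on recvTrigger,
// mirroring how tailClientMock.mockRecvWithTrigger()/triggerRecv() work.
type readerMock struct {
	mock.Mock
	recvTrigger chan time.Time
}

func (r *readerMock) Recv() (string, error) {
	args := r.Called()
	return args.String(0), args.Error(1)
}

func TestTriggeredRecv(t *testing.T) {
	r := &readerMock{recvTrigger: make(chan time.Time, 1)}
	r.On("Recv").WaitUntil(r.recvTrigger).Return("line 1", nil)

	// Release the pending (or upcoming) Recv() call.
	go func() { r.recvTrigger <- time.Now() }()

	line, err := r.Recv() // blocks until the trigger fires
	if err != nil || line != "line 1" {
		t.Fatalf("unexpected result: %q, %v", line, err)
	}
}
```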

@@ -5,7 +5,6 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
@@ -30,10 +29,10 @@ func TestQuerier_Query_QueryTimeoutConfigFlag(t *testing.T) {
}
store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, &request, mock.Anything).Return(queryClient, nil)
@@ -119,13 +118,13 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) {
}
store := newStoreMock()
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(), nil)
store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream()}), nil)
queryClient.On("Recv").Return(mockQueryResponse([]*logproto.Stream{mockStream(1, 2)}), nil)
tailClient := newTailClientMock()
tailClient.On("Recv").Return(mockTailResponse(mockStream()), nil)
tailClient.On("Recv").Return(mockTailResponse(mockStream(1, 2)), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
@@ -170,24 +169,6 @@ func mockQuerierConfig() Config {
}
}
func mockStreamIterator() iter.EntryIterator {
return iter.NewStreamIterator(mockStream())
}
func mockStream() *logproto.Stream {
entries := []logproto.Entry{
{Timestamp: time.Now(), Line: "line 1"},
{Timestamp: time.Now(), Line: "line 2"},
}
labels := "{type=\"test\"}"
return &logproto.Stream{
Entries: entries,
Labels: labels,
}
}
func mockQueryResponse(streams []*logproto.Stream) *logproto.QueryResponse {
return &logproto.QueryResponse{
Streams: streams,
@@ -199,10 +180,3 @@ func mockLabelResponse(values []string) *logproto.LabelResponse {
Values: values,
}
}
func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
return &logproto.TailResponse{
Stream: stream,
DroppedStreams: []*logproto.DroppedStream{},
}
}

@@ -13,13 +13,20 @@ import (
)
const (
// if we are not seeing any response from ingester, how long do we want to wait by going into sleep
nextEntryWait = time.Second / 2
// keep checking connections with ingesters in duration
checkConnectionsWithIngestersPeriod = time.Second * 5
bufferSizeForTailResponse = 10
// the size of the channel buffer used to send tailing streams
// back to the requesting client
maxBufferedTailResponses = 10
// the maximum number of entries to return in a TailResponse
maxEntriesPerTailResponse = 100
// the maximum number of dropped entries to keep in memory that will be sent along
// with the next successfully pushed response. Once the dropped entries memory buffer
// exceeds this value, we start skipping dropped entries too.
maxDroppedEntriesPerTailResponse = 1000
)
type droppedEntry struct {
@@ -48,15 +55,14 @@ type Tailer struct {
querierTailClientsMtx sync.Mutex
stopped bool
blocked bool
blockedMtx sync.RWMutex
delayFor time.Duration
responseChan chan *TailResponse
closeErrChan chan error
tailMaxDuration time.Duration
// when tail client is slow, drop entry and store its details in droppedEntries to notify client
droppedEntries []droppedEntry
// if we are not seeing any response from ingesters,
// how long to sleep before checking for new entries again
waitEntryThrottle time.Duration
}
func (t *Tailer) readTailClients() {
@@ -74,13 +80,9 @@ func (t *Tailer) loop() {
tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration)
defer tailMaxDurationTicker.Stop()
tailResponse := new(TailResponse)
for {
if t.stopped {
return
}
droppedEntries := make([]droppedEntry, 0)
for !t.stopped {
select {
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
@@ -96,44 +98,66 @@ func (t *Tailer) loop() {
default:
}
if !t.next() {
if len(tailResponse.Streams) == 0 {
if len(t.querierTailClients) == 0 {
// All the connections to ingesters are dropped, try reconnecting or return error
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
} else {
continue
}
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
}
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
}
time.Sleep(nextEntryWait)
continue
}
} else {
// If channel is blocked already, drop current entry directly to save the effort
if t.isBlocked() {
t.dropEntry(t.currEntry.Timestamp, t.currLabels, nil)
// Read as many entries as we can (up to the max allowed) and populate the
// tail response we'll send over the response channel
tailResponse := new(TailResponse)
entriesCount := 0
for ; entriesCount < maxEntriesPerTailResponse && t.next(); entriesCount++ {
// If the response channel is blocked, we drop the current entry directly
// to save the effort
if t.isResponseChanBlocked() {
droppedEntries = dropEntry(droppedEntries, t.currEntry.Timestamp, t.currLabels)
continue
}
tailResponse.Streams = append(tailResponse.Streams, logproto.Stream{Labels: t.currLabels, Entries: []logproto.Entry{t.currEntry}})
if len(tailResponse.Streams) != 100 {
continue
tailResponse.Streams = append(tailResponse.Streams, logproto.Stream{
Labels: t.currLabels,
Entries: []logproto.Entry{t.currEntry},
})
}
// If all consumed entries have been dropped because the response channel is blocked,
// we should iterate the loop again
if len(tailResponse.Streams) == 0 && entriesCount > 0 {
continue
}
// If no entry has been consumed, we should check whether all ingester connections
// have been dropped, and then throttle for a while
if len(tailResponse.Streams) == 0 {
if len(t.querierTailClients) == 0 {
// All the connections to ingesters are dropped, try reconnecting or return error
if err := t.checkIngesterConnections(); err != nil {
level.Error(util.Logger).Log("Error reconnecting to ingesters", fmt.Sprintf("%v", err))
} else {
continue
}
if err := t.close(); err != nil {
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err))
}
t.closeErrChan <- errors.New("all ingesters closed the connection")
return
}
tailResponse.DroppedEntries = t.popDroppedEntries()
time.Sleep(t.waitEntryThrottle)
continue
}
// Send the tail response through the response channel without blocking.
// Drop the entries if the response channel buffer is full.
if len(droppedEntries) > 0 {
tailResponse.DroppedEntries = droppedEntries
}
select {
case t.responseChan <- tailResponse:
if len(droppedEntries) > 0 {
droppedEntries = make([]droppedEntry, 0)
}
default:
t.dropEntry(t.currEntry.Timestamp, t.currLabels, tailResponse.DroppedEntries)
droppedEntries = dropEntries(droppedEntries, tailResponse.Streams)
}
tailResponse = new(TailResponse)
}
}
@@ -219,33 +243,10 @@ func (t *Tailer) close() error {
return t.openStreamIterator.Close()
}
func (t *Tailer) dropEntry(timestamp time.Time, labels string, alreadyDroppedEntries []droppedEntry) {
t.blockedMtx.Lock()
defer t.blockedMtx.Unlock()
t.droppedEntries = append(t.droppedEntries, alreadyDroppedEntries...)
t.droppedEntries = append(t.droppedEntries, droppedEntry{timestamp, labels})
}
func (t *Tailer) isBlocked() bool {
t.blockedMtx.RLock()
defer t.blockedMtx.RUnlock()
return t.blocked
}
func (t *Tailer) popDroppedEntries() []droppedEntry {
t.blockedMtx.Lock()
defer t.blockedMtx.Unlock()
t.blocked = false
if len(t.droppedEntries) == 0 {
return nil
}
droppedEntries := t.droppedEntries
t.droppedEntries = []droppedEntry{}
return droppedEntries
func (t *Tailer) isResponseChanBlocked() bool {
// Thread-safety: len() and cap() on a channel are thread-safe. The cap() doesn't
// change over time, while len() does.
return len(t.responseChan) == cap(t.responseChan)
}
func (t *Tailer) getResponseChan() <-chan *TailResponse {
@@ -262,18 +263,38 @@ func newTailer(
historicEntries iter.EntryIterator,
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
tailMaxDuration time.Duration,
waitEntryThrottle time.Duration,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{historicEntries}, logproto.FORWARD),
querierTailClients: querierTailClients,
delayFor: delayFor,
responseChan: make(chan *TailResponse, bufferSizeForTailResponse),
responseChan: make(chan *TailResponse, maxBufferedTailResponses),
closeErrChan: make(chan error),
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
waitEntryThrottle: waitEntryThrottle,
}
t.readTailClients()
go t.loop()
return &t
}
func dropEntry(droppedEntries []droppedEntry, timestamp time.Time, labels string) []droppedEntry {
if len(droppedEntries) >= maxDroppedEntriesPerTailResponse {
return droppedEntries
}
return append(droppedEntries, droppedEntry{timestamp, labels})
}
func dropEntries(droppedEntries []droppedEntry, streams []logproto.Stream) []droppedEntry {
for _, stream := range streams {
for _, entry := range stream.Entries {
droppedEntries = dropEntry(droppedEntries, entry.Timestamp, entry.Line)
}
}
return droppedEntries
}
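Taken together, each pass of the new loop batches up to maxEntriesPerTailResponse entries, skips entries while the buffered response channel is full (len == cap), remembers at most maxDroppedEntriesPerTailResponse of them, and pushes the batch with a non-blocking send; with the constants above that is at worst 10 buffered responses of 100 entries each (1000 entries) plus 1000 remembered dropped entries. A condensed, self-contained sketch of that pattern with smaller limits (the entry type and constants below are simplified stand-ins, not the Tailer's types):

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxEntriesPerResponse = 3 // stands in for maxEntriesPerTailResponse
	maxBufferedResponses  = 2 // stands in for maxBufferedTailResponses
	maxDroppedEntries     = 4 // stands in for maxDroppedEntriesPerTailResponse
)

type entry struct {
	ts   time.Time
	line string
}

// dropEntry remembers a dropped entry, but stops once the cap is reached.
func dropEntry(dropped []entry, e entry) []entry {
	if len(dropped) >= maxDroppedEntries {
		return dropped
	}
	return append(dropped, e)
}

func main() {
	responseChan := make(chan []entry, maxBufferedResponses)
	dropped := make([]entry, 0)

	source := make([]entry, 0, 10)
	for i := 1; i <= 10; i++ {
		source = append(source, entry{time.Unix(int64(i), 0), fmt.Sprintf("line %d", i)})
	}

	for len(source) > 0 {
		// Batch up to maxEntriesPerResponse entries, dropping while the channel buffer is full.
		batch := make([]entry, 0, maxEntriesPerResponse)
		for len(batch) < maxEntriesPerResponse && len(source) > 0 {
			e := source[0]
			source = source[1:]
			if len(responseChan) == cap(responseChan) { // response channel is blocked
				dropped = dropEntry(dropped, e)
				continue
			}
			batch = append(batch, e)
		}
		if len(batch) == 0 {
			continue
		}
		// Non-blocking send: if the buffer filled up in the meantime, drop the whole batch.
		select {
		case responseChan <- batch:
		default:
			for _, e := range batch {
				dropped = dropEntry(dropped, e)
			}
		}
	}

	fmt.Println("buffered responses:", len(responseChan), "dropped entries:", len(dropped))
}
```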

@@ -0,0 +1,10 @@
package querier
import "github.com/grafana/loki/pkg/logproto"
func mockTailResponse(stream *logproto.Stream) *logproto.TailResponse {
return &logproto.TailResponse{
Stream: stream,
DroppedStreams: []*logproto.DroppedStream{},
}
}

@@ -0,0 +1,252 @@
package querier
import (
"errors"
"testing"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
timeout = 1 * time.Second
throttle = 10 * time.Millisecond
)
func TestTailer(t *testing.T) {
t.Parallel()
tests := map[string]struct {
historicEntries iter.EntryIterator
tailClient *tailClientMock
tester func(t *testing.T, tailer *Tailer, tailClient *tailClientMock)
}{
"tail logs from historic entries only (no tail clients provided)": {
historicEntries: mockStreamIterator(1, 2),
tailClient: nil,
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
responses, err := readFromTailer(tailer, 2)
require.NoError(t, err)
actual := flattenStreamsFromResponses(responses)
assert.Equal(t, []logproto.Stream{
*mockStream(1, 1),
*mockStream(2, 1),
}, actual)
},
},
"tail logs from tail clients only (no historic entries provided)": {
historicEntries: mockStreamIterator(0, 0),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
tailClient.triggerRecv()
responses, err := readFromTailer(tailer, 1)
require.NoError(t, err)
actual := flattenStreamsFromResponses(responses)
assert.Equal(t, []logproto.Stream{
*mockStream(1, 1),
}, actual)
},
},
"tail logs both from historic entries and tail clients": {
historicEntries: mockStreamIterator(1, 2),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(3, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
tailClient.triggerRecv()
responses, err := readFromTailer(tailer, 3)
require.NoError(t, err)
actual := flattenStreamsFromResponses(responses)
assert.Equal(t, []logproto.Stream{
*mockStream(1, 1),
*mockStream(2, 1),
*mockStream(3, 1),
}, actual)
},
},
"honor max entries per tail response": {
historicEntries: mockStreamIterator(1, maxEntriesPerTailResponse+1),
tailClient: nil,
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
responses, err := readFromTailer(tailer, maxEntriesPerTailResponse+1)
require.NoError(t, err)
require.Equal(t, 2, len(responses))
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, 1, countEntriesInStreams(responses[1].Streams))
assert.Equal(t, 0, len(responses[1].DroppedEntries))
},
},
"honor max buffered tail responses": {
historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+5),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
require.NoError(t, err)
// Since the response channel is full/blocked, we do expect that all responses
// are "full" and extra entries from historic entries have been dropped
responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
require.NoError(t, err)
require.Equal(t, maxBufferedTailResponses, len(responses))
for i := 0; i < maxBufferedTailResponses; i++ {
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
assert.Equal(t, 0, len(responses[i].DroppedEntries))
}
// Since we'll not receive dropped entries until the next tail response, we're now
// going to trigger a Recv() from the tail client
tailClient.triggerRecv()
responses, err = readFromTailer(tailer, 1)
require.NoError(t, err)
require.Equal(t, 1, len(responses))
assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, 5, len(responses[0].DroppedEntries))
},
},
"honor max dropped entries per tail response": {
historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+maxDroppedEntriesPerTailResponse+5),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
require.NoError(t, err)
// Since the response channel is full/blocked, we do expect that all responses
// are "full" and extra entries from historic entries have been dropped
responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
require.NoError(t, err)
require.Equal(t, maxBufferedTailResponses, len(responses))
for i := 0; i < maxBufferedTailResponses; i++ {
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
assert.Equal(t, 0, len(responses[i].DroppedEntries))
}
// Since we'll not receive dropped entries until the next tail response, we're now
// going to trigger a Recv() from the tail client
tailClient.triggerRecv()
responses, err = readFromTailer(tailer, 1)
require.NoError(t, err)
require.Equal(t, 1, len(responses))
assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, maxDroppedEntriesPerTailResponse, len(responses[0].DroppedEntries))
},
},
}
for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) {
return map[string]logproto.Querier_TailClient{}, nil
}
tailClients := map[string]logproto.Querier_TailClient{}
if test.tailClient != nil {
tailClients["test"] = test.tailClient
}
tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle)
defer tailer.close()
test.tester(t, tailer, test.tailClient)
})
}
}
func readFromTailer(tailer *Tailer, maxEntries int) ([]*TailResponse, error) {
responses := make([]*TailResponse, 0)
entriesCount := 0
// Ensure we do not wait indefinitely
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()
for !tailer.stopped && entriesCount < maxEntries {
select {
case <-timeoutTicker.C:
return nil, errors.New("timeout expired while reading responses from Tailer")
case response := <-tailer.getResponseChan():
responses = append(responses, response)
entriesCount += countEntriesInStreams(response.Streams)
default:
time.Sleep(throttle)
}
}
return responses, nil
}
func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error {
// Ensure we do not wait indefinitely
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()
for {
if isTailerOpenStreamsConsumed(tailer) {
return nil
}
select {
case <-timeoutTicker.C:
return errors.New("timeout expired while reading responses from Tailer")
default:
time.Sleep(throttle)
}
}
}
// isTailerOpenStreamsConsumed returns whether the input Tailer has fully
// consumed all streams from the openStreamIterator, which means the
// Tailer.loop() is now throttling
func isTailerOpenStreamsConsumed(tailer *Tailer) bool {
tailer.streamMtx.Lock()
defer tailer.streamMtx.Unlock()
return tailer.openStreamIterator.Len() == 0 || tailer.openStreamIterator.Peek() == time.Unix(0, 0)
}
func countEntriesInStreams(streams []logproto.Stream) int {
count := 0
for _, stream := range streams {
count += len(stream.Entries)
}
return count
}
// flattenStreamsFromResponses returns an array of streams, each one containing
// one and only one entry from the input list of responses. This function is used
// to abstract away implementation details in the Tailer when testing for the output
// regardless of how the responses have been generated (i.e. multiple entries grouped
// into the same stream)
func flattenStreamsFromResponses(responses []*TailResponse) []logproto.Stream {
result := make([]logproto.Stream, 0)
for _, response := range responses {
for _, stream := range response.Streams {
for _, entry := range stream.Entries {
result = append(result, logproto.Stream{
Entries: []logproto.Entry{entry},
Labels: stream.Labels,
})
}
}
}
return result
}