loki/pkg/querier/tail_test.go

package querier

import (
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/iter"
	loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
	"github.com/grafana/loki/pkg/logproto"
)

const (
	timeout  = 1 * time.Second
	throttle = 10 * time.Millisecond
)
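
// TestTailer covers the Tailer response path: serving historic entries,
// streaming from live tail clients, and honoring the per-response limits
// on entries, buffered responses, and dropped entries.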
func TestTailer(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		historicEntries iter.EntryIterator
		tailClient      *tailClientMock
		tester          func(t *testing.T, tailer *Tailer, tailClient *tailClientMock)
	}{
		"tail logs from historic entries only (no tail clients provided)": {
			historicEntries: mockStreamIterator(1, 2),
			tailClient:      nil,
			tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
				responses, err := readFromTailer(tailer, 2)
				require.NoError(t, err)

				actual := flattenStreamsFromResponses(responses)

				assert.Equal(t, []logproto.Stream{
					mockStream(1, 1),
					mockStream(2, 1),
				}, actual)
			},
		},
"tail logs from tail clients only (no historic entries provided)": {
historicEntries: mockStreamIterator(0, 0),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
tailClient.triggerRecv()
responses, err := readFromTailer(tailer, 1)
require.NoError(t, err)
actual := flattenStreamsFromResponses(responses)
assert.Equal(t, []logproto.Stream{
mockStream(1, 1),
}, actual)
},
},
"tail logs both from historic entries and tail clients": {
historicEntries: mockStreamIterator(1, 2),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(3, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
tailClient.triggerRecv()
responses, err := readFromTailer(tailer, 3)
require.NoError(t, err)
actual := flattenStreamsFromResponses(responses)
assert.Equal(t, []logproto.Stream{
mockStream(1, 1),
mockStream(2, 1),
mockStream(3, 1),
}, actual)
},
},
"honor max entries per tail response": {
historicEntries: mockStreamIterator(1, maxEntriesPerTailResponse+1),
tailClient: nil,
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
responses, err := readFromTailer(tailer, maxEntriesPerTailResponse+1)
require.NoError(t, err)
require.Equal(t, 2, len(responses))
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, 1, countEntriesInStreams(responses[1].Streams))
assert.Equal(t, 0, len(responses[1].DroppedEntries))
},
},
"honor max buffered tail responses": {
historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+5),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
require.NoError(t, err)
// Since the response channel is full/blocked, we do expect that all responses
// are "full" and extra entries from historic entries have been dropped
responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
require.NoError(t, err)
require.Equal(t, maxBufferedTailResponses, len(responses))
for i := 0; i < maxBufferedTailResponses; i++ {
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
assert.Equal(t, 0, len(responses[1].DroppedEntries))
}
// Since we'll not receive dropped entries until the next tail response, we're now
// going to trigger a Recv() from the tail client
tailClient.triggerRecv()
responses, err = readFromTailer(tailer, 1)
require.NoError(t, err)
require.Equal(t, 1, len(responses))
assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, 5, len(responses[0].DroppedEntries))
},
},
"honor max dropped entries per tail response": {
historicEntries: mockStreamIterator(1, (maxEntriesPerTailResponse*maxBufferedTailResponses)+maxDroppedEntriesPerTailResponse+5),
tailClient: newTailClientMock().mockRecvWithTrigger(mockTailResponse(mockStream(1, 1))),
tester: func(t *testing.T, tailer *Tailer, tailClient *tailClientMock) {
err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer)
require.NoError(t, err)
// Since the response channel is full/blocked, we do expect that all responses
// are "full" and extra entries from historic entries have been dropped
responses, err := readFromTailer(tailer, (maxEntriesPerTailResponse * maxBufferedTailResponses))
require.NoError(t, err)
require.Equal(t, maxBufferedTailResponses, len(responses))
for i := 0; i < maxBufferedTailResponses; i++ {
assert.Equal(t, maxEntriesPerTailResponse, countEntriesInStreams(responses[i].Streams))
assert.Equal(t, 0, len(responses[1].DroppedEntries))
}
// Since we'll not receive dropped entries until the next tail response, we're now
// going to trigger a Recv() from the tail client
tailClient.triggerRecv()
responses, err = readFromTailer(tailer, 1)
require.NoError(t, err)
require.Equal(t, 1, len(responses))
assert.Equal(t, 1, countEntriesInStreams(responses[0].Streams))
assert.Equal(t, maxDroppedEntriesPerTailResponse, len(responses[0].DroppedEntries))
},
},
	}

	for testName, test := range tests {
		t.Run(testName, func(t *testing.T) {
			tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) {
				return map[string]logproto.Querier_TailClient{}, nil
			}

			tailClients := map[string]logproto.Querier_TailClient{}
			if test.tailClient != nil {
				tailClients["test"] = test.tailClient
			}

			tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil))
			defer tailer.close()

			test.tester(t, tailer, test.tailClient)
		})
	}
}
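
// readFromTailer drains the Tailer's response channel until at least maxEntries
// entries have been read, the tailer is stopped, or the test timeout expires.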
func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, error) {
	responses := make([]*loghttp.TailResponse, 0)
	entriesCount := 0

	// Ensure we do not wait indefinitely.
	timeoutTicker := time.NewTicker(timeout)
	defer timeoutTicker.Stop()

	for !tailer.stopped && entriesCount < maxEntries {
		select {
		case <-timeoutTicker.C:
			return nil, errors.New("timeout expired while reading responses from Tailer")
		case response := <-tailer.getResponseChan():
			responses = append(responses, response)
			entriesCount += countEntriesInStreams(response.Streams)
		default:
			time.Sleep(throttle)
		}
	}

	return responses, nil
}
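
// waitUntilTailerOpenStreamsHaveBeenConsumed blocks until the Tailer has fully
// consumed its open streams iterator, or returns an error on timeout.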
func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error {
	// Ensure we do not wait indefinitely.
	timeoutTicker := time.NewTicker(timeout)
	defer timeoutTicker.Stop()

	for {
		if isTailerOpenStreamsConsumed(tailer) {
			return nil
		}

		select {
		case <-timeoutTicker.C:
			return errors.New("timeout expired while waiting for Tailer open streams to be consumed")
		default:
			time.Sleep(throttle)
		}
	}
}

// isTailerOpenStreamsConsumed returns whether the input Tailer has fully
// consumed all streams from the openStreamIterator, which means Tailer.loop()
// is now throttling.
func isTailerOpenStreamsConsumed(tailer *Tailer) bool {
	tailer.streamMtx.Lock()
	defer tailer.streamMtx.Unlock()

	return tailer.openStreamIterator.IsEmpty() || tailer.openStreamIterator.Peek() == time.Unix(0, 0)
}
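
// countEntriesInStreams returns the total number of log entries across all input streams.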
func countEntriesInStreams(streams []logproto.Stream) int {
	count := 0
	for _, stream := range streams {
		count += len(stream.Entries)
	}

	return count
}

// flattenStreamsFromResponses returns a slice of streams, each one containing
// one and only one entry from the input list of responses. This function is
// used to abstract away implementation details of the Tailer when asserting
// on the output, regardless of how the responses have been generated (i.e.
// multiple entries grouped into the same stream).
func flattenStreamsFromResponses(responses []*loghttp.TailResponse) []logproto.Stream {
	result := make([]logproto.Stream, 0)

	for _, response := range responses {
		for _, stream := range response.Streams {
			for _, entry := range stream.Entries {
				result = append(result, logproto.Stream{
					Entries: []logproto.Entry{entry},
					Labels:  stream.Labels,
				})
			}
		}
	}

	return result
}