Makes `tailer.droppedStreams` slice bounded. (#5334)

* Makes `tailer.droppedStreams` slice bounded.

For a slow receiver, these dropped streams can accumulate and cause ingester memory to keep increasing.

This sometimes OOMs the ingester.

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Make droppedStreams slice size configurable

Update tests

Adding logging when slice is dropped

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Add test for dropped stream

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
pull/5352/head
Kaviraj Kanagaraj 4 years ago committed by GitHub
parent a50cac7674
commit 2db309271c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/ingester/ingester.go
  2. 2
      pkg/ingester/instance_test.go
  3. 2
      pkg/ingester/stream_test.go
  4. 33
      pkg/ingester/tailer.go
  5. 48
      pkg/ingester/tailer_test.go

@ -90,6 +90,8 @@ type Config struct {
Wrapper Wrapper `yaml:"-"`
IndexShards int `yaml:"index_shards"`
MaxDroppedStreams int `yaml:"max_dropped_streams"`
}
// RegisterFlags registers the flags.
@ -113,6 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
f.IntVar(&cfg.MaxDroppedStreams, "ingester.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing")
}
func (cfg *Config) Validate() error {
@ -811,7 +814,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
}
instance := i.GetOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, queryServer)
tailer, err := newTailer(instanceID, req.Query, queryServer, i.cfg.MaxDroppedStreams)
if err != nil {
return err
}

@ -420,7 +420,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil, 10)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
require.NoError(b, inst.Push(ctx, &logproto.PushRequest{

@ -407,7 +407,7 @@ func Benchmark_PushStream(b *testing.B) {
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
s := newStream(&Config{MaxChunkAge: 24 * time.Hour}, limiter, "fake", model.Fingerprint(0), ls, true, NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}, 10)
require.NoError(b, err)
go t.loop()

@ -39,14 +39,15 @@ type tailer struct {
closeChan chan struct{}
closeOnce sync.Once
blockedAt *time.Time
blockedMtx sync.RWMutex
droppedStreams []*logproto.DroppedStream
blockedAt *time.Time
blockedMtx sync.RWMutex
droppedStreams []*logproto.DroppedStream
maxDroppedStreams int
conn TailServer
}
func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
func newTailer(orgID, query string, conn TailServer, maxDroppedStreams int) (*tailer, error) {
expr, err := logql.ParseLogSelector(query, true)
if err != nil {
return nil, err
@ -58,15 +59,16 @@ func newTailer(orgID, query string, conn TailServer) (*tailer, error) {
matchers := expr.Matchers()
return &tailer{
orgID: orgID,
matchers: matchers,
pipeline: pipeline,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
conn: conn,
droppedStreams: []*logproto.DroppedStream{},
id: generateUniqueID(orgID, query),
closeChan: make(chan struct{}),
expr: expr,
orgID: orgID,
matchers: matchers,
pipeline: pipeline,
sendChan: make(chan *logproto.Stream, bufferSizeForTailResponse),
conn: conn,
droppedStreams: make([]*logproto.DroppedStream, 0, maxDroppedStreams),
maxDroppedStreams: maxDroppedStreams,
id: generateUniqueID(orgID, query),
closeChan: make(chan struct{}),
expr: expr,
}, nil
}
@ -219,6 +221,11 @@ func (t *tailer) dropStream(stream logproto.Stream) {
t.blockedAt = &blockedAt
}
if len(t.droppedStreams) >= t.maxDroppedStreams {
level.Info(util_log.Logger).Log("msg", "tailer dropped streams is reset", "length", len(t.droppedStreams))
t.droppedStreams = nil
}
t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{
From: stream.Entries[0].Timestamp,
To: stream.Entries[len(stream.Entries)-1].Timestamp,

@ -26,7 +26,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
}
for run := 0; run < runs; run++ {
tailer, err := newTailer("org-id", stream.Labels, nil)
tailer, err := newTailer("org-id", stream.Labels, nil, 10)
require.NoError(t, err)
require.NotNil(t, tailer)
@ -49,13 +49,57 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
}
}
// Test_dropstream checks how many entries are left in tailer.droppedStreams
// after dropStream has been invoked a given number of times on a tailer that
// was created with a dropped-stream limit of 10.
func Test_dropstream(t *testing.T) {
	const limit = 10

	type scenario struct {
		name     string
		drop     int
		expected int
	}

	scenarios := []scenario{
		{name: "less than maxDroppedStreams", drop: limit - 2, expected: limit - 2},
		{name: "equal to maxDroppedStreams", drop: limit, expected: limit},
		// Going two drops past the limit is expected to leave only those two
		// entries behind, so the slice never exceeds the limit.
		{name: "greater than maxDroppedStreams", drop: limit + 2, expected: 2},
	}

	// Every dropped stream carries the same single entry.
	line := logproto.Entry{Timestamp: time.Now(), Line: "foo"}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			tl, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, limit)
			require.NoError(t, err)

			for remaining := sc.drop; remaining > 0; remaining-- {
				tl.dropStream(logproto.Stream{Entries: []logproto.Entry{line}})
			}

			assert.Equal(t, sc.expected, len(tl.droppedStreams))
		})
	}
}
// fakeTailServer is a stub passed to newTailer in the tests in place of a
// real connection: it accepts every response and exposes a background context.
type fakeTailServer struct{}

// Send ignores the response and always reports success.
func (*fakeTailServer) Send(*logproto.TailResponse) error {
	return nil
}

// Context returns context.Background().
func (*fakeTailServer) Context() context.Context {
	return context.Background()
}
func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{})
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
require.NoError(t, err)
var wg sync.WaitGroup

Loading…
Cancel
Save