From 210dc355cb77f45cbe3fde9fc4d7daaeedbacd26 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 17 May 2021 10:54:48 -0400 Subject: [PATCH] Fixes a goroutine leak in the store when doing cancellation. (#3733) Found out that we might have no listener and trying to send the information of cancellation. I've moved the error information down the respective log and sample iterators so that we don't need to send it via the channel. This seems to have cause some pretty big memory leak on our side. Signed-off-by: Cyril Tovena --- pkg/storage/batch.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index aa492d7613..32b35be6e3 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -128,25 +128,18 @@ func newBatchChunkIterator( func (it *batchChunkIterator) Start() { if !it.begun { it.begun = true - go it.loop(it.ctx) + go it.loop() } } -func (it *batchChunkIterator) loop(ctx context.Context) { - defer func() { - if p := recover(); p != nil { - it.next <- &chunkBatch{err: errors.Errorf("panic while fecthing chunks %+v", p)} - close(it.next) - } - }() +func (it *batchChunkIterator) loop() { for { if it.chunks.Len() == 0 { close(it.next) return } select { - case <-ctx.Done(): - it.next <- &chunkBatch{err: ctx.Err()} + case <-it.ctx.Done(): close(it.next) return case it.next <- it.nextBatch(): @@ -159,7 +152,15 @@ func (it *batchChunkIterator) Next() *chunkBatch { return <-it.next } -func (it *batchChunkIterator) nextBatch() *chunkBatch { +func (it *batchChunkIterator) nextBatch() (res *chunkBatch) { + defer func() { + if p := recover(); p != nil { + level.Error(util_log.Logger).Log("msg", "panic while fetching chunks", "panic", p) + res = &chunkBatch{ + err: errors.Errorf("panic while fecthing chunks %+v", p), + } + } + }() // the first chunk of the batch headChunk := it.chunks.Peek() from, through := it.start, it.end @@ -329,9 +330,12 @@ func (it *logBatchIterator) Error() error { if it.err != nil { return it.err } - if it.curr != nil { + if it.curr != nil && it.curr.Error() != nil { return it.curr.Error() } + if it.ctx.Err() != nil { + return it.ctx.Err() + } return nil } @@ -464,9 +468,12 @@ func (it *sampleBatchIterator) Error() error { if it.err != nil { return it.err } - if it.curr != nil { + if it.curr != nil && it.curr.Error() != nil { return it.curr.Error() } + if it.ctx.Err() != nil { + return it.ctx.Err() + } return nil }