From 1c90b9c57f6d46a565d2d0c00c4d1284cf04430b Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 7 Sep 2021 16:13:07 -0400 Subject: [PATCH] remove empty streams after wal replay (#4265) --- pkg/ingester/flush.go | 6 +----- pkg/ingester/instance.go | 9 +++++++++ pkg/ingester/recovery.go | 19 +++++++++++-------- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 92abab92e6..30c202e77e 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -327,11 +327,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe i.replayController.Sub(int64(subtracted)) if mayRemoveStream && len(stream.chunks) == 0 { - delete(instance.streamsByFP, stream.fp) - delete(instance.streams, stream.labelsString) - instance.index.Delete(stream.labels, stream.fp) - instance.streamsRemovedTotal.Inc() - memoryStreams.WithLabelValues(instance.instanceID).Dec() + instance.removeStream(stream) } } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index f6cf1d2234..d0566a5182 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -273,6 +273,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r return stream, nil } +// removeStream removes a stream from the instance. The streamsMtx must be held. +func (i *instance) removeStream(s *stream) { + delete(i.streamsByFP, s.fp) + delete(i.streams, s.labelsString) + i.index.Delete(s.labels, s.fp) + i.streamsRemovedTotal.Inc() + memoryStreams.WithLabelValues(i.instanceID).Dec() +} + func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint { var fp uint64 fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...) diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 96c63c6881..f572c595f4 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -220,9 +220,17 @@ func (r *ingesterRecoverer) Close() { for _, inst := range r.ing.getInstances() { inst.forAllStreams(context.Background(), func(s *stream) error { + s.chunkMtx.Lock() + defer s.chunkMtx.Unlock() + // reset all the incrementing stream counters after a successful WAL replay. s.resetCounter() + if len(s.chunks) == 0 { + inst.removeStream(s) + return nil + } + // If we've replayed a WAL with unordered writes, but the new // configuration disables them, convert all streams/head blocks // to ensure unordered writes are disabled after the replay, @@ -232,14 +240,9 @@ func (r *ingesterRecoverer) Close() { s.unorderedWrites = isAllowed if !isAllowed && old { - - s.chunkMtx.Lock() - defer s.chunkMtx.Unlock() - if len(s.chunks) > 0 { - err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed)) - if err != nil { - return err - } + err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed)) + if err != nil { + return err } }