remove empty streams after wal replay (#4265)

pull/4322/head
Owen Diehl 4 years ago committed by GitHub
parent 58219ff464
commit 1c90b9c57f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/ingester/flush.go
  2. 9
      pkg/ingester/instance.go
  3. 19
      pkg/ingester/recovery.go

@ -327,11 +327,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
i.replayController.Sub(int64(subtracted))
if mayRemoveStream && len(stream.chunks) == 0 {
delete(instance.streamsByFP, stream.fp)
delete(instance.streams, stream.labelsString)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(instance.instanceID).Dec()
instance.removeStream(stream)
}
}

@ -273,6 +273,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
return stream, nil
}
// removeStream removes a stream from the instance. The streamsMtx must be held.
func (i *instance) removeStream(s *stream) {
delete(i.streamsByFP, s.fp)
delete(i.streams, s.labelsString)
i.index.Delete(s.labels, s.fp)
i.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Dec()
}
func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)

@ -220,9 +220,17 @@ func (r *ingesterRecoverer) Close() {
for _, inst := range r.ing.getInstances() {
inst.forAllStreams(context.Background(), func(s *stream) error {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
// reset all the incrementing stream counters after a successful WAL replay.
s.resetCounter()
if len(s.chunks) == 0 {
inst.removeStream(s)
return nil
}
// If we've replayed a WAL with unordered writes, but the new
// configuration disables them, convert all streams/head blocks
// to ensure unordered writes are disabled after the replay,
@ -232,14 +240,9 @@ func (r *ingesterRecoverer) Close() {
s.unorderedWrites = isAllowed
if !isAllowed && old {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if len(s.chunks) > 0 {
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
if err != nil {
return err
}
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
if err != nil {
return err
}
}

Loading…
Cancel
Save