From 922babb15796bfa74d1953d978d86d1b30cd3d6b Mon Sep 17 00:00:00 2001 From: Christian Inkster Date: Thu, 22 Aug 2024 23:01:33 +0800 Subject: [PATCH] Alerting: Add mutex to Redis HA subs (#89870) --- pkg/services/ngalert/notifier/redis_peer.go | 53 ++++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/pkg/services/ngalert/notifier/redis_peer.go b/pkg/services/ngalert/notifier/redis_peer.go index 5a5ad3f8cfc..11b8c5d6d1c 100644 --- a/pkg/services/ngalert/notifier/redis_peer.go +++ b/pkg/services/ngalert/notifier/redis_peer.go @@ -64,6 +64,7 @@ type redisPeer struct { states map[string]alertingCluster.State subs map[string]*redis.PubSub statesMtx sync.RWMutex + subsMtx sync.RWMutex readyc chan struct{} shutdownc chan struct{} @@ -225,8 +226,10 @@ func newRedisPeer(cfg redisConfig, logger log.Logger, reg prometheus.Registerer, p.nodePingDuration = nodePingDuration p.nodePingFailures = nodePingFailures + p.subsMtx.Lock() p.subs[fullStateChannel] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannel)) p.subs[fullStateChannelReq] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannelReq)) + p.subsMtx.Unlock() go p.heartbeatLoop() go p.membersSyncLoop() @@ -461,7 +464,9 @@ func (p *redisPeer) AddState(key string, state alertingCluster.State, _ promethe // As we also want to get the state from other nodes, we subscribe to the key. sub := p.redis.Subscribe(context.Background(), p.withPrefix(key)) go p.receiveLoop(sub) + p.subsMtx.Lock() p.subs[key] = sub + p.subsMtx.Unlock() return newRedisChannel(p, key, p.withPrefix(key), update) } @@ -507,17 +512,29 @@ func (p *redisPeer) fullStateReqReceiveLoop() { select { case <-p.shutdownc: return - case data := <-p.subs[fullStateChannelReq].Channel(): - // The payload of a full state request is the name of the peer that is - // requesting the full state. In case we received our own request, we - // can just ignore it. Redis pub/sub fanouts to all clients, regardless - // if a client was also the publisher. - if data.Payload == p.name { + default: + p.subsMtx.RLock() + sub, ok := p.subs[fullStateChannelReq] + p.subsMtx.RUnlock() + + if !ok { + time.Sleep(waitForMsgIdle) continue } - p.fullStateSyncPublish() - default: - time.Sleep(waitForMsgIdle) + + select { + case data := <-sub.Channel(): + // The payload of a full state request is the name of the peer that is + // requesting the full state. In case we received our own request, we + // can just ignore it. Redis pub/sub fanouts to all clients, regardless + // if a client was also the publisher. + if data.Payload == p.name { + continue + } + p.fullStateSyncPublish() + default: + time.Sleep(waitForMsgIdle) + } } } } @@ -527,10 +544,22 @@ func (p *redisPeer) fullStateSyncReceiveLoop() { select { case <-p.shutdownc: return - case data := <-p.subs[fullStateChannel].Channel(): - p.mergeFullState([]byte(data.Payload)) default: - time.Sleep(waitForMsgIdle) + p.subsMtx.RLock() + sub, ok := p.subs[fullStateChannel] + p.subsMtx.RUnlock() + + if !ok { + time.Sleep(waitForMsgIdle) + continue + } + + select { + case data := <-sub.Channel(): + p.mergeFullState([]byte(data.Payload)) + default: + time.Sleep(waitForMsgIdle) + } } } }