chore(ui): Only rejoin new nodes and add back instrumentation (#16445)

pull/16409/head^2
Cyril Tovena · 3 months ago · committed by GitHub
parent 6c0f67dc17
commit 7b3f829592
Changed files (lines changed):

1. docs/sources/shared/configuration.md (6)
2. go.mod (4)
3. go.sum (4)
4. pkg/loki/modules.go (2)
5. pkg/ui/config.go (4)
6. pkg/ui/discovery.go (51)
7. pkg/ui/service.go (42)
8. production/helm/loki/README.md (2)
9. production/helm/loki/values.yaml (2)
10. vendor/github.com/grafana/ckit/metrics.go (22)
11. vendor/github.com/grafana/ckit/node.go (57)
12. vendor/modules.txt (4)

docs/sources/shared/configuration.md

@@ -124,7 +124,7 @@ ui:
 # How frequently to rejoin the cluster to address split brain issues.
 # CLI flag: -ui.rejoin-interval
-[rejoin_interval: <duration> | default = 15s]
+[rejoin_interval: <duration> | default = 3m]

 # Number of initial peers to join from the discovered set.
 # CLI flag: -ui.cluster-max-join-peers
@@ -138,6 +138,10 @@ ui:
 # CLI flag: -ui.enable-ipv6
 [enable_ipv6: <boolean> | default = false]

+# Enable debug logging for the UI.
+# CLI flag: -ui.debug
+[debug: <boolean> | default = false]
+
 discovery:
 # List of peers to join the cluster. Supports multiple values separated by
 # commas. Each value can be a hostname, an IP address, or a DNS name (A/AAAA
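Put together, a minimal `ui` block exercising the new default and the new flag might look like the following. This is a sketch assembled from the options documented above; the peer addresses are placeholders, not values from this change.

```yaml
ui:
  # Default raised from 15s to 3m by this change.
  rejoin_interval: 3m
  # Join at most this many of the discovered peers.
  cluster_max_join_peers: 3
  # New in this change: verbose logging for the UI cluster.
  debug: false
  discovery:
    # Placeholder addresses, for illustration only.
    join_peers:
      - loki-read.default.svc:3100
      - loki-write.default.svc:3100
```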

go.mod

@@ -50,7 +50,7 @@ require (
 	github.com/google/uuid v1.6.0
 	github.com/gorilla/mux v1.8.1
 	github.com/gorilla/websocket v1.5.3
-	github.com/grafana/ckit v0.0.0-20250109002736-4ca45886e452
+	github.com/grafana/ckit v0.0.0-20250226083311-4f9f4aacabb5
 	github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
 	github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41
 	github.com/grafana/go-gelf/v2 v2.0.1
@@ -318,7 +318,7 @@ require (
 	github.com/hashicorp/go-rootcerts v1.0.2 // indirect
 	github.com/hashicorp/go-sockaddr v1.0.7 // indirect
 	github.com/hashicorp/go-uuid v1.0.3 // indirect
-	github.com/hashicorp/memberlist v0.5.2 // indirect
+	github.com/hashicorp/memberlist v0.5.3 // indirect
 	github.com/hashicorp/serf v0.10.1 // indirect
 	github.com/huandu/xstrings v1.5.0 // indirect
 	github.com/jcmturner/aescts/v2 v2.0.0 // indirect

go.sum

@@ -621,8 +621,8 @@ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/z
 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
-github.com/grafana/ckit v0.0.0-20250109002736-4ca45886e452 h1:d/pdVKdLSNUfHUlWsN39OqUI94XgWKOGJdi568yOXmc=
-github.com/grafana/ckit v0.0.0-20250109002736-4ca45886e452/go.mod h1:x6HpYv0+NXPJRBbDYA40IcxWHvrrKwgrMe1Mue172wE=
+github.com/grafana/ckit v0.0.0-20250226083311-4f9f4aacabb5 h1:EkW+rjr8zqiB4Jd7Gn5BmUhDz6PsZ0w33/4osKRd5x8=
+github.com/grafana/ckit v0.0.0-20250226083311-4f9f4aacabb5/go.mod h1:izhHi8mZ16lxMxsdlFjPHzkopbjKNdorTtitYyzAejY=
 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
 github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA=

pkg/loki/modules.go

@@ -1954,7 +1954,7 @@ func (t *Loki) initDataObjExplorer() (services.Service, error) {
 func (t *Loki) initUI() (services.Service, error) {
 	t.Cfg.UI = t.Cfg.UI.WithAdvertisePort(t.Cfg.Server.HTTPListenPort)
-	svc, err := ui.NewService(t.Cfg.UI, t.Server.HTTP, log.With(util_log.Logger, "component", "ui"))
+	svc, err := ui.NewService(t.Cfg.UI, t.Server.HTTP, log.With(util_log.Logger, "component", "ui"), prometheus.DefaultRegisterer)
 	if err != nil {
 		return nil, err
 	}

pkg/ui/config.go

@@ -21,6 +21,7 @@ type Config struct {
 	ClusterMaxJoinPeers int    `yaml:"cluster_max_join_peers"` // Number of initial peers to join from the discovered set.
 	ClusterName         string `yaml:"cluster_name"`           // Name to prevent nodes without this identifier from joining the cluster.
 	EnableIPv6          bool   `yaml:"enable_ipv6"`
+	Debug               bool   `yaml:"debug"`
 	Discovery           struct {
 		JoinPeers []string `yaml:"join_peers"`
 	} `yaml:"discovery"`
@@ -42,11 +43,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Var((*flagext.StringSlice)(&cfg.InfNames), "ui.interface", "Name of network interface to read address from.")
 	f.StringVar(&cfg.NodeName, "ui.node-name", hostname, "Name to use for this node in the cluster.")
 	f.StringVar(&cfg.AdvertiseAddr, "ui.advertise-addr", "", "IP address to advertise in the cluster.")
-	f.DurationVar(&cfg.RejoinInterval, "ui.rejoin-interval", 15*time.Second, "How frequently to rejoin the cluster to address split brain issues.")
+	f.DurationVar(&cfg.RejoinInterval, "ui.rejoin-interval", 3*time.Minute, "How frequently to rejoin the cluster to address split brain issues.")
 	f.IntVar(&cfg.ClusterMaxJoinPeers, "ui.cluster-max-join-peers", 3, "Number of initial peers to join from the discovered set.")
 	f.StringVar(&cfg.ClusterName, "ui.cluster-name", "", "Name to prevent nodes without this identifier from joining the cluster.")
 	f.BoolVar(&cfg.EnableIPv6, "ui.enable-ipv6", false, "Enable using a IPv6 instance address.")
 	f.Var((*flagext.StringSlice)(&cfg.Discovery.JoinPeers), "ui.discovery.join-peers", "List of peers to join the cluster. Supports multiple values separated by commas. Each value can be a hostname, an IP address, or a DNS name (A/AAAA and SRV records).")
+	f.BoolVar(&cfg.Debug, "ui.debug", false, "Enable debug logging for the UI.")
 }

 func (cfg Config) Validate() error {

pkg/ui/discovery.go

@@ -12,6 +12,45 @@ import (
 )

 func (s *Service) getBootstrapPeers() ([]string, error) {
+	peers, err := s.discoverPeers()
+	if err != nil {
+		return nil, err
+	}
+	return selectRandomPeers(peers, s.cfg.ClusterMaxJoinPeers), nil
+}
+
+func selectRandomPeers(peers []string, maxPeers int) []string {
+	// Here we return the entire list because we can't take a subset.
+	if maxPeers == 0 || len(peers) < maxPeers {
+		return peers
+	}
+	// We shuffle the list and return only a subset of the peers.
+	rand.Shuffle(len(peers), func(i, j int) {
+		peers[i], peers[j] = peers[j], peers[i]
+	})
+	return peers[:maxPeers]
+}
+
+func (s *Service) discoverNewPeers(prevPeers map[string]struct{}) ([]string, error) {
+	peers, err := s.discoverPeers()
+	if err != nil {
+		return nil, err
+	}
+	// Build list of new peers that weren't in previous list
+	var newPeers []string
+	for _, peer := range peers {
+		if _, ok := prevPeers[peer]; !ok {
+			newPeers = append(newPeers, peer)
+			prevPeers[peer] = struct{}{}
+		}
+	}
+	return selectRandomPeers(newPeers, s.cfg.ClusterMaxJoinPeers), nil
+}
+
+func (s *Service) discoverPeers() ([]string, error) {
 	if len(s.cfg.Discovery.JoinPeers) == 0 {
 		return nil, nil
 	}
@@ -29,17 +68,7 @@ func (s *Service) getBootstrapPeers() ([]string, error) {
 	}

 	// Return unique addresses.
-	peers := uniq(addresses)
-
-	// Here we return the entire list because we can't take a subset.
-	if s.cfg.ClusterMaxJoinPeers == 0 || len(peers) < s.cfg.ClusterMaxJoinPeers {
-		return peers, nil
-	}
-
-	// We shuffle the list and return only a subset of the peers.
-	rand.Shuffle(len(peers), func(i, j int) {
-		peers[i], peers[j] = peers[j], peers[i]
-	})
-	return peers[:s.cfg.ClusterMaxJoinPeers], nil
+	return uniq(addresses), nil
 }

 func uniq(addresses []string) []string {
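To make the subsetting behavior concrete, here is a small standalone sketch: the `selectRandomPeers` body is copied from the diff above, while the surrounding `main` is illustrative only. Note that the shuffle mutates the caller's slice, which is fine at the call sites above because the slice is freshly built by discovery on every call.

```go
package main

import (
	"fmt"
	"math/rand"
)

// selectRandomPeers returns all peers when no cap is set (maxPeers == 0) or
// when there are fewer peers than the cap; otherwise it shuffles the slice in
// place and returns a random subset of maxPeers entries.
func selectRandomPeers(peers []string, maxPeers int) []string {
	if maxPeers == 0 || len(peers) < maxPeers {
		return peers
	}
	rand.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})
	return peers[:maxPeers]
}

func main() {
	peers := []string{"a:3100", "b:3100", "c:3100", "d:3100"}
	fmt.Println(selectRandomPeers(peers, 0)) // all four peers, unshuffled
	fmt.Println(selectRandomPeers(peers, 2)) // two random peers
}
```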

pkg/ui/service.go

@@ -18,9 +18,8 @@ import (
 	"github.com/grafana/ckit/peer"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
+	"github.com/prometheus/client_golang/prometheus"
 	"golang.org/x/net/http2"
-
-	util_log "github.com/grafana/loki/v3/pkg/util/log"
 )

 // This allows rate limiting the number of updates when the cluster is frequently changing (e.g. during rollout).
@@ -37,10 +36,11 @@ type Service struct {
 	cfg    Config
 	logger log.Logger
+	reg    prometheus.Registerer
 }

-func NewService(cfg Config, router *mux.Router, logger log.Logger) (*Service, error) {
-	addr, err := ring.GetInstanceAddr(cfg.AdvertiseAddr, cfg.InfNames, util_log.Logger, cfg.EnableIPv6)
+func NewService(cfg Config, router *mux.Router, logger log.Logger, reg prometheus.Registerer) (*Service, error) {
+	addr, err := ring.GetInstanceAddr(cfg.AdvertiseAddr, cfg.InfNames, logger, cfg.EnableIPv6)
 	if err != nil {
 		return nil, err
 	}
@@ -54,21 +54,30 @@ func NewService(cfg Config, router *mux.Router, logger log.Logger) (*Service, er
 			},
 		},
 	}

+	if !cfg.Debug {
+		logger = level.NewFilter(logger, level.AllowInfo())
+	}
+
 	advertiseAddr := fmt.Sprintf("%s:%d", cfg.AdvertiseAddr, cfg.AdvertisePort)
 	node, err := ckit.NewNode(httpClient, ckit.Config{
-		Name: cfg.NodeName,
-		// TODO(cyriltovena): ckit debug logs are too verbose
-		Log:           level.NewFilter(logger, level.AllowInfo()),
+		Name:          cfg.NodeName,
+		Log:           logger,
 		AdvertiseAddr: advertiseAddr,
 		Label:         cfg.ClusterName,
 	})
 	if err != nil {
 		return nil, err
 	}
+
+	if reg != nil {
+		if err := reg.Register(node.Metrics()); err != nil {
+			return nil, err
+		}
+	}
+
 	svc := &Service{
 		cfg:    cfg,
 		logger: logger,
+		reg:    reg,
 		node:   node,
 		router: router,
 		client: httpClient,
@@ -102,6 +111,10 @@ func (s *Service) run(ctx context.Context) error {
 			level.Error(s.logger).Log("msg", "failed to bootstrap a fresh cluster with no peers", "err", err)
 		}
 	}
+
+	newPeers := make(map[string]struct{})
+	for _, p := range peers {
+		newPeers[p] = struct{}{}
+	}

 	var wg sync.WaitGroup
 	if s.cfg.RejoinInterval > 0 {
@@ -116,15 +129,17 @@ func (s *Service) run(ctx context.Context) error {
 			case <-ctx.Done():
 				return
 			case <-ticker.C:
-				peers, err := s.getBootstrapPeers()
+				peers, err := s.discoverNewPeers(newPeers)
 				if err != nil {
 					level.Warn(s.logger).Log("msg", "failed to get peers to join; will try again", "err", err)
 					continue
 				}
-				level.Info(s.logger).Log("msg", "rejoining cluster", "peers_count", len(peers))
-				if err := s.node.Start(peers); err != nil {
-					level.Warn(s.logger).Log("msg", "failed to connect to peers; will try again", "err", err)
-					continue
+				if len(peers) > 0 {
+					level.Info(s.logger).Log("msg", "rejoining cluster", "peers_count", len(newPeers))
+					if err := s.node.Start(peers); err != nil {
+						level.Warn(s.logger).Log("msg", "failed to connect to peers; will try again", "err", err)
+						continue
+					}
 				}
 			}
 		}
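Stripped of the service plumbing, the rejoin loop now follows a simple delta-tracking idiom: remember every peer ever joined, and only act when discovery yields addresses outside that set. Below is a self-contained sketch of the pattern; the names (`joinNewPeers`, `discover`, `join`) are illustrative, not the actual service API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// joinNewPeers tracks previously seen peers in `seen` and invokes join only
// for addresses discovered since the last tick, mirroring discoverNewPeers.
func joinNewPeers(ctx context.Context, interval time.Duration,
	discover func() []string, join func([]string) error) {
	seen := make(map[string]struct{})
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			var fresh []string
			for _, p := range discover() {
				if _, ok := seen[p]; !ok {
					fresh = append(fresh, p)
					seen[p] = struct{}{}
				}
			}
			// Skip the join (and the log noise) when nothing changed.
			if len(fresh) > 0 {
				if err := join(fresh); err != nil {
					fmt.Println("join failed; will retry:", err)
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	calls := 0
	discover := func() []string {
		calls++
		if calls == 1 {
			return []string{"a:3100", "b:3100"}
		}
		return []string{"a:3100", "b:3100", "c:3100"} // c is new on later ticks
	}
	join := func(peers []string) error {
		fmt.Println("joining new peers:", peers)
		return nil
	}
	joinNewPeers(ctx, time.Second, discover, join)
}
```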
@@ -142,6 +157,9 @@ func (s *Service) stop(_ error) error {
 	if err := s.node.ChangeState(ctx, peer.StateTerminating); err != nil {
 		level.Error(s.logger).Log("msg", "failed to change state to terminating", "err", err)
 	}
+	if s.reg != nil {
+		s.reg.Unregister(s.node.Metrics())
+	}
 	return s.node.Stop()
 }

production/helm/loki/README.md

@@ -16,7 +16,7 @@ Helm chart for Grafana Loki and Grafana Enterprise Logs supporting both simple,
 |------------|------|---------|
 | https://charts.min.io/ | minio(minio) | 5.4.0 |
 | https://grafana.github.io/helm-charts | grafana-agent-operator(grafana-agent-operator) | 0.5.1 |
-| https://grafana.github.io/helm-charts | rollout_operator(rollout-operator) | 0.23.0 |
+| https://grafana.github.io/helm-charts | rollout_operator(rollout-operator) | 0.24.0 |

 Find more information in the Loki Helm Chart [documentation](https://grafana.com/docs/loki/next/installation/helm).

production/helm/loki/values.yaml

@@ -286,7 +286,7 @@ loki:
     ui:
       discovery:
         join_peers:
-          - '{{ include "loki.distributorFullname" . }}.{{ $.Release.Namespace }}.svc.{{ .Values.global.clusterDomain }}'
+          - '{{ include "loki.queryFrontendFullname" . }}.{{ $.Release.Namespace }}.svc.{{ .Values.global.clusterDomain }}'
 {{- end }}
 {{- with .Values.loki.querier }}
 querier:

vendor/github.com/grafana/ckit/metrics.go

@@ -25,12 +25,13 @@ const clusterNameLabel = "cluster_name"
 type metrics struct {
 	metricsutil.Container

-	gossipEventsTotal  *prometheus.CounterVec
-	nodePeers          *prometheus.GaugeVec
-	nodeUpdating       prometheus.Gauge
-	nodeUpdateDuration prometheus.Histogram
-	nodeObservers      prometheus.Gauge
-	nodeInfo           *metricsutil.InfoCollector
+	gossipEventsTotal     *prometheus.CounterVec
+	gossipBroadcastsTotal *prometheus.CounterVec
+	nodePeers             *prometheus.GaugeVec
+	nodeUpdating          prometheus.Gauge
+	nodeUpdateDuration    prometheus.Histogram
+	nodeObservers         prometheus.Gauge
+	nodeInfo              *metricsutil.InfoCollector
 }

 var _ prometheus.Collector = (*metrics)(nil)
@@ -46,6 +47,14 @@ func newMetrics(clusterName string) *metrics {
 		},
 	}, []string{"event"})

+	m.gossipBroadcastsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "cluster_node_gossip_broadcasts_total",
+		Help: "Total number of gossip messages broadcasted by the node.",
+		ConstLabels: prometheus.Labels{
+			clusterNameLabel: clusterName,
+		},
+	}, []string{"event"})
+
 	m.nodePeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "cluster_node_peers",
 		Help: "Current number of healthy peers by state",
@@ -89,6 +98,7 @@ func newMetrics(clusterName string) *metrics {
 	m.Add(
 		m.gossipEventsTotal,
+		m.gossipBroadcastsTotal,
 		m.nodePeers,
 		m.nodeUpdating,
 		m.nodeUpdateDuration,

vendor/github.com/grafana/ckit/node.go

@@ -268,9 +268,19 @@ func (n *Node) Start(peers []string) error {
 		return fmt.Errorf("failed to join memberlist: %w", err)
 	}

-	// Broadcast our current state of the node to all peers now that our lamport
-	// clock is roughly synchronized.
-	return n.broadcastCurrentState()
+	// Originally, after calling n.ml.Join, we would broadcast a new state
+	// message (with a new lamport time) with the node's local state. The intent
+	// was that there might be stale messages about a previous instance of our
+	// node with different lamport times, and we'd want to correct them by
+	// broadcasting a message.
+	//
+	// This only appeared necessary due to a bug in how we handled stale messages
+	// about our own node, and resulted in a lot of extra state messages being
+	// sent due to nodes calling Start to fix split brain issues.
+	//
+	// Now, we'll correct invalid state messages about our node upon receipt; see
+	// [Node.handleStateMessage] and [nodeDelegate.MergeRemoteState].
+	return nil
 }

 func (n *Node) run(ctx context.Context) {
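The mechanism that comment refers to, reduced to its essence: when a node receives gossip about itself that it did not produce, it bumps its lamport clock past the stale message's time and gossips its true state onward instead. A toy sketch of that correction follows; the types are simplified stand-ins, not the ckit API.

```go
package main

import "fmt"

type state struct {
	node  string
	value string
	time  uint64 // lamport time
}

type node struct {
	name  string
	clock uint64
	local state
}

// onStateMessage corrects stale gossip about ourselves: instead of ignoring
// it, we re-stamp our local state after the stale message's lamport time and
// return that as the message to gossip onward.
func (n *node) onStateMessage(msg state) (final state, broadcast bool) {
	if msg.time > n.clock {
		n.clock = msg.time // observe the remote clock
	}
	if msg.node == n.name && msg.value != n.local.value {
		n.clock++ // tick past the stale message
		n.local.time = n.clock
		return n.local, true // gossip the corrected state
	}
	return msg, true
}

func main() {
	n := &node{name: "a", clock: 5, local: state{node: "a", value: "participant", time: 5}}
	stale := state{node: "a", value: "terminating", time: 9} // from a previous incarnation
	fixed, _ := n.onStateMessage(stale)
	fmt.Printf("corrected: %+v\n", fixed) // value participant, time 10
}
```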
@@ -293,31 +303,6 @@ func (n *Node) run(ctx context.Context) {
 	}
 }

-// broadcastCurrentState queues a message to send the current state of the node
-// to the cluster. This should be done after joining a new set of nodes once
-// the lamport clock is synchronized.
-func (n *Node) broadcastCurrentState() error {
-	n.stateMut.RLock()
-	defer n.stateMut.RUnlock()
-
-	stateMsg := messages.State{
-		NodeName: n.cfg.Name,
-		NewState: n.localState,
-		Time:     n.clock.Tick(),
-	}
-
-	// Treat the stateMsg as if it was received externally to track our own state
-	// along with other nodes.
-	n.handleStateMessage(stateMsg)
-
-	bcast, err := messages.Broadcast(&stateMsg, nil)
-	if err != nil {
-		return err
-	}
-
-	n.broadcasts.QueueBroadcast(bcast)
-	return nil
-}
-
 // Stop stops the Node, removing it from the cluster. Callers should first
 // transition to StateTerminating to gracefully leave the cluster.
 // Observers will no longer be notified about cluster changes after Stop
@@ -441,10 +426,13 @@ func (n *Node) changeState(to peer.State, onDone func()) error {
 }

 // handleStateMessage handles a state message from a peer. Returns true if the
-// message hasn't been seen before.
+// message hasn't been seen before. The final return parameter will be the
+// message to broadcast: if msg is a stale message from a previous instance of
+// the local node, final will be an updated message reflecting the node's local
+// state.
 //
 // handleStateMessage must be called with n.stateMut held for reading.
-func (n *Node) handleStateMessage(msg messages.State) (newMessage bool) {
+func (n *Node) handleStateMessage(msg messages.State) (final messages.State, newMessage bool) {
 	n.clock.Observe(msg.Time)

 	n.peerMut.Lock()
@@ -453,8 +441,8 @@ func (n *Node) handleStateMessage(msg messages.State) (newMessage bool) {
 	curr, exist := n.peerStates[msg.NodeName]
 	if exist && msg.Time <= curr.Time {
 		// Ignore a state message if we have the same or a newer one.
-		return false
-	} else if exist && msg.NodeName == n.cfg.Name {
+		return curr, false
+	} else if msg.NodeName == n.cfg.Name {
 		level.Debug(n.log).Log("msg", "got stale message about self", "msg", msg)

 		// A peer has a newer message about ourselves, likely from a previous
@@ -477,7 +465,7 @@ func (n *Node) handleStateMessage(msg messages.State) (newMessage bool) {
 		n.handlePeersChanged()
 	}

-	return true
+	return msg, true
 }

 // Peers returns all Peers currently known by n. The Peers list will include
@@ -605,7 +593,7 @@ func (nd *nodeDelegate) NotifyMsg(raw []byte) {
 		nd.stateMut.RLock()
 		defer nd.stateMut.RUnlock()

-		if nd.handleStateMessage(s) {
+		if s, broadcast := nd.handleStateMessage(s); broadcast {
 			// We should continue gossiping the message to other peers if we haven't
 			// seen it before.
 			//
@@ -614,6 +602,7 @@ func (nd *nodeDelegate) NotifyMsg(raw []byte) {
 			// messages would still converge eventually using push/pulls.
 			bcast, _ := messages.Broadcast(&s, nil)
 			nd.broadcasts.QueueBroadcast(bcast)
+			nd.m.gossipBroadcastsTotal.WithLabelValues(eventStateChange).Inc()
 		}

 	default:

vendor/modules.txt

@@ -995,7 +995,7 @@ github.com/gorilla/mux
 # github.com/gorilla/websocket v1.5.3
 ## explicit; go 1.12
 github.com/gorilla/websocket
-# github.com/grafana/ckit v0.0.0-20250109002736-4ca45886e452
+# github.com/grafana/ckit v0.0.0-20250226083311-4f9f4aacabb5
 ## explicit; go 1.20
 github.com/grafana/ckit
 github.com/grafana/ckit/internal/chash
@@ -1132,7 +1132,7 @@ github.com/hashicorp/golang-lru/simplelru
 github.com/hashicorp/golang-lru/v2
 github.com/hashicorp/golang-lru/v2/internal
 github.com/hashicorp/golang-lru/v2/simplelru
-# github.com/hashicorp/memberlist v0.5.2 => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
+# github.com/hashicorp/memberlist v0.5.3 => github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe
 ## explicit; go 1.12
 github.com/hashicorp/memberlist
 # github.com/hashicorp/serf v0.10.1
