|
|
|
@ -73,8 +73,8 @@ class ReplicationStreamer(object): |
|
|
|
|
# Current connections. |
|
|
|
|
self.connections = [] |
|
|
|
|
|
|
|
|
|
l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections)) |
|
|
|
|
l.register() |
|
|
|
|
LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], |
|
|
|
|
lambda: len(self.connections)) |
|
|
|
|
|
|
|
|
|
# List of streams that clients can subscribe to. |
|
|
|
|
# We only support federation stream if federation sending hase been |
|
|
|
@ -87,14 +87,15 @@ class ReplicationStreamer(object): |
|
|
|
|
self.streams_by_name = {stream.NAME: stream for stream in self.streams} |
|
|
|
|
|
|
|
|
|
LaterGauge( |
|
|
|
|
"synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"], |
|
|
|
|
"synapse_replication_tcp_resource_connections_per_stream", "", |
|
|
|
|
["stream_name"], |
|
|
|
|
lambda: { |
|
|
|
|
(stream_name,): len([ |
|
|
|
|
conn for conn in self.connections |
|
|
|
|
if stream_name in conn.replication_streams |
|
|
|
|
]) |
|
|
|
|
for stream_name in self.streams_by_name |
|
|
|
|
}).register() |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
self.federation_sender = None |
|
|
|
|
if not hs.config.send_federation: |
|
|
|
|