@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
from synapse . metrics import LaterGauge
from synapse . metrics import LaterGauge
from synapse . metrics . background_process_metrics import run_as_background_process
from synapse . metrics . background_process_metrics import run_as_background_process
from synapse . replication . tcp . client import DirectTcpReplicationClientFactory
from synapse . replication . tcp . commands import (
from synapse . replication . tcp . commands import (
ClearUserSyncsCommand ,
ClearUserSyncsCommand ,
Command ,
Command ,
@ -332,46 +331,31 @@ class ReplicationCommandHandler:
def start_replication ( self , hs : " HomeServer " ) - > None :
def start_replication ( self , hs : " HomeServer " ) - > None :
""" Helper method to start replication. """
""" Helper method to start replication. """
if hs . config . redis . redis_enabled :
from synapse . replication . tcp . redis import RedisDirectTcpReplicationClientFactory
from synapse . replication . tcp . redis import (
RedisDirectTcpReplicationClientFactory ,
)
# First let's ensure that we have a ReplicationStreamer started.
# First let's ensure that we have a ReplicationStreamer started.
hs . get_replication_streamer ( )
hs . get_replication_streamer ( )
# We need two connections to redis, one for the subscription stream and
# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).
# connection after SUBSCRIBE is called).
# First create the connection for sending commands.
# First create the connection for sending commands.
outbound_redis_connection = hs . get_outbound_redis_connection ( )
outbound_redis_connection = hs . get_outbound_redis_connection ( )
# Now create the factory/connection for the subscription stream.
# Now create the factory/connection for the subscription stream.
self . _factory = RedisDirectTcpReplicationClientFactory (
self . _factory = RedisDirectTcpReplicationClientFactory (
hs ,
hs ,
outbound_redis_connection ,
outbound_redis_connection ,
channel_names = self . _channels_to_subscribe_to ,
channel_names = self . _channels_to_subscribe_to ,
)
)
hs . get_reactor ( ) . connectTCP (
hs . get_reactor ( ) . connectTCP (
hs . config . redis . redis_host ,
hs . config . redis . redis_host ,
hs . config . redis . redis_port ,
hs . config . redis . redis_port ,
self . _factory ,
self . _factory ,
timeout = 30 ,
timeout = 30 ,
bindAddress = None ,
bindAddress = None ,
)
)
else :
client_name = hs . get_instance_name ( )
self . _factory = DirectTcpReplicationClientFactory ( hs , client_name , self )
host = hs . config . worker . worker_replication_host
port = hs . config . worker . worker_replication_port
hs . get_reactor ( ) . connectTCP (
host ,
port ,
self . _factory ,
timeout = 30 ,
bindAddress = None ,
)
def get_streams ( self ) - > Dict [ str , Stream ] :
def get_streams ( self ) - > Dict [ str , Stream ] :
""" Get a map from stream name to all streams. """
""" Get a map from stream name to all streams. """