@ -22,10 +22,14 @@
# Imports required for the default HomeServer() implementation
import abc
import functools
import logging
import os
from typing import Any , Callable , Dict , List , Optional , TypeVar , cast
import twisted
from twisted . mail . smtp import sendmail
from twisted . web . iweb import IPolicyForHTTPS
from synapse . api . auth import Auth
from synapse . api . filtering import Filtering
@ -65,6 +69,7 @@ from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse . handlers . groups_local import GroupsLocalHandler , GroupsLocalWorkerHandler
from synapse . handlers . initial_sync import InitialSyncHandler
from synapse . handlers . message import EventCreationHandler , MessageHandler
from synapse . handlers . oidc_handler import OidcHandler
from synapse . handlers . pagination import PaginationHandler
from synapse . handlers . password_policy import PasswordPolicyHandler
from synapse . handlers . presence import PresenceHandler
@ -80,6 +85,7 @@ from synapse.handlers.room import (
from synapse . handlers . room_list import RoomListHandler
from synapse . handlers . room_member import RoomMemberMasterHandler
from synapse . handlers . room_member_worker import RoomMemberWorkerHandler
from synapse . handlers . saml_handler import SamlHandler
from synapse . handlers . set_password import SetPasswordHandler
from synapse . handlers . stats import StatsHandler
from synapse . handlers . sync import SyncHandler
@ -93,7 +99,7 @@ from synapse.push.pusherpool import PusherPool
from synapse . replication . tcp . client import ReplicationDataHandler
from synapse . replication . tcp . handler import ReplicationCommandHandler
from synapse . replication . tcp . resource import ReplicationStreamer
from synapse . replication . tcp . streams import STREAMS_MAP
from synapse . replication . tcp . streams import STREAMS_MAP , Stream
from synapse . rest . media . v1 . media_repository import (
MediaRepository ,
MediaRepositoryResource ,
@ -107,6 +113,7 @@ from synapse.server_notices.worker_server_notices_sender import (
from synapse . state import StateHandler , StateResolutionHandler
from synapse . storage import Databases , DataStore , Storage
from synapse . streams . events import EventSources
from synapse . types import DomainSpecificString
from synapse . util import Clock
from synapse . util . distributor import Distributor
from synapse . util . stringutils import random_string
@ -114,23 +121,58 @@ from synapse.util.stringutils import random_string
logger = logging . getLogger ( __name__ )
class HomeServer ( object ) :
T = TypeVar ( " T " , bound = Callable [ . . . , Any ] )
def cache_in_self ( builder : T ) - > T :
""" Wraps a function called e.g. `get_foo`, checking if `self.foo` exists and
returning if so . If not , calls the given function and sets ` self . foo ` to it .
Also ensures that dependency cycles throw an exception correctly , rather
than overflowing the stack .
"""
if not builder . __name__ . startswith ( " get_ " ) :
raise Exception (
" @cache_in_self can only be used on functions starting with `get_` "
)
depname = builder . __name__ [ len ( " get_ " ) : ]
building = [ False ]
@functools . wraps ( builder )
def _get ( self ) :
try :
return getattr ( self , depname )
except AttributeError :
pass
# Prevent cyclic dependencies from deadlocking
if building [ 0 ] :
raise ValueError ( " Cyclic dependency while building %s " % ( depname , ) )
building [ 0 ] = True
try :
dep = builder ( self )
setattr ( self , depname , dep )
finally :
building [ 0 ] = False
return dep
return cast ( T , _get )
class HomeServer ( metaclass = abc . ABCMeta ) :
""" A basic homeserver object without lazy component builders.
This will need all of the components it requires to either be passed as
constructor arguments , or the relevant methods overriding to create them .
Typically this would only be used for unit tests .
For every dependency in the DEPENDENCIES list below , this class creates one
method ,
def get_DEPENDENCY ( self )
which returns the value of that dependency . If no value has yet been set
nor was provided to the constructor , it will attempt to call a lazy builder
method called
def build_DEPENDENCY ( self )
which must be implemented by the subclass . This code may call any of the
required " get " methods on the instance to obtain the sub - dependencies that
one requires .
Dependencies should be added by creating a ` def get_ < depname > ( self ) `
function , wrapping it in ` @cache_in_self ` .
Attributes :
config ( synapse . config . homeserver . HomeserverConfig ) :
@ -138,86 +180,6 @@ class HomeServer(object):
we are listening on to provide HTTP services .
"""
__metaclass__ = abc . ABCMeta
DEPENDENCIES = [
" http_client " ,
" federation_client " ,
" federation_server " ,
" handlers " ,
" auth " ,
" room_creation_handler " ,
" room_shutdown_handler " ,
" state_handler " ,
" state_resolution_handler " ,
" presence_handler " ,
" sync_handler " ,
" typing_handler " ,
" room_list_handler " ,
" acme_handler " ,
" auth_handler " ,
" device_handler " ,
" stats_handler " ,
" e2e_keys_handler " ,
" e2e_room_keys_handler " ,
" event_handler " ,
" event_stream_handler " ,
" initial_sync_handler " ,
" application_service_api " ,
" application_service_scheduler " ,
" application_service_handler " ,
" device_message_handler " ,
" profile_handler " ,
" event_creation_handler " ,
" deactivate_account_handler " ,
" set_password_handler " ,
" notifier " ,
" event_sources " ,
" keyring " ,
" pusherpool " ,
" event_builder_factory " ,
" filtering " ,
" http_client_context_factory " ,
" simple_http_client " ,
" proxied_http_client " ,
" media_repository " ,
" media_repository_resource " ,
" federation_transport_client " ,
" federation_sender " ,
" receipts_handler " ,
" macaroon_generator " ,
" tcp_replication " ,
" read_marker_handler " ,
" action_generator " ,
" user_directory_handler " ,
" groups_local_handler " ,
" groups_server_handler " ,
" groups_attestation_signing " ,
" groups_attestation_renewer " ,
" secrets " ,
" spam_checker " ,
" third_party_event_rules " ,
" room_member_handler " ,
" federation_registry " ,
" server_notices_manager " ,
" server_notices_sender " ,
" message_handler " ,
" pagination_handler " ,
" room_context_handler " ,
" sendmail " ,
" registration_handler " ,
" account_validity_handler " ,
" cas_handler " ,
" saml_handler " ,
" oidc_handler " ,
" event_client_serializer " ,
" password_policy_handler " ,
" storage " ,
" replication_streamer " ,
" replication_data_handler " ,
" replication_streams " ,
]
REQUIRED_ON_MASTER_STARTUP = [ " user_directory_handler " , " stats_handler " ]
# This is overridden in derived application classes
@ -232,16 +194,17 @@ class HomeServer(object):
config : The full config for the homeserver .
"""
if not reactor :
from twisted . internet import reactor
from twisted . internet import reactor as _reactor
reactor = _reactor
self . _reactor = reactor
self . hostname = hostname
# the key we use to sign events and requests
self . signing_key = config . key . signing_key [ 0 ]
self . config = config
self . _building = { }
self . _listening_services = [ ]
self . start_time = None
self . _listening_services = [ ] # type: List[twisted.internet.tcp.Port]
self . start_time = None # type: Optional[int]
self . _instance_id = random_string ( 5 )
self . _instance_name = config . worker_name or " master "
@ -255,13 +218,13 @@ class HomeServer(object):
burst_count = config . rc_registration . burst_count ,
)
self . datastores = None
self . datastores = None # type: Optional[Databases]
# Other kwargs are explicit dependencies
for depname in kwargs :
setattr ( self , depname , kwargs [ depname ] )
def get_instance_id ( self ) :
def get_instance_id ( self ) - > str :
""" A unique ID for this synapse process instance.
This is used to distinguish running instances in worker - based
@ -277,13 +240,13 @@ class HomeServer(object):
"""
return self . _instance_name
def setup ( self ) :
def setup ( self ) - > None :
logger . info ( " Setting up. " )
self . start_time = int ( self . get_clock ( ) . time ( ) )
self . datastores = Databases ( self . DATASTORE_CLASS , self )
logger . info ( " Finished setting up. " )
def setup_master ( self ) :
def setup_master ( self ) - > None :
"""
Some handlers have side effects on instantiation ( like registering
background updates ) . This function causes them to be fetched , and
@ -292,192 +255,242 @@ class HomeServer(object):
for i in self . REQUIRED_ON_MASTER_STARTUP :
getattr ( self , " get_ " + i ) ( )
def get_reactor ( self ) :
def get_reactor ( self ) - > twisted . internet . base . ReactorBase :
"""
Fetch the Twisted reactor in use by this HomeServer .
"""
return self . _reactor
def get_ip_from_request ( self , request ) :
def get_ip_from_request ( self , request ) - > str :
# X-Forwarded-For is handled by our custom request type.
return request . getClientIP ( )
def is_mine ( self , domain_specific_string ) :
def is_mine ( self , domain_specific_string : DomainSpecificString ) - > bool :
return domain_specific_string . domain == self . hostname
def is_mine_id ( self , string ) :
def is_mine_id ( self , string : str ) - > bool :
return string . split ( " : " , 1 ) [ 1 ] == self . hostname
def get_clock ( self ) :
def get_clock ( self ) - > Clock :
return self . clock
def get_datastore ( self ) - > DataStore :
if not self . datastores :
raise Exception ( " HomeServer.setup must be called before getting datastores " )
return self . datastores . main
def get_datastores ( self ) :
def get_datastores ( self ) - > Databases :
if not self . datastores :
raise Exception ( " HomeServer.setup must be called before getting datastores " )
return self . datastores
def get_config ( self ) :
def get_config ( self ) - > HomeServerConfig :
return self . config
def get_distributor ( self ) :
def get_distributor ( self ) - > Distributor :
return self . distributor
def get_registration_ratelimiter ( self ) - > Ratelimiter :
return self . registration_ratelimiter
def build_federation_client ( self ) :
@cache_in_self
def get_federation_client ( self ) - > FederationClient :
return FederationClient ( self )
def build_federation_server ( self ) :
@cache_in_self
def get_federation_server ( self ) - > FederationServer :
return FederationServer ( self )
def build_handlers ( self ) :
@cache_in_self
def get_handlers ( self ) - > Handlers :
return Handlers ( self )
def build_notifier ( self ) :
@cache_in_self
def get_notifier ( self ) - > Notifier :
return Notifier ( self )
def build_auth ( self ) :
@cache_in_self
def get_auth ( self ) - > Auth :
return Auth ( self )
def build_http_client_context_factory ( self ) :
@cache_in_self
def get_http_client_context_factory ( self ) - > IPolicyForHTTPS :
return (
InsecureInterceptableContextFactory ( )
if self . config . use_insecure_ssl_client_just_for_testing_do_not_use
else RegularPolicyForHTTPS ( )
)
def build_simple_http_client ( self ) :
@cache_in_self
def get_simple_http_client ( self ) - > SimpleHttpClient :
return SimpleHttpClient ( self )
def build_proxied_http_client ( self ) :
@cache_in_self
def get_proxied_http_client ( self ) - > SimpleHttpClient :
return SimpleHttpClient (
self ,
http_proxy = os . getenvb ( b " http_proxy " ) ,
https_proxy = os . getenvb ( b " HTTPS_PROXY " ) ,
)
def build_room_creation_handler ( self ) :
@cache_in_self
def get_room_creation_handler ( self ) - > RoomCreationHandler :
return RoomCreationHandler ( self )
def build_room_shutdown_handler ( self ) :
@cache_in_self
def get_room_shutdown_handler ( self ) - > RoomShutdownHandler :
return RoomShutdownHandler ( self )
def build_sendmail ( self ) :
@cache_in_self
def get_sendmail ( self ) - > sendmail :
return sendmail
def build_state_handler ( self ) :
@cache_in_self
def get_state_handler ( self ) - > StateHandler :
return StateHandler ( self )
def build_state_resolution_handler ( self ) :
@cache_in_self
def get_state_resolution_handler ( self ) - > StateResolutionHandler :
return StateResolutionHandler ( self )
def build_presence_handler ( self ) :
@cache_in_self
def get_presence_handler ( self ) - > PresenceHandler :
return PresenceHandler ( self )
def build_typing_handler ( self ) :
@cache_in_self
def get_typing_handler ( self ) :
if self . config . worker . writers . typing == self . get_instance_name ( ) :
return TypingWriterHandler ( self )
else :
return FollowerTypingHandler ( self )
def build_sync_handler ( self ) :
@cache_in_self
def get_sync_handler ( self ) - > SyncHandler :
return SyncHandler ( self )
def build_room_list_handler ( self ) :
@cache_in_self
def get_room_list_handler ( self ) - > RoomListHandler :
return RoomListHandler ( self )
def build_auth_handler ( self ) :
@cache_in_self
def get_auth_handler ( self ) - > AuthHandler :
return AuthHandler ( self )
def build_macaroon_generator ( self ) :
@cache_in_self
def get_macaroon_generator ( self ) - > MacaroonGenerator :
return MacaroonGenerator ( self )
def build_device_handler ( self ) :
@cache_in_self
def get_device_handler ( self ) :
if self . config . worker_app :
return DeviceWorkerHandler ( self )
else :
return DeviceHandler ( self )
def build_device_message_handler ( self ) :
@cache_in_self
def get_device_message_handler ( self ) - > DeviceMessageHandler :
return DeviceMessageHandler ( self )
def build_e2e_keys_handler ( self ) :
@cache_in_self
def get_e2e_keys_handler ( self ) - > E2eKeysHandler :
return E2eKeysHandler ( self )
def build_e2e_room_keys_handler ( self ) :
@cache_in_self
def get_e2e_room_keys_handler ( self ) - > E2eRoomKeysHandler :
return E2eRoomKeysHandler ( self )
def build_acme_handler ( self ) :
@cache_in_self
def get_acme_handler ( self ) - > AcmeHandler :
return AcmeHandler ( self )
def build_application_service_api ( self ) :
@cache_in_self
def get_application_service_api ( self ) - > ApplicationServiceApi :
return ApplicationServiceApi ( self )
def build_application_service_scheduler ( self ) :
@cache_in_self
def get_application_service_scheduler ( self ) - > ApplicationServiceScheduler :
return ApplicationServiceScheduler ( self )
def build_application_service_handler ( self ) :
@cache_in_self
def get_application_service_handler ( self ) - > ApplicationServicesHandler :
return ApplicationServicesHandler ( self )
def build_event_handler ( self ) :
@cache_in_self
def get_event_handler ( self ) - > EventHandler :
return EventHandler ( self )
def build_event_stream_handler ( self ) :
@cache_in_self
def get_event_stream_handler ( self ) - > EventStreamHandler :
return EventStreamHandler ( self )
def build_initial_sync_handler ( self ) :
@cache_in_self
def get_initial_sync_handler ( self ) - > InitialSyncHandler :
return InitialSyncHandler ( self )
def build_profile_handler ( self ) :
@cache_in_self
def get_profile_handler ( self ) :
if self . config . worker_app :
return BaseProfileHandler ( self )
else :
return MasterProfileHandler ( self )
def build_event_creation_handler ( self ) :
@cache_in_self
def get_event_creation_handler ( self ) - > EventCreationHandler :
return EventCreationHandler ( self )
def build_deactivate_account_handler ( self ) :
@cache_in_self
def get_deactivate_account_handler ( self ) - > DeactivateAccountHandler :
return DeactivateAccountHandler ( self )
def build_set_password_handler ( self ) :
@cache_in_self
def get_set_password_handler ( self ) - > SetPasswordHandler :
return SetPasswordHandler ( self )
def build_event_sources ( self ) :
@cache_in_self
def get_event_sources ( self ) - > EventSources :
return EventSources ( self )
def build_keyring ( self ) :
@cache_in_self
def get_keyring ( self ) - > Keyring :
return Keyring ( self )
def build_event_builder_factory ( self ) :
@cache_in_self
def get_event_builder_factory ( self ) - > EventBuilderFactory :
return EventBuilderFactory ( self )
def build_filtering ( self ) :
@cache_in_self
def get_filtering ( self ) - > Filtering :
return Filtering ( self )
def build_pusherpool ( self ) :
@cache_in_self
def get_pusherpool ( self ) - > PusherPool :
return PusherPool ( self )
def build_http_client ( self ) :
@cache_in_self
def get_http_client ( self ) - > MatrixFederationHttpClient :
tls_client_options_factory = context_factory . FederationPolicyForHTTPS (
self . config
)
return MatrixFederationHttpClient ( self , tls_client_options_factory )
def build_media_repository_resource ( self ) :
@cache_in_self
def get_media_repository_resource ( self ) - > MediaRepositoryResource :
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
return MediaRepositoryResource ( self )
def build_media_repository ( self ) :
@cache_in_self
def get_media_repository ( self ) - > MediaRepository :
return MediaRepository ( self )
def build_federation_transport_client ( self ) :
@cache_in_self
def get_federation_transport_client ( self ) - > TransportLayerClient :
return TransportLayerClient ( self )
def build_federation_sender ( self ) :
@cache_in_self
def get_federation_sender ( self ) :
if self . should_send_federation ( ) :
return FederationSender ( self )
elif not self . config . worker_app :
@ -485,156 +498,150 @@ class HomeServer(object):
else :
raise Exception ( " Workers cannot send federation traffic " )
def build_receipts_handler ( self ) :
@cache_in_self
def get_receipts_handler ( self ) - > ReceiptsHandler :
return ReceiptsHandler ( self )
def build_read_marker_handler ( self ) :
@cache_in_self
def get_read_marker_handler ( self ) - > ReadMarkerHandler :
return ReadMarkerHandler ( self )
def build_tcp_replication ( self ) :
@cache_in_self
def get_tcp_replication ( self ) - > ReplicationCommandHandler :
return ReplicationCommandHandler ( self )
def build_action_generator ( self ) :
@cache_in_self
def get_action_generator ( self ) - > ActionGenerator :
return ActionGenerator ( self )
def build_user_directory_handler ( self ) :
@cache_in_self
def get_user_directory_handler ( self ) - > UserDirectoryHandler :
return UserDirectoryHandler ( self )
def build_groups_local_handler ( self ) :
@cache_in_self
def get_groups_local_handler ( self ) :
if self . config . worker_app :
return GroupsLocalWorkerHandler ( self )
else :
return GroupsLocalHandler ( self )
def build_groups_server_handler ( self ) :
@cache_in_self
def get_groups_server_handler ( self ) :
if self . config . worker_app :
return GroupsServerWorkerHandler ( self )
else :
return GroupsServerHandler ( self )
def build_groups_attestation_signing ( self ) :
@cache_in_self
def get_groups_attestation_signing ( self ) - > GroupAttestationSigning :
return GroupAttestationSigning ( self )
def build_groups_attestation_renewer ( self ) :
@cache_in_self
def get_groups_attestation_renewer ( self ) - > GroupAttestionRenewer :
return GroupAttestionRenewer ( self )
def build_secrets ( self ) :
@cache_in_self
def get_secrets ( self ) - > Secrets :
return Secrets ( )
def build_stats_handler ( self ) :
@cache_in_self
def get_stats_handler ( self ) - > StatsHandler :
return StatsHandler ( self )
def build_spam_checker ( self ) :
@cache_in_self
def get_spam_checker ( self ) :
return SpamChecker ( self )
def build_third_party_event_rules ( self ) :
@cache_in_self
def get_third_party_event_rules ( self ) - > ThirdPartyEventRules :
return ThirdPartyEventRules ( self )
def build_room_member_handler ( self ) :
@cache_in_self
def get_room_member_handler ( self ) :
if self . config . worker_app :
return RoomMemberWorkerHandler ( self )
return RoomMemberMasterHandler ( self )
def build_federation_registry ( self ) :
@cache_in_self
def get_federation_registry ( self ) - > FederationHandlerRegistry :
return FederationHandlerRegistry ( self )
def build_server_notices_manager ( self ) :
@cache_in_self
def get_server_notices_manager ( self ) :
if self . config . worker_app :
raise Exception ( " Workers cannot send server notices " )
return ServerNoticesManager ( self )
def build_server_notices_sender ( self ) :
@cache_in_self
def get_server_notices_sender ( self ) :
if self . config . worker_app :
return WorkerServerNoticesSender ( self )
return ServerNoticesSender ( self )
def build_message_handler ( self ) :
@cache_in_self
def get_message_handler ( self ) - > MessageHandler :
return MessageHandler ( self )
def build_pagination_handler ( self ) :
@cache_in_self
def get_pagination_handler ( self ) - > PaginationHandler :
return PaginationHandler ( self )
def build_room_context_handler ( self ) :
@cache_in_self
def get_room_context_handler ( self ) - > RoomContextHandler :
return RoomContextHandler ( self )
def build_registration_handler ( self ) :
@cache_in_self
def get_registration_handler ( self ) - > RegistrationHandler :
return RegistrationHandler ( self )
def build_account_validity_handler ( self ) :
@cache_in_self
def get_account_validity_handler ( self ) - > AccountValidityHandler :
return AccountValidityHandler ( self )
def build_cas_handler ( self ) :
@cache_in_self
def get_cas_handler ( self ) - > CasHandler :
return CasHandler ( self )
def build_saml_handler ( self ) :
from synapse . handlers . saml_handler import SamlHandler
@cache_in_self
def get_saml_handler ( self ) - > SamlHandler :
return SamlHandler ( self )
def build_oidc_handler ( self ) :
from synapse . handlers . oidc_handler import OidcHandler
@cache_in_self
def get_oidc_handler ( self ) - > OidcHandler :
return OidcHandler ( self )
def build_event_client_serializer ( self ) :
@cache_in_self
def get_event_client_serializer ( self ) - > EventClientSerializer :
return EventClientSerializer ( self )
def build_password_policy_handler ( self ) :
@cache_in_self
def get_password_policy_handler ( self ) - > PasswordPolicyHandler :
return PasswordPolicyHandler ( self )
def build_storage ( self ) - > Storage :
return Storage ( self , self . datastores )
@cache_in_self
def get_storage ( self ) - > Storage :
return Storage ( self , self . get_datastores ( ) )
def build_replication_streamer ( self ) - > ReplicationStreamer :
@cache_in_self
def get_replication_streamer ( self ) - > ReplicationStreamer :
return ReplicationStreamer ( self )
def build_replication_data_handler ( self ) :
@cache_in_self
def get_replication_data_handler ( self ) - > ReplicationDataHandler :
return ReplicationDataHandler ( self )
def build_replication_streams ( self ) :
@cache_in_self
def get_replication_streams ( self ) - > Dict [ str , Stream ] :
return { stream . NAME : stream ( self ) for stream in STREAMS_MAP . values ( ) }
def remove_pusher ( self , app_id , push_key , user_id ) :
return self . get_pusherpool ( ) . remove_pusher ( app_id , push_key , user_id )
async def remove_pusher ( self , app_id : str , push_key : str , user_id : str ) :
return await self . get_pusherpool ( ) . remove_pusher ( app_id , push_key , user_id )
def should_send_federation ( self ) :
def should_send_federation ( self ) - > bool :
" Should this server be sending federation traffic directly? "
return self . config . send_federation and (
not self . config . worker_app
or self . config . worker_app == " synapse.app.federation_sender "
)
def _make_dependency_method ( depname ) :
def _get ( hs ) :
try :
return getattr ( hs , depname )
except AttributeError :
pass
try :
builder = getattr ( hs , " build_ %s " % ( depname ) )
except AttributeError :
raise NotImplementedError (
" %s has no %s nor a builder for it " % ( type ( hs ) . __name__ , depname )
)
# Prevent cyclic dependencies from deadlocking
if depname in hs . _building :
raise ValueError ( " Cyclic dependency while building %s " % ( depname , ) )
hs . _building [ depname ] = 1
try :
dep = builder ( )
setattr ( hs , depname , dep )
finally :
del hs . _building [ depname ]
return dep
setattr ( HomeServer , " get_ %s " % ( depname ) , _get )
# Build magic accessors for every dependency
for depname in HomeServer . DEPENDENCIES :
_make_dependency_method ( depname )