@ -17,20 +17,18 @@ import logging
from twisted . internet import defer
from synapse . util import unwrapFirstError
from synapse . util . logcontext import PreserveLoggingContext
from synapse . metrics . background_process_metrics import run_as_background_process
from synapse . util . logcontext import make_deferred_yieldable , run_in_background
logger = logging . getLogger ( __name__ )
def user_left_room ( distributor , user , room_id ) :
with PreserveLoggingContext ( ) :
distributor . fire ( " user_left_room " , user = user , room_id = room_id )
distributor . fire ( " user_left_room " , user = user , room_id = room_id )
def user_joined_room ( distributor , user , room_id ) :
with PreserveLoggingContext ( ) :
distributor . fire ( " user_joined_room " , user = user , room_id = room_id )
distributor . fire ( " user_joined_room " , user = user , room_id = room_id )
class Distributor ( object ) :
@ -44,9 +42,7 @@ class Distributor(object):
model will do for today .
"""
def __init__ ( self , suppress_failures = True ) :
self . suppress_failures = suppress_failures
def __init__ ( self ) :
self . signals = { }
self . pre_registration = { }
@ -56,7 +52,6 @@ class Distributor(object):
self . signals [ name ] = Signal (
name ,
suppress_failures = self . suppress_failures ,
)
if name in self . pre_registration :
@ -82,7 +77,11 @@ class Distributor(object):
if name not in self . signals :
raise KeyError ( " %r does not have a signal named %s " % ( self , name ) )
return self . signals [ name ] . fire ( * args , * * kwargs )
run_as_background_process (
name ,
self . signals [ name ] . fire ,
* args , * * kwargs
)
class Signal ( object ) :
@ -95,9 +94,8 @@ class Signal(object):
method into all of the observers .
"""
def __init__ ( self , name , suppress_failures ) :
def __init__ ( self , name ) :
self . name = name
self . suppress_failures = suppress_failures
self . observers = [ ]
def observe ( self , observer ) :
@ -107,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred . """
self . observers . append ( observer )
@defer . inlineCallbacks
def fire ( self , * args , * * kwargs ) :
""" Invokes every callable in the observer list, passing in the args and
kwargs . Exceptions thrown by observers are logged but ignored . It is
@ -125,22 +122,17 @@ class Signal(object):
failure . type ,
failure . value ,
failure . getTracebackObject ( ) ) )
if not self . suppress_failures :
return failure
return defer . maybeDeferred ( observer , * args , * * kwargs ) . addErrback ( eb )
with PreserveLoggingContext ( ) :
deferreds = [
do ( observer )
for observer in self . observers
]
res = yield defer . gatherResults (
deferreds , consumeErrors = True
) . addErrback ( unwrapFirstError )
deferreds = [
run_in_background ( do , o )
for o in self . observers
]
defer . returnValue ( res )
return make_deferred_yieldable ( defer . gatherResults (
deferreds , consumeErrors = True ,
) )
def __repr__ ( self ) :
return " <Signal name= %r > " % ( self . name , )