|
|
|
@ -16,6 +16,7 @@ |
|
|
|
|
from twisted.internet import defer, reactor |
|
|
|
|
|
|
|
|
|
from synapse.util.logutils import log_function |
|
|
|
|
from synapse.util.logcontext import PreserveLoggingContext |
|
|
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
|
@ -79,6 +80,8 @@ class Notifier(object): |
|
|
|
|
|
|
|
|
|
self.event_sources = hs.get_event_sources() |
|
|
|
|
|
|
|
|
|
self.clock = hs.get_clock() |
|
|
|
|
|
|
|
|
|
hs.get_distributor().observe( |
|
|
|
|
"user_joined_room", self._user_joined_room |
|
|
|
|
) |
|
|
|
@ -127,9 +130,10 @@ class Notifier(object): |
|
|
|
|
def eb(failure): |
|
|
|
|
logger.exception("Failed to notify listener", failure) |
|
|
|
|
|
|
|
|
|
yield defer.DeferredList( |
|
|
|
|
[notify(l).addErrback(eb) for l in listeners] |
|
|
|
|
) |
|
|
|
|
with PreserveLoggingContext(): |
|
|
|
|
yield defer.DeferredList( |
|
|
|
|
[notify(l).addErrback(eb) for l in listeners] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
@log_function |
|
|
|
@ -175,9 +179,10 @@ class Notifier(object): |
|
|
|
|
failure.getTracebackObject()) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
yield defer.DeferredList( |
|
|
|
|
[notify(l).addErrback(eb) for l in listeners] |
|
|
|
|
) |
|
|
|
|
with PreserveLoggingContext(): |
|
|
|
|
yield defer.DeferredList( |
|
|
|
|
[notify(l).addErrback(eb) for l in listeners] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def get_events_for(self, user, rooms, pagination_config, timeout): |
|
|
|
|
""" For the given user and rooms, return any new events for them. If |
|
|
|
@ -206,29 +211,28 @@ class Notifier(object): |
|
|
|
|
timeout, |
|
|
|
|
deferred, |
|
|
|
|
) |
|
|
|
|
def _timeout_listener(): |
|
|
|
|
# TODO (erikj): We should probably set to_token to the current |
|
|
|
|
# max rather than reusing from_token. |
|
|
|
|
listener.notify( |
|
|
|
|
self, |
|
|
|
|
[], |
|
|
|
|
listener.from_token, |
|
|
|
|
listener.from_token, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
if timeout: |
|
|
|
|
reactor.callLater(timeout/1000.0, self._timeout_listener, listener) |
|
|
|
|
self.clock.call_later(timeout/1000.0, _timeout_listener) |
|
|
|
|
|
|
|
|
|
self._register_with_keys(listener) |
|
|
|
|
|
|
|
|
|
yield self._check_for_updates(listener) |
|
|
|
|
|
|
|
|
|
if not timeout: |
|
|
|
|
self._timeout_listener(listener) |
|
|
|
|
_timeout_listener() |
|
|
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def _timeout_listener(self, listener): |
|
|
|
|
# TODO (erikj): We should probably set to_token to the current max |
|
|
|
|
# rather than reusing from_token. |
|
|
|
|
listener.notify( |
|
|
|
|
self, |
|
|
|
|
[], |
|
|
|
|
listener.from_token, |
|
|
|
|
listener.from_token, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@log_function |
|
|
|
|
def _register_with_keys(self, listener): |
|
|
|
|
for room in listener.rooms: |
|
|
|
|