|
|
|
@ -396,31 +396,30 @@ class Notifier: |
|
|
|
|
|
|
|
|
|
Will wake up all listeners for the given users and rooms. |
|
|
|
|
""" |
|
|
|
|
with PreserveLoggingContext(): |
|
|
|
|
with Measure(self.clock, "on_new_event"): |
|
|
|
|
user_streams = set() |
|
|
|
|
with Measure(self.clock, "on_new_event"): |
|
|
|
|
user_streams = set() |
|
|
|
|
|
|
|
|
|
for user in users: |
|
|
|
|
user_stream = self.user_to_user_stream.get(str(user)) |
|
|
|
|
if user_stream is not None: |
|
|
|
|
user_streams.add(user_stream) |
|
|
|
|
for user in users: |
|
|
|
|
user_stream = self.user_to_user_stream.get(str(user)) |
|
|
|
|
if user_stream is not None: |
|
|
|
|
user_streams.add(user_stream) |
|
|
|
|
|
|
|
|
|
for room in rooms: |
|
|
|
|
user_streams |= self.room_to_user_streams.get(room, set()) |
|
|
|
|
for room in rooms: |
|
|
|
|
user_streams |= self.room_to_user_streams.get(room, set()) |
|
|
|
|
|
|
|
|
|
time_now_ms = self.clock.time_msec() |
|
|
|
|
for user_stream in user_streams: |
|
|
|
|
try: |
|
|
|
|
user_stream.notify(stream_key, new_token, time_now_ms) |
|
|
|
|
except Exception: |
|
|
|
|
logger.exception("Failed to notify listener") |
|
|
|
|
time_now_ms = self.clock.time_msec() |
|
|
|
|
for user_stream in user_streams: |
|
|
|
|
try: |
|
|
|
|
user_stream.notify(stream_key, new_token, time_now_ms) |
|
|
|
|
except Exception: |
|
|
|
|
logger.exception("Failed to notify listener") |
|
|
|
|
|
|
|
|
|
self.notify_replication() |
|
|
|
|
self.notify_replication() |
|
|
|
|
|
|
|
|
|
# Notify appservices |
|
|
|
|
self._notify_app_services_ephemeral( |
|
|
|
|
stream_key, new_token, users, |
|
|
|
|
) |
|
|
|
|
# Notify appservices |
|
|
|
|
self._notify_app_services_ephemeral( |
|
|
|
|
stream_key, new_token, users, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def on_new_replication_data(self) -> None: |
|
|
|
|
"""Used to inform replication listeners that something has happened |
|
|
|
|