|
|
|
@ -20,6 +20,7 @@ from synapse.api.errors import AuthError |
|
|
|
|
from synapse.util.logutils import log_function |
|
|
|
|
from synapse.util.async import ObservableDeferred |
|
|
|
|
from synapse.util.logcontext import PreserveLoggingContext |
|
|
|
|
from synapse.util.metrics import Measure |
|
|
|
|
from synapse.types import StreamToken |
|
|
|
|
from synapse.visibility import filter_events_for_client |
|
|
|
|
import synapse.metrics |
|
|
|
@ -231,30 +232,32 @@ class Notifier(object): |
|
|
|
|
Will wake up all listeners for the given users and rooms. |
|
|
|
|
""" |
|
|
|
|
with PreserveLoggingContext(): |
|
|
|
|
user_streams = set() |
|
|
|
|
with Measure(self.clock, "on_new_event"): |
|
|
|
|
user_streams = set() |
|
|
|
|
|
|
|
|
|
for user in users: |
|
|
|
|
user_stream = self.user_to_user_stream.get(str(user)) |
|
|
|
|
if user_stream is not None: |
|
|
|
|
user_streams.add(user_stream) |
|
|
|
|
for user in users: |
|
|
|
|
user_stream = self.user_to_user_stream.get(str(user)) |
|
|
|
|
if user_stream is not None: |
|
|
|
|
user_streams.add(user_stream) |
|
|
|
|
|
|
|
|
|
for room in rooms: |
|
|
|
|
user_streams |= self.room_to_user_streams.get(room, set()) |
|
|
|
|
for room in rooms: |
|
|
|
|
user_streams |= self.room_to_user_streams.get(room, set()) |
|
|
|
|
|
|
|
|
|
time_now_ms = self.clock.time_msec() |
|
|
|
|
for user_stream in user_streams: |
|
|
|
|
try: |
|
|
|
|
user_stream.notify(stream_key, new_token, time_now_ms) |
|
|
|
|
except: |
|
|
|
|
logger.exception("Failed to notify listener") |
|
|
|
|
time_now_ms = self.clock.time_msec() |
|
|
|
|
for user_stream in user_streams: |
|
|
|
|
try: |
|
|
|
|
user_stream.notify(stream_key, new_token, time_now_ms) |
|
|
|
|
except: |
|
|
|
|
logger.exception("Failed to notify listener") |
|
|
|
|
|
|
|
|
|
self.notify_replication() |
|
|
|
|
self.notify_replication() |
|
|
|
|
|
|
|
|
|
def on_new_replication_data(self): |
|
|
|
|
"""Used to inform replication listeners that something has happend |
|
|
|
|
without waking up any of the normal user event streams""" |
|
|
|
|
with PreserveLoggingContext(): |
|
|
|
|
self.notify_replication() |
|
|
|
|
with Measure(self.clock, "on_new_replication_data"): |
|
|
|
|
self.notify_replication() |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def wait_for_events(self, user_id, timeout, callback, room_ids=None, |
|
|
|
|