|
|
|
@ -62,7 +62,8 @@ class _NotificationListener(object): |
|
|
|
|
|
|
|
|
|
self.rooms = rooms |
|
|
|
|
|
|
|
|
|
self.pending_notifications = [] |
|
|
|
|
def notified(self): |
|
|
|
|
return self.deferred.called |
|
|
|
|
|
|
|
|
|
def notify(self, notifier, events, start_token, end_token): |
|
|
|
|
""" Inform whoever is listening about the new events. This will |
|
|
|
@ -78,11 +79,15 @@ class _NotificationListener(object): |
|
|
|
|
except defer.AlreadyCalledError: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
# Should the following be done be using intrusively linked lists? |
|
|
|
|
# -- erikj |
|
|
|
|
|
|
|
|
|
for room in self.rooms: |
|
|
|
|
lst = notifier.room_to_listeners.get(room, set()) |
|
|
|
|
lst.discard(self) |
|
|
|
|
|
|
|
|
|
notifier.user_to_listeners.get(self.user, set()).discard(self) |
|
|
|
|
|
|
|
|
|
if self.appservice: |
|
|
|
|
notifier.appservice_to_listeners.get( |
|
|
|
|
self.appservice, set() |
|
|
|
@ -161,10 +166,18 @@ class Notifier(object): |
|
|
|
|
|
|
|
|
|
room_source = self.event_sources.sources["room"] |
|
|
|
|
|
|
|
|
|
listeners = self.room_to_listeners.get(room_id, set()).copy() |
|
|
|
|
room_listeners = self.room_to_listeners.get(room_id, set()) |
|
|
|
|
|
|
|
|
|
_discard_if_notified(room_listeners) |
|
|
|
|
|
|
|
|
|
listeners = room_listeners.copy() |
|
|
|
|
|
|
|
|
|
for user in extra_users: |
|
|
|
|
listeners |= self.user_to_listeners.get(user, set()).copy() |
|
|
|
|
user_listeners = self.user_to_listeners.get(user, set()) |
|
|
|
|
|
|
|
|
|
_discard_if_notified(user_listeners) |
|
|
|
|
|
|
|
|
|
listeners |= user_listeners |
|
|
|
|
|
|
|
|
|
for appservice in self.appservice_to_listeners: |
|
|
|
|
# TODO (kegan): Redundant appservice listener checks? |
|
|
|
@ -173,9 +186,13 @@ class Notifier(object): |
|
|
|
|
# receive *invites* for users they are interested in. Does this |
|
|
|
|
# make the room_to_listeners check somewhat obselete? |
|
|
|
|
if appservice.is_interested(event): |
|
|
|
|
listeners |= self.appservice_to_listeners.get( |
|
|
|
|
app_listeners = self.appservice_to_listeners.get( |
|
|
|
|
appservice, set() |
|
|
|
|
).copy() |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
_discard_if_notified(app_listeners) |
|
|
|
|
|
|
|
|
|
listeners |= app_listeners |
|
|
|
|
|
|
|
|
|
logger.debug("on_new_room_event listeners %s", listeners) |
|
|
|
|
|
|
|
|
@ -226,10 +243,18 @@ class Notifier(object): |
|
|
|
|
listeners = set() |
|
|
|
|
|
|
|
|
|
for user in users: |
|
|
|
|
listeners |= self.user_to_listeners.get(user, set()).copy() |
|
|
|
|
user_listeners = self.user_to_listeners.get(user, set()) |
|
|
|
|
|
|
|
|
|
_discard_if_notified(user_listeners) |
|
|
|
|
|
|
|
|
|
listeners |= user_listeners |
|
|
|
|
|
|
|
|
|
for room in rooms: |
|
|
|
|
listeners |= self.room_to_listeners.get(room, set()).copy() |
|
|
|
|
room_listeners = self.room_to_listeners.get(room, set()) |
|
|
|
|
|
|
|
|
|
_discard_if_notified(room_listeners) |
|
|
|
|
|
|
|
|
|
listeners |= room_listeners |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def notify(listener): |
|
|
|
@ -427,3 +452,17 @@ class Notifier(object): |
|
|
|
|
|
|
|
|
|
listeners = self.room_to_listeners.setdefault(room_id, set()) |
|
|
|
|
listeners |= new_listeners |
|
|
|
|
|
|
|
|
|
for l in new_listeners: |
|
|
|
|
l.rooms.add(room_id) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _discard_if_notified(listener_set): |
|
|
|
|
"""Remove any 'stale' listeners from the given set. |
|
|
|
|
""" |
|
|
|
|
to_discard = set() |
|
|
|
|
for l in listener_set: |
|
|
|
|
if l.notified(): |
|
|
|
|
to_discard.add(l) |
|
|
|
|
|
|
|
|
|
listener_set -= to_discard |
|
|
|
|