|
|
|
@ -17,7 +17,7 @@ from synapse.api import errors |
|
|
|
|
from synapse.api.constants import EventTypes |
|
|
|
|
from synapse.util import stringutils |
|
|
|
|
from synapse.util.async import Linearizer |
|
|
|
|
from synapse.types import get_domain_from_id |
|
|
|
|
from synapse.types import get_domain_from_id, RoomStreamToken |
|
|
|
|
from twisted.internet import defer |
|
|
|
|
from ._base import BaseHandler |
|
|
|
|
|
|
|
|
@ -198,20 +198,22 @@ class DeviceHandler(BaseHandler): |
|
|
|
|
"""Notify that a user's device(s) has changed. Pokes the notifier, and |
|
|
|
|
remote servers if the user is local. |
|
|
|
|
""" |
|
|
|
|
rooms = yield self.store.get_rooms_for_user(user_id) |
|
|
|
|
room_ids = [r.room_id for r in rooms] |
|
|
|
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user( |
|
|
|
|
user_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
hosts = set() |
|
|
|
|
if self.hs.is_mine_id(user_id): |
|
|
|
|
for room_id in room_ids: |
|
|
|
|
users = yield self.store.get_users_in_room(room_id) |
|
|
|
|
hosts.update(get_domain_from_id(u) for u in users) |
|
|
|
|
hosts.update(get_domain_from_id(u) for u in users_who_share_room) |
|
|
|
|
hosts.discard(self.server_name) |
|
|
|
|
|
|
|
|
|
position = yield self.store.add_device_change_to_streams( |
|
|
|
|
user_id, device_ids, list(hosts) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
rooms = yield self.store.get_rooms_for_user(user_id) |
|
|
|
|
room_ids = [r.room_id for r in rooms] |
|
|
|
|
|
|
|
|
|
yield self.notifier.on_new_event( |
|
|
|
|
"device_list_key", position, rooms=room_ids, |
|
|
|
|
) |
|
|
|
@ -243,15 +245,15 @@ class DeviceHandler(BaseHandler): |
|
|
|
|
|
|
|
|
|
possibly_changed = set(changed) |
|
|
|
|
for room_id in rooms_changed: |
|
|
|
|
# Fetch (an approximation) of the current state at the time. |
|
|
|
|
event_rows, token = yield self.store.get_recent_event_ids_for_room( |
|
|
|
|
room_id, end_token=from_token.room_key, limit=1, |
|
|
|
|
) |
|
|
|
|
# Fetch the current state at the time. |
|
|
|
|
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) |
|
|
|
|
|
|
|
|
|
if event_rows: |
|
|
|
|
last_event_id = event_rows[-1]["event_id"] |
|
|
|
|
prev_state_ids = yield self.store.get_state_ids_for_event(last_event_id) |
|
|
|
|
else: |
|
|
|
|
try: |
|
|
|
|
event_ids = yield self.store.get_forward_extremeties_for_room( |
|
|
|
|
room_id, stream_ordering=stream_ordering |
|
|
|
|
) |
|
|
|
|
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) |
|
|
|
|
except: |
|
|
|
|
prev_state_ids = {} |
|
|
|
|
|
|
|
|
|
current_state_ids = yield self.state.get_current_state_ids(room_id) |
|
|
|
@ -266,13 +268,13 @@ class DeviceHandler(BaseHandler): |
|
|
|
|
if not prev_event_id or prev_event_id != event_id: |
|
|
|
|
possibly_changed.add(state_key) |
|
|
|
|
|
|
|
|
|
user_ids_changed = set() |
|
|
|
|
for other_user_id in possibly_changed: |
|
|
|
|
other_rooms = yield self.store.get_rooms_for_user(other_user_id) |
|
|
|
|
if room_ids.intersection(e.room_id for e in other_rooms): |
|
|
|
|
user_ids_changed.add(other_user_id) |
|
|
|
|
users_who_share_room = yield self.store.get_users_who_share_room_with_user( |
|
|
|
|
user_id |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
defer.returnValue(user_ids_changed) |
|
|
|
|
# Take the intersection of the users whose devices may have changed |
|
|
|
|
# and those that actually still share a room with the user |
|
|
|
|
defer.returnValue(users_who_share_room & possibly_changed) |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _incoming_device_list_update(self, origin, edu_content): |
|
|
|
|