|
|
|
@ -299,14 +299,14 @@ class BasePresenceHandler(abc.ABC): |
|
|
|
|
if not states: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
hosts_and_states = await get_interested_remotes( |
|
|
|
|
hosts_to_states = await get_interested_remotes( |
|
|
|
|
self.store, |
|
|
|
|
self.presence_router, |
|
|
|
|
states, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for destinations, states in hosts_and_states: |
|
|
|
|
self._federation.send_presence_to_destinations(states, destinations) |
|
|
|
|
for destination, host_states in hosts_to_states.items(): |
|
|
|
|
self._federation.send_presence_to_destinations(host_states, [destination]) |
|
|
|
|
|
|
|
|
|
async def send_full_presence_to_users(self, user_ids: Collection[str]): |
|
|
|
|
""" |
|
|
|
@ -842,15 +842,15 @@ class PresenceHandler(BasePresenceHandler): |
|
|
|
|
if to_federation_ping: |
|
|
|
|
federation_presence_out_counter.inc(len(to_federation_ping)) |
|
|
|
|
|
|
|
|
|
hosts_and_states = await get_interested_remotes( |
|
|
|
|
hosts_to_states = await get_interested_remotes( |
|
|
|
|
self.store, |
|
|
|
|
self.presence_router, |
|
|
|
|
list(to_federation_ping.values()), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for destinations, states in hosts_and_states: |
|
|
|
|
for destination, states in hosts_to_states.items(): |
|
|
|
|
self._federation_queue.send_presence_to_destinations( |
|
|
|
|
states, destinations |
|
|
|
|
states, [destination] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
async def _handle_timeouts(self) -> None: |
|
|
|
@ -1975,7 +1975,7 @@ async def get_interested_remotes( |
|
|
|
|
store: DataStore, |
|
|
|
|
presence_router: PresenceRouter, |
|
|
|
|
states: List[UserPresenceState], |
|
|
|
|
) -> List[Tuple[Collection[str], List[UserPresenceState]]]: |
|
|
|
|
) -> Dict[str, Set[UserPresenceState]]: |
|
|
|
|
"""Given a list of presence states figure out which remote servers |
|
|
|
|
should be sent which. |
|
|
|
|
|
|
|
|
@ -1987,11 +1987,9 @@ async def get_interested_remotes( |
|
|
|
|
states: A list of incoming user presence updates. |
|
|
|
|
|
|
|
|
|
Returns: |
|
|
|
|
A list of 2-tuples of destinations and states, where for |
|
|
|
|
each tuple the list of UserPresenceState should be sent to each |
|
|
|
|
destination |
|
|
|
|
A map from destinations to presence states to send to that destination. |
|
|
|
|
""" |
|
|
|
|
hosts_and_states = [] # type: List[Tuple[Collection[str], List[UserPresenceState]]] |
|
|
|
|
hosts_and_states: Dict[str, Set[UserPresenceState]] = {} |
|
|
|
|
|
|
|
|
|
# First we look up the rooms each user is in (as well as any explicit |
|
|
|
|
# subscriptions), then for each distinct room we look up the remote |
|
|
|
@ -2003,11 +2001,12 @@ async def get_interested_remotes( |
|
|
|
|
for room_id, states in room_ids_to_states.items(): |
|
|
|
|
user_ids = await store.get_users_in_room(room_id) |
|
|
|
|
hosts = {get_domain_from_id(user_id) for user_id in user_ids} |
|
|
|
|
hosts_and_states.append((hosts, states)) |
|
|
|
|
for host in hosts: |
|
|
|
|
hosts_and_states.setdefault(host, set()).update(states) |
|
|
|
|
|
|
|
|
|
for user_id, states in users_to_states.items(): |
|
|
|
|
host = get_domain_from_id(user_id) |
|
|
|
|
hosts_and_states.append(([host], states)) |
|
|
|
|
hosts_and_states.setdefault(host, set()).update(states) |
|
|
|
|
|
|
|
|
|
return hosts_and_states |
|
|
|
|
|
|
|
|
|