|
|
|
@ -835,50 +835,51 @@ class PresenceEventSource(object): |
|
|
|
|
# We don't try and limit the presence updates by the current token, as |
|
|
|
|
# sending down the rare duplicate is not a concern. |
|
|
|
|
|
|
|
|
|
user_id = user.to_string() |
|
|
|
|
if from_key is not None: |
|
|
|
|
from_key = int(from_key) |
|
|
|
|
room_ids = room_ids or [] |
|
|
|
|
with Measure(self.clock, "Presence.get_new_events"): |
|
|
|
|
user_id = user.to_string() |
|
|
|
|
if from_key is not None: |
|
|
|
|
from_key = int(from_key) |
|
|
|
|
room_ids = room_ids or [] |
|
|
|
|
|
|
|
|
|
presence = self.hs.get_handlers().presence_handler |
|
|
|
|
presence = self.hs.get_handlers().presence_handler |
|
|
|
|
|
|
|
|
|
if not room_ids: |
|
|
|
|
rooms = yield self.store.get_rooms_for_user(user_id) |
|
|
|
|
room_ids = set(e.room_id for e in rooms) |
|
|
|
|
if not room_ids: |
|
|
|
|
rooms = yield self.store.get_rooms_for_user(user_id) |
|
|
|
|
room_ids = set(e.room_id for e in rooms) |
|
|
|
|
|
|
|
|
|
user_ids_to_check = set() |
|
|
|
|
for room_id in room_ids: |
|
|
|
|
users = yield self.store.get_users_in_room(room_id) |
|
|
|
|
user_ids_to_check.update(users) |
|
|
|
|
user_ids_to_check = set() |
|
|
|
|
for room_id in room_ids: |
|
|
|
|
users = yield self.store.get_users_in_room(room_id) |
|
|
|
|
user_ids_to_check.update(users) |
|
|
|
|
|
|
|
|
|
plist = yield self.store.get_presence_list_accepted(user.localpart) |
|
|
|
|
user_ids_to_check.update([row["observed_user_id"] for row in plist]) |
|
|
|
|
plist = yield self.store.get_presence_list_accepted(user.localpart) |
|
|
|
|
user_ids_to_check.update([row["observed_user_id"] for row in plist]) |
|
|
|
|
|
|
|
|
|
# Always include yourself. Only really matters for when the user is |
|
|
|
|
# not in any rooms, but still. |
|
|
|
|
user_ids_to_check.add(user_id) |
|
|
|
|
# Always include yourself. Only really matters for when the user is |
|
|
|
|
# not in any rooms, but still. |
|
|
|
|
user_ids_to_check.add(user_id) |
|
|
|
|
|
|
|
|
|
max_token = self.store.get_current_presence_token() |
|
|
|
|
max_token = self.store.get_current_presence_token() |
|
|
|
|
|
|
|
|
|
if from_key: |
|
|
|
|
user_ids_changed = self.store.presence_stream_cache.get_entities_changed( |
|
|
|
|
user_ids_to_check, from_key, |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
user_ids_changed = user_ids_to_check |
|
|
|
|
if from_key: |
|
|
|
|
user_ids_changed = self.store.presence_stream_cache.get_entities_changed( |
|
|
|
|
user_ids_to_check, from_key, |
|
|
|
|
) |
|
|
|
|
else: |
|
|
|
|
user_ids_changed = user_ids_to_check |
|
|
|
|
|
|
|
|
|
updates = yield presence.current_state_for_users(user_ids_changed) |
|
|
|
|
updates = yield presence.current_state_for_users(user_ids_changed) |
|
|
|
|
|
|
|
|
|
now = self.clock.time_msec() |
|
|
|
|
now = self.clock.time_msec() |
|
|
|
|
|
|
|
|
|
defer.returnValue(([ |
|
|
|
|
{ |
|
|
|
|
"type": "m.presence", |
|
|
|
|
"content": _format_user_presence_state(s, now), |
|
|
|
|
} |
|
|
|
|
for s in updates.values() |
|
|
|
|
if include_offline or s.state != PresenceState.OFFLINE |
|
|
|
|
], max_token)) |
|
|
|
|
defer.returnValue(([ |
|
|
|
|
{ |
|
|
|
|
"type": "m.presence", |
|
|
|
|
"content": _format_user_presence_state(s, now), |
|
|
|
|
} |
|
|
|
|
for s in updates.values() |
|
|
|
|
if include_offline or s.state != PresenceState.OFFLINE |
|
|
|
|
], max_token)) |
|
|
|
|
|
|
|
|
|
def get_current_key(self): |
|
|
|
|
return self.store.get_current_presence_token() |
|
|
|
|