|
|
|
@ -1017,11 +1017,28 @@ class PresenceEventSource(object): |
|
|
|
|
if from_key is not None: |
|
|
|
|
from_key = int(from_key) |
|
|
|
|
|
|
|
|
|
max_token = self.store.get_current_presence_token() |
|
|
|
|
if from_key == max_token: |
|
|
|
|
# This is necessary as due to the way stream ID generators work |
|
|
|
|
# we may get updates that have a stream ID greater than the max |
|
|
|
|
# token (e.g. max_token is N but stream generator may return |
|
|
|
|
# results for N+2, due to N+1 not having finished being |
|
|
|
|
# persisted yet). |
|
|
|
|
# |
|
|
|
|
# This is usually fine, as it just means that we may send down |
|
|
|
|
# some presence updates multiple times. However, we need to be |
|
|
|
|
# careful that the sync stream either actually does make some |
|
|
|
|
# progress or doesn't return, otherwise clients will end up |
|
|
|
|
# tight looping calling /sync due to it immediately returning |
|
|
|
|
# the same token repeatedly. |
|
|
|
|
# |
|
|
|
|
# Hence this guard where we just return nothing so that the sync |
|
|
|
|
# doesn't return. C.f. #5503. |
|
|
|
|
defer.returnValue(([], max_token)) |
|
|
|
|
|
|
|
|
|
presence = self.get_presence_handler() |
|
|
|
|
stream_change_cache = self.store.presence_stream_cache |
|
|
|
|
|
|
|
|
|
max_token = self.store.get_current_presence_token() |
|
|
|
|
|
|
|
|
|
users_interested_in = yield self._get_interested_in(user, explicit_room_id) |
|
|
|
|
|
|
|
|
|
user_ids_changed = set() |
|
|
|
|