|
|
|
@ -16,7 +16,7 @@ |
|
|
|
|
|
|
|
|
|
import synapse |
|
|
|
|
|
|
|
|
|
from synapse.api.constants import EventTypes |
|
|
|
|
from synapse.api.constants import EventTypes, PresenceState |
|
|
|
|
from synapse.config._base import ConfigError |
|
|
|
|
from synapse.config.database import DatabaseConfig |
|
|
|
|
from synapse.config.logger import LoggingConfig |
|
|
|
@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState |
|
|
|
|
from synapse.storage.roommember import RoomMemberStore |
|
|
|
|
from synapse.util.async import sleep |
|
|
|
|
from synapse.util.httpresourcetree import create_resource_tree |
|
|
|
|
from synapse.util.logcontext import LoggingContext |
|
|
|
|
from synapse.util.logcontext import LoggingContext, preserve_fn |
|
|
|
|
from synapse.util.manhole import manhole |
|
|
|
|
from synapse.util.rlimit import change_resource_limit |
|
|
|
|
from synapse.util.stringutils import random_string |
|
|
|
@ -135,6 +135,8 @@ class SynchrotronSlavedStore( |
|
|
|
|
RoomMemberStore.__dict__["who_forgot_in_room"] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
UPDATE_SYNCING_USERS_MS = 10 * 1000 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SynchrotronPresence(object): |
|
|
|
|
def __init__(self, hs): |
|
|
|
@ -153,6 +155,13 @@ class SynchrotronPresence(object): |
|
|
|
|
self.process_id = random_string(16) |
|
|
|
|
logger.info("Presence process_id is %r", self.process_id) |
|
|
|
|
|
|
|
|
|
self._sending_sync = False |
|
|
|
|
self._need_to_send_sync = False |
|
|
|
|
self.clock.looping_call( |
|
|
|
|
self._send_syncing_users_regularly, |
|
|
|
|
UPDATE_SYNCING_USERS_MS, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
def set_state(self, user, state): |
|
|
|
|
# TODO Hows this supposed to work? |
|
|
|
|
pass |
|
|
|
@ -165,12 +174,10 @@ class SynchrotronPresence(object): |
|
|
|
|
if affect_presence: |
|
|
|
|
curr_sync = self.user_to_num_current_syncs.get(user_id, 0) |
|
|
|
|
self.user_to_num_current_syncs[user_id] = curr_sync + 1 |
|
|
|
|
# TODO: Send this less frequently. |
|
|
|
|
# TODO: Make sure this doesn't race. Currently we can lose updates |
|
|
|
|
# if two users come online in quick sucession and the second http |
|
|
|
|
# to the master completes before the first. |
|
|
|
|
# TODO: Don't block the sync request on this HTTP hit. |
|
|
|
|
yield self._send_syncing_users() |
|
|
|
|
prev_states = yield self.current_state_for_users([user_id]) |
|
|
|
|
if prev_states[user_id].state == PresenceState.OFFLINE: |
|
|
|
|
# TODO: Don't block the sync request on this HTTP hit. |
|
|
|
|
yield self._send_syncing_users_now() |
|
|
|
|
|
|
|
|
|
def _end(): |
|
|
|
|
if affect_presence: |
|
|
|
@ -185,8 +192,24 @@ class SynchrotronPresence(object): |
|
|
|
|
|
|
|
|
|
defer.returnValue(_user_syncing()) |
|
|
|
|
|
|
|
|
|
def _send_syncing_users(self): |
|
|
|
|
return self.http_client.post_json_get_json(self.syncing_users_url, { |
|
|
|
|
def _send_syncing_users_regularly(self): |
|
|
|
|
# Only send an update if we aren't in the middle of sending one. |
|
|
|
|
if not self._sending_sync: |
|
|
|
|
preserve_fn(self._send_syncing_users_now)() |
|
|
|
|
|
|
|
|
|
@defer.inlineCallbacks |
|
|
|
|
def _send_syncing_users_now(self): |
|
|
|
|
if self._sending_sync: |
|
|
|
|
# We don't want to race with sending another update. |
|
|
|
|
# Instead we wait for that update to finish and send another |
|
|
|
|
# update afterwards. |
|
|
|
|
self._need_to_send_sync = True |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# Flag that we are sending an update. |
|
|
|
|
self._sending_sync = True |
|
|
|
|
|
|
|
|
|
yield self.http_client.post_json_get_json(self.syncing_users_url, { |
|
|
|
|
"process_id": self.process_id, |
|
|
|
|
"syncing_users": [ |
|
|
|
|
user_id for user_id, count in self.user_to_num_current_syncs.items() |
|
|
|
@ -194,6 +217,16 @@ class SynchrotronPresence(object): |
|
|
|
|
], |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
# Unset the flag as we are no longer sending an update. |
|
|
|
|
self._sending_sync = False |
|
|
|
|
if self._need_to_send_sync: |
|
|
|
|
# If something happened while we were sending the update then |
|
|
|
|
# we might need to send another update. |
|
|
|
|
# TODO: Check if the update that was sent matches the current state |
|
|
|
|
# as we only need to send an update if they are different. |
|
|
|
|
self._need_to_send_sync = False |
|
|
|
|
yield self._send_syncing_users_now() |
|
|
|
|
|
|
|
|
|
def process_replication(self, result): |
|
|
|
|
stream = result.get("presence", {"rows": []}) |
|
|
|
|
for row in stream["rows"]: |
|
|
|
|