Remove legacy code of single user device resync api (#15418)

* Removed single-user resync usage and updated it to use multi-user counterpart

Signed-off-by: Alok Kumar Singh alokaks601@gmail.com
1.103.0-whithout-watcha
Alok Kumar Singh 2 years ago committed by GitHub
parent 5e024a0645
commit 197fbb123b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/15418.misc
  2. 56
      synapse/handlers/device.py
  3. 14
      synapse/handlers/devicemessage.py
  4. 14
      synapse/handlers/federation_event.py
  5. 57
      synapse/replication/http/devices.py
  6. 4
      tests/test_federation.py

@ -0,0 +1 @@
Always use multi-user device resync replication endpoints.

@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from http import HTTPStatus
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@ -921,12 +920,8 @@ class DeviceListWorkerUpdater:
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import ( from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet, ReplicationMultiUserDevicesResyncRestServlet,
ReplicationUserDevicesResyncRestServlet,
) )
self._user_device_resync_client = (
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
self._multi_user_device_resync_client = ( self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs) ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
) )
@ -948,37 +943,7 @@ class DeviceListWorkerUpdater:
# Shortcut empty requests # Shortcut empty requests
return {} return {}
try:
return await self._multi_user_device_resync_client(user_ids=user_ids) return await self._multi_user_device_resync_client(user_ids=user_ids)
except SynapseError as err:
if not (
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
):
raise
# Fall back to single requests
result: Dict[str, Optional[JsonDict]] = {}
for user_id in user_ids:
result[user_id] = await self._user_device_resync_client(user_id=user_id)
return result
async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id: The user's id whose device_list will be updated.
mark_failed_as_stale: Whether to mark the user's device list as stale
if the attempt to resync failed.
Returns:
A dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
None when we weren't able to fetch the device info for some reason,
e.g. due to a connection problem.
"""
return (await self.multi_user_device_resync([user_id]))[user_id]
class DeviceListUpdater(DeviceListWorkerUpdater): class DeviceListUpdater(DeviceListWorkerUpdater):
@ -1131,7 +1096,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
) )
if resync: if resync:
await self.user_device_resync(user_id) await self.multi_user_device_resync([user_id])
else: else:
# Simply update the single device, since we know that is the only # Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache) # change (because of the single prev_id matching the current cache)
@ -1198,10 +1163,9 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
for user_id in need_resync: for user_id in need_resync:
try: try:
# Try to resync the current user's devices list. # Try to resync the current user's devices list.
result = await self.user_device_resync( result = (await self.multi_user_device_resync([user_id], False))[
user_id=user_id, user_id
mark_failed_as_stale=False, ]
)
# user_device_resync only returns a result if it managed to # user_device_resync only returns a result if it managed to
# successfully resync and update the database. Updating the table # successfully resync and update the database. Updating the table
@ -1260,18 +1224,6 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
return result return result
async def user_device_resync(
self, user_id: str, mark_failed_as_stale: bool = True
) -> Optional[JsonDict]:
result, failed = await self._user_device_resync_returning_failed(user_id)
if failed and mark_failed_as_stale:
# Mark the remote user's device list as stale so we know we need to retry
# it later.
await self.store.mark_remote_users_device_caches_as_stale((user_id,))
return result
async def _user_device_resync_returning_failed( async def _user_device_resync_returning_failed(
self, user_id: str self, user_id: str
) -> Tuple[Optional[JsonDict], bool]: ) -> Tuple[Optional[JsonDict], bool]:

@ -25,7 +25,9 @@ from synapse.logging.opentracing import (
log_kv, log_kv,
set_tag, set_tag,
) )
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -71,12 +73,12 @@ class DeviceMessageHandler:
# sync. We do all device list resyncing on the master instance, so if # sync. We do all device list resyncing on the master instance, so if
# we're on a worker we hit the device resync replication API. # we're on a worker we hit the device resync replication API.
if hs.config.worker.worker_app is None: if hs.config.worker.worker_app is None:
self._user_device_resync = ( self._multi_user_device_resync = (
hs.get_device_handler().device_list_updater.user_device_resync hs.get_device_handler().device_list_updater.multi_user_device_resync
) )
else: else:
self._user_device_resync = ( self._multi_user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs) ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
) )
# a rate limiter for room key requests. The keys are # a rate limiter for room key requests. The keys are
@ -198,7 +200,7 @@ class DeviceMessageHandler:
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,)) await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
# Immediately attempt a resync in the background # Immediately attempt a resync in the background
run_in_background(self._user_device_resync, user_id=sender_user_id) run_in_background(self._multi_user_device_resync, user_ids=[sender_user_id])
async def send_device_message( async def send_device_message(
self, self,

@ -70,7 +70,9 @@ from synapse.logging.opentracing import (
trace, trace,
) )
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
)
from synapse.replication.http.federation import ( from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet, ReplicationFederationSendEventsRestServlet,
) )
@ -167,8 +169,8 @@ class FederationEventHandler:
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
if hs.config.worker.worker_app: if hs.config.worker.worker_app:
self._user_device_resync = ( self._multi_user_device_resync = (
ReplicationUserDevicesResyncRestServlet.make_client(hs) ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
) )
else: else:
self._device_list_updater = hs.get_device_handler().device_list_updater self._device_list_updater = hs.get_device_handler().device_list_updater
@ -1487,9 +1489,11 @@ class FederationEventHandler:
# Immediately attempt a resync in the background # Immediately attempt a resync in the background
if self._config.worker.worker_app: if self._config.worker.worker_app:
await self._user_device_resync(user_id=sender) await self._multi_user_device_resync(user_ids=[sender])
else: else:
await self._device_list_updater.user_device_resync(sender) await self._device_list_updater.multi_user_device_resync(
user_ids=[sender]
)
except Exception: except Exception:
logger.exception("Failed to resync device for %s", sender) logger.exception("Failed to resync device for %s", sender)

@ -28,62 +28,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for a user by contacting their
server.
This must happen on master so that the results can be correctly cached in
the database and streamed to workers.
Request format:
POST /_synapse/replication/user_device_resync/:user_id
{}
Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
response, e.g.:
{
"user_id": "@alice:example.org",
"devices": [
{
"device_id": "JLAFKJWSCS",
"keys": { ... },
"device_display_name": "Alice's Mobile Phone"
}
]
}
"""
NAME = "user_device_resync"
PATH_ARGS = ("user_id",)
CACHE = False
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
from synapse.handlers.device import DeviceHandler
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.device_list_updater = handler.device_list_updater
self.store = hs.get_datastores().main
self.clock = hs.get_clock()
@staticmethod
async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
return {}
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict, user_id: str
) -> Tuple[int, Optional[JsonDict]]:
user_devices = await self.device_list_updater.user_device_resync(user_id)
return 200, user_devices
class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for multiple users from the same """Ask master to resync the device list for multiple users from the same
remote server by contacting their server. remote server by contacting their server.
@ -216,6 +160,5 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server) ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server) ReplicationUploadKeysForUserRestServlet(hs).register(http_server)

@ -267,7 +267,9 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
# Resync the device list. # Resync the device list.
device_handler = self.hs.get_device_handler() device_handler = self.hs.get_device_handler()
self.get_success( self.get_success(
device_handler.device_list_updater.user_device_resync(remote_user_id), device_handler.device_list_updater.multi_user_device_resync(
[remote_user_id]
),
) )
# Retrieve the cross-signing keys for this user. # Retrieve the cross-signing keys for this user.

Loading…
Cancel
Save