Merge pull request #3659 from matrix-org/erikj/split_profiles

Allow profile updates to happen on workers
pull/14/head
Erik Johnston 6 years ago committed by GitHub
commit 764030cf63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      changelog.d/3659.feature
  2. 1
      docs/workers.rst
  3. 12
      synapse/app/event_creator.py
  4. 31
      synapse/handlers/profile.py
  5. 4
      synapse/handlers/user_directory.py
  6. 7
      synapse/server.py
  7. 4
      synapse/storage/profile.py
  8. 58
      synapse/storage/room.py
  9. 4
      tests/handlers/test_profile.py

@ -0,0 +1 @@
Support profile API endpoints on workers

@ -265,6 +265,7 @@ Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/ ^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
It will create events locally and then send them on to the main synapse It will create events locally and then send them on to the main synapse
instance to be persisted and handled. instance to be persisted and handled.

@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.room import ( from synapse.rest.client.v1.room import (
JoinRoomAliasServlet, JoinRoomAliasServlet,
RoomMembershipRestServlet, RoomMembershipRestServlet,
@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import (
) )
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
@ -62,6 +68,9 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore( class EventCreatorSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
UserDirectoryStore,
DirectoryStore, DirectoryStore,
SlavedTransactionStore, SlavedTransactionStore,
SlavedProfileStore, SlavedProfileStore,
@ -101,6 +110,9 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource) RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource) RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource) JoinRoomAliasServlet(self).register(resource)
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
resources.update({ resources.update({
"/_matrix/client/r0": resource, "/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource, "/_matrix/client/unstable": resource,

@ -32,12 +32,16 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ProfileHandler(BaseHandler): class BaseProfileHandler(BaseHandler):
PROFILE_UPDATE_MS = 60 * 1000 """Handles fetching and updating user profile information.
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
BaseProfileHandler can be instantiated directly on workers and will
delegate to master when necessary. The master process should use the
subclass MasterProfileHandler
"""
def __init__(self, hs): def __init__(self, hs):
super(ProfileHandler, self).__init__(hs) super(BaseProfileHandler, self).__init__(hs)
self.federation = hs.get_federation_client() self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler( hs.get_federation_registry().register_query_handler(
@ -46,11 +50,6 @@ class ProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler() self.user_directory_handler = hs.get_user_directory_handler()
if hs.config.worker_app is None:
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_profile(self, user_id): def get_profile(self, user_id):
target_user = UserID.from_string(user_id) target_user = UserID.from_string(user_id)
@ -282,6 +281,20 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message) room_id, str(e.message)
) )
class MasterProfileHandler(BaseProfileHandler):
PROFILE_UPDATE_MS = 60 * 1000
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
def __init__(self, hs):
super(MasterProfileHandler, self).__init__(hs)
assert hs.config.worker_app is None
self.clock.looping_call(
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
def _start_update_remote_profile_cache(self): def _start_update_remote_profile_cache(self):
return run_as_background_process( return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache, "Update remote profile", self._update_remote_profile_cache,

@ -119,6 +119,8 @@ class UserDirectoryHandler(object):
"""Called to update index of our local user profiles when they change """Called to update index of our local user profiles when they change
irrespective of any rooms the user may be in. irrespective of any rooms the user may be in.
""" """
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.update_profile_in_user_dir( yield self.store.update_profile_in_user_dir(
user_id, profile.display_name, profile.avatar_url, None, user_id, profile.display_name, profile.avatar_url, None,
) )
@ -127,6 +129,8 @@ class UserDirectoryHandler(object):
def handle_user_deactivated(self, user_id): def handle_user_deactivated(self, user_id):
"""Called when a user ID is deactivated """Called when a user ID is deactivated
""" """
# FIXME(#3714): We should probably do this in the same worker as all
# the other changes.
yield self.store.remove_from_user_dir(user_id) yield self.store.remove_from_user_dir(user_id)
yield self.store.remove_from_user_in_public_room(user_id) yield self.store.remove_from_user_in_public_room(user_id)

@ -56,7 +56,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.receipts import ReceiptsHandler
from synapse.handlers.room import RoomContextHandler, RoomCreationHandler from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
@ -308,7 +308,10 @@ class HomeServer(object):
return InitialSyncHandler(self) return InitialSyncHandler(self)
def build_profile_handler(self): def build_profile_handler(self):
return ProfileHandler(self) if self.config.worker_app:
return BaseProfileHandler(self)
else:
return MasterProfileHandler(self)
def build_event_creation_handler(self): def build_event_creation_handler(self):
return EventCreationHandler(self) return EventCreationHandler(self)

@ -71,8 +71,6 @@ class ProfileWorkerStore(SQLBaseStore):
desc="get_from_remote_profile_cache", desc="get_from_remote_profile_cache",
) )
class ProfileStore(ProfileWorkerStore):
def create_profile(self, user_localpart): def create_profile(self, user_localpart):
return self._simple_insert( return self._simple_insert(
table="profiles", table="profiles",
@ -96,6 +94,8 @@ class ProfileStore(ProfileWorkerStore):
desc="set_profile_avatar_url", desc="set_profile_avatar_url",
) )
class ProfileStore(ProfileWorkerStore):
def add_remote_profile_cache(self, user_id, displayname, avatar_url): def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles. """Ensure we are caching the remote user's profiles.

@ -186,6 +186,35 @@ class RoomWorkerStore(SQLBaseStore):
desc="is_room_blocked", desc="is_room_blocked",
) )
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
user
Args:
user_id (str)
Returns:
RatelimitOverride if there is an override, else None. If the contents
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
row = yield self._simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
allow_none=True,
desc="get_ratelimit_for_user",
)
if row:
defer.returnValue(RatelimitOverride(
messages_per_second=row["messages_per_second"],
burst_count=row["burst_count"],
))
else:
defer.returnValue(None)
class RoomStore(RoomWorkerStore, SearchStore): class RoomStore(RoomWorkerStore, SearchStore):
@ -469,35 +498,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
"get_all_new_public_rooms", get_all_new_public_rooms "get_all_new_public_rooms", get_all_new_public_rooms
) )
@cachedInlineCallbacks(max_entries=10000)
def get_ratelimit_for_user(self, user_id):
"""Check if there are any overrides for ratelimiting for the given
user
Args:
user_id (str)
Returns:
RatelimitOverride if there is an override, else None. If the contents
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
row = yield self._simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
allow_none=True,
desc="get_ratelimit_for_user",
)
if row:
defer.returnValue(RatelimitOverride(
messages_per_second=row["messages_per_second"],
burst_count=row["burst_count"],
))
else:
defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def block_room(self, room_id, user_id): def block_room(self, room_id, user_id):
yield self._simple_insert( yield self._simple_insert(

@ -20,7 +20,7 @@ from twisted.internet import defer
import synapse.types import synapse.types
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
from synapse.handlers.profile import ProfileHandler from synapse.handlers.profile import MasterProfileHandler
from synapse.types import UserID from synapse.types import UserID
from tests import unittest from tests import unittest
@ -29,7 +29,7 @@ from tests.utils import setup_test_homeserver
class ProfileHandlers(object): class ProfileHandlers(object):
def __init__(self, hs): def __init__(self, hs):
self.profile_handler = ProfileHandler(hs) self.profile_handler = MasterProfileHandler(hs)
class ProfileTestCase(unittest.TestCase): class ProfileTestCase(unittest.TestCase):

Loading…
Cancel
Save