diff --git a/changelog.d/3659.feature b/changelog.d/3659.feature new file mode 100644 index 000000000..a5b4821c0 --- /dev/null +++ b/changelog.d/3659.feature @@ -0,0 +1 @@ +Support profile API endpoints on workers diff --git a/docs/workers.rst b/docs/workers.rst index aec319dd8..81146a211 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -265,6 +265,7 @@ Handles some event creation. It can handle REST endpoints matching:: ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$ ^/_matrix/client/(api/v1|r0|unstable)/join/ + ^/_matrix/client/(api/v1|r0|unstable)/profile/ It will create events locally and then send them on to the main synapse instance to be persisted and handled. diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 03d39968a..a34c89fa9 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.profile import ( + ProfileAvatarURLRestServlet, + ProfileDisplaynameRestServlet, + ProfileRestServlet, +) from synapse.rest.client.v1.room import ( JoinRoomAliasServlet, RoomMembershipRestServlet, @@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import ( ) from synapse.server import HomeServer from synapse.storage.engines import create_engine +from synapse.storage.user_directory import UserDirectoryStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole @@ -62,6 +68,9 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( + # FIXME(#3714): We need to add UserDirectoryStore as we write directly + # rather than going via the correct worker. + UserDirectoryStore, DirectoryStore, SlavedTransactionStore, SlavedProfileStore, @@ -101,6 +110,9 @@ class EventCreatorServer(HomeServer): RoomMembershipRestServlet(self).register(resource) RoomStateEventRestServlet(self).register(resource) JoinRoomAliasServlet(self).register(resource) + ProfileAvatarURLRestServlet(self).register(resource) + ProfileDisplaynameRestServlet(self).register(resource) + ProfileRestServlet(self).register(resource) resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9af2e8f86..75b8b7ce6 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -32,12 +32,16 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class ProfileHandler(BaseHandler): - PROFILE_UPDATE_MS = 60 * 1000 - PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 +class BaseProfileHandler(BaseHandler): + """Handles fetching and updating user profile information. + + BaseProfileHandler can be instantiated directly on workers and will + delegate to master when necessary. The master process should use the + subclass MasterProfileHandler + """ def __init__(self, hs): - super(ProfileHandler, self).__init__(hs) + super(BaseProfileHandler, self).__init__(hs) self.federation = hs.get_federation_client() hs.get_federation_registry().register_query_handler( @@ -46,11 +50,6 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() - if hs.config.worker_app is None: - self.clock.looping_call( - self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, - ) - @defer.inlineCallbacks def get_profile(self, user_id): target_user = UserID.from_string(user_id) @@ -282,6 +281,20 @@ class ProfileHandler(BaseHandler): room_id, str(e.message) ) + +class MasterProfileHandler(BaseProfileHandler): + PROFILE_UPDATE_MS = 60 * 1000 + PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000 + + def __init__(self, hs): + super(MasterProfileHandler, self).__init__(hs) + + assert hs.config.worker_app is None + + self.clock.looping_call( + self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS, + ) + def _start_update_remote_profile_cache(self): return run_as_background_process( "Update remote profile", self._update_remote_profile_cache, diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 37dda6458..d8413d6aa 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -119,6 +119,8 @@ class UserDirectoryHandler(object): """Called to update index of our local user profiles when they change irrespective of any rooms the user may be in. """ + # FIXME(#3714): We should probably do this in the same worker as all + # the other changes. yield self.store.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url, None, ) @@ -127,6 +129,8 @@ class UserDirectoryHandler(object): def handle_user_deactivated(self, user_id): """Called when a user ID is deactivated """ + # FIXME(#3714): We should probably do this in the same worker as all + # the other changes. yield self.store.remove_from_user_dir(user_id) yield self.store.remove_from_user_in_public_room(user_id) diff --git a/synapse/server.py b/synapse/server.py index 26228d8c7..a6fbc6ec0 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -56,7 +56,7 @@ from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.message import EventCreationHandler, MessageHandler from synapse.handlers.pagination import PaginationHandler from synapse.handlers.presence import PresenceHandler -from synapse.handlers.profile import ProfileHandler +from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.receipts import ReceiptsHandler from synapse.handlers.room import RoomContextHandler, RoomCreationHandler @@ -308,7 +308,10 @@ class HomeServer(object): return InitialSyncHandler(self) def build_profile_handler(self): - return ProfileHandler(self) + if self.config.worker_app: + return BaseProfileHandler(self) + else: + return MasterProfileHandler(self) def build_event_creation_handler(self): return EventCreationHandler(self) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 60295da25..88b50f33b 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -71,8 +71,6 @@ class ProfileWorkerStore(SQLBaseStore): desc="get_from_remote_profile_cache", ) - -class ProfileStore(ProfileWorkerStore): def create_profile(self, user_localpart): return self._simple_insert( table="profiles", @@ -96,6 +94,8 @@ class ProfileStore(ProfileWorkerStore): desc="set_profile_avatar_url", ) + +class ProfileStore(ProfileWorkerStore): def add_remote_profile_cache(self, user_id, displayname, avatar_url): """Ensure we are caching the remote user's profiles. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3378fc77d..61013b891 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -186,6 +186,35 @@ class RoomWorkerStore(SQLBaseStore): desc="is_room_blocked", ) + @cachedInlineCallbacks(max_entries=10000) + def get_ratelimit_for_user(self, user_id): + """Check if there are any overrides for ratelimiting for the given + user + + Args: + user_id (str) + + Returns: + RatelimitOverride if there is an override, else None. If the contents + of RatelimitOverride are None or 0 then ratelimitng has been + disabled for that user entirely. + """ + row = yield self._simple_select_one( + table="ratelimit_override", + keyvalues={"user_id": user_id}, + retcols=("messages_per_second", "burst_count"), + allow_none=True, + desc="get_ratelimit_for_user", + ) + + if row: + defer.returnValue(RatelimitOverride( + messages_per_second=row["messages_per_second"], + burst_count=row["burst_count"], + )) + else: + defer.returnValue(None) + class RoomStore(RoomWorkerStore, SearchStore): @@ -469,35 +498,6 @@ class RoomStore(RoomWorkerStore, SearchStore): "get_all_new_public_rooms", get_all_new_public_rooms ) - @cachedInlineCallbacks(max_entries=10000) - def get_ratelimit_for_user(self, user_id): - """Check if there are any overrides for ratelimiting for the given - user - - Args: - user_id (str) - - Returns: - RatelimitOverride if there is an override, else None. If the contents - of RatelimitOverride are None or 0 then ratelimitng has been - disabled for that user entirely. - """ - row = yield self._simple_select_one( - table="ratelimit_override", - keyvalues={"user_id": user_id}, - retcols=("messages_per_second", "burst_count"), - allow_none=True, - desc="get_ratelimit_for_user", - ) - - if row: - defer.returnValue(RatelimitOverride( - messages_per_second=row["messages_per_second"], - burst_count=row["burst_count"], - )) - else: - defer.returnValue(None) - @defer.inlineCallbacks def block_room(self, room_id, user_id): yield self._simple_insert( diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 62dc69003..80da1c895 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -20,7 +20,7 @@ from twisted.internet import defer import synapse.types from synapse.api.errors import AuthError -from synapse.handlers.profile import ProfileHandler +from synapse.handlers.profile import MasterProfileHandler from synapse.types import UserID from tests import unittest @@ -29,7 +29,7 @@ from tests.utils import setup_test_homeserver class ProfileHandlers(object): def __init__(self, hs): - self.profile_handler = ProfileHandler(hs) + self.profile_handler = MasterProfileHandler(hs) class ProfileTestCase(unittest.TestCase):