0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-15 22:42:23 +01:00

Convert groups local and server to async/await. (#7600)

This commit is contained in:
Patrick Cloke 2020-06-01 07:28:43 -04:00 committed by GitHub
parent c1bdd4fac7
commit 6af9cdca24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 150 additions and 190 deletions

1
changelog.d/7600.misc Normal file
View file

@ -0,0 +1 @@
Convert groups handlers to async/await.

View file

@ -19,8 +19,6 @@ import logging
from six import string_types from six import string_types
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute from synapse.util.async_helpers import concurrently_execute
@ -51,8 +49,7 @@ class GroupsServerWorkerHandler(object):
self.transport_client = hs.get_federation_transport_client() self.transport_client = hs.get_federation_transport_client()
self.profile_handler = hs.get_profile_handler() self.profile_handler = hs.get_profile_handler()
@defer.inlineCallbacks async def check_group_is_ours(
def check_group_is_ours(
self, group_id, requester_user_id, and_exists=False, and_is_admin=None self, group_id, requester_user_id, and_exists=False, and_is_admin=None
): ):
"""Check that the group is ours, and optionally if it exists. """Check that the group is ours, and optionally if it exists.
@ -68,25 +65,24 @@ class GroupsServerWorkerHandler(object):
if not self.is_mine_id(group_id): if not self.is_mine_id(group_id):
raise SynapseError(400, "Group not on this server") raise SynapseError(400, "Group not on this server")
group = yield self.store.get_group(group_id) group = await self.store.get_group(group_id)
if and_exists and not group: if and_exists and not group:
raise SynapseError(404, "Unknown group") raise SynapseError(404, "Unknown group")
is_user_in_group = yield self.store.is_user_in_group( is_user_in_group = await self.store.is_user_in_group(
requester_user_id, group_id requester_user_id, group_id
) )
if group and not is_user_in_group and not group["is_public"]: if group and not is_user_in_group and not group["is_public"]:
raise SynapseError(404, "Unknown group") raise SynapseError(404, "Unknown group")
if and_is_admin: if and_is_admin:
is_admin = yield self.store.is_user_admin_in_group(group_id, and_is_admin) is_admin = await self.store.is_user_admin_in_group(group_id, and_is_admin)
if not is_admin: if not is_admin:
raise SynapseError(403, "User is not admin in group") raise SynapseError(403, "User is not admin in group")
return group return group
@defer.inlineCallbacks async def get_group_summary(self, group_id, requester_user_id):
def get_group_summary(self, group_id, requester_user_id):
"""Get the summary for a group as seen by requester_user_id. """Get the summary for a group as seen by requester_user_id.
The group summary consists of the profile of the room, and a curated The group summary consists of the profile of the room, and a curated
@ -95,28 +91,28 @@ class GroupsServerWorkerHandler(object):
A user/room may appear in multiple roles/categories. A user/room may appear in multiple roles/categories.
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
is_user_in_group = yield self.store.is_user_in_group( is_user_in_group = await self.store.is_user_in_group(
requester_user_id, group_id requester_user_id, group_id
) )
profile = yield self.get_group_profile(group_id, requester_user_id) profile = await self.get_group_profile(group_id, requester_user_id)
users, roles = yield self.store.get_users_for_summary_by_role( users, roles = await self.store.get_users_for_summary_by_role(
group_id, include_private=is_user_in_group group_id, include_private=is_user_in_group
) )
# TODO: Add profiles to users # TODO: Add profiles to users
rooms, categories = yield self.store.get_rooms_for_summary_by_category( rooms, categories = await self.store.get_rooms_for_summary_by_category(
group_id, include_private=is_user_in_group group_id, include_private=is_user_in_group
) )
for room_entry in rooms: for room_entry in rooms:
room_id = room_entry["room_id"] room_id = room_entry["room_id"]
joined_users = yield self.store.get_users_in_room(room_id) joined_users = await self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry( entry = await self.room_list_handler.generate_room_entry(
room_id, len(joined_users), with_alias=False, allow_private=True room_id, len(joined_users), with_alias=False, allow_private=True
) )
entry = dict(entry) # so we don't change whats cached entry = dict(entry) # so we don't change whats cached
@ -130,7 +126,7 @@ class GroupsServerWorkerHandler(object):
user_id = entry["user_id"] user_id = entry["user_id"]
if not self.is_mine_id(requester_user_id): if not self.is_mine_id(requester_user_id):
attestation = yield self.store.get_remote_attestation(group_id, user_id) attestation = await self.store.get_remote_attestation(group_id, user_id)
if not attestation: if not attestation:
continue continue
@ -140,12 +136,12 @@ class GroupsServerWorkerHandler(object):
group_id, user_id group_id, user_id
) )
user_profile = yield self.profile_handler.get_profile_from_cache(user_id) user_profile = await self.profile_handler.get_profile_from_cache(user_id)
entry.update(user_profile) entry.update(user_profile)
users.sort(key=lambda e: e.get("order", 0)) users.sort(key=lambda e: e.get("order", 0))
membership_info = yield self.store.get_users_membership_info_in_group( membership_info = await self.store.get_users_membership_info_in_group(
group_id, requester_user_id group_id, requester_user_id
) )
@ -164,22 +160,20 @@ class GroupsServerWorkerHandler(object):
"user": membership_info, "user": membership_info,
} }
@defer.inlineCallbacks async def get_group_categories(self, group_id, requester_user_id):
def get_group_categories(self, group_id, requester_user_id):
"""Get all categories in a group (as seen by user) """Get all categories in a group (as seen by user)
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
categories = yield self.store.get_group_categories(group_id=group_id) categories = await self.store.get_group_categories(group_id=group_id)
return {"categories": categories} return {"categories": categories}
@defer.inlineCallbacks async def get_group_category(self, group_id, requester_user_id, category_id):
def get_group_category(self, group_id, requester_user_id, category_id):
"""Get a specific category in a group (as seen by user) """Get a specific category in a group (as seen by user)
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
res = yield self.store.get_group_category( res = await self.store.get_group_category(
group_id=group_id, category_id=category_id group_id=group_id, category_id=category_id
) )
@ -187,32 +181,29 @@ class GroupsServerWorkerHandler(object):
return res return res
@defer.inlineCallbacks async def get_group_roles(self, group_id, requester_user_id):
def get_group_roles(self, group_id, requester_user_id):
"""Get all roles in a group (as seen by user) """Get all roles in a group (as seen by user)
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
roles = yield self.store.get_group_roles(group_id=group_id) roles = await self.store.get_group_roles(group_id=group_id)
return {"roles": roles} return {"roles": roles}
@defer.inlineCallbacks async def get_group_role(self, group_id, requester_user_id, role_id):
def get_group_role(self, group_id, requester_user_id, role_id):
"""Get a specific role in a group (as seen by user) """Get a specific role in a group (as seen by user)
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
res = yield self.store.get_group_role(group_id=group_id, role_id=role_id) res = await self.store.get_group_role(group_id=group_id, role_id=role_id)
return res return res
@defer.inlineCallbacks async def get_group_profile(self, group_id, requester_user_id):
def get_group_profile(self, group_id, requester_user_id):
"""Get the group profile as seen by requester_user_id """Get the group profile as seen by requester_user_id
""" """
yield self.check_group_is_ours(group_id, requester_user_id) await self.check_group_is_ours(group_id, requester_user_id)
group = yield self.store.get_group(group_id) group = await self.store.get_group(group_id)
if group: if group:
cols = [ cols = [
@ -229,20 +220,19 @@ class GroupsServerWorkerHandler(object):
else: else:
raise SynapseError(404, "Unknown group") raise SynapseError(404, "Unknown group")
@defer.inlineCallbacks async def get_users_in_group(self, group_id, requester_user_id):
def get_users_in_group(self, group_id, requester_user_id):
"""Get the users in group as seen by requester_user_id. """Get the users in group as seen by requester_user_id.
The ordering is arbitrary at the moment The ordering is arbitrary at the moment
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
is_user_in_group = yield self.store.is_user_in_group( is_user_in_group = await self.store.is_user_in_group(
requester_user_id, group_id requester_user_id, group_id
) )
user_results = yield self.store.get_users_in_group( user_results = await self.store.get_users_in_group(
group_id, include_private=is_user_in_group group_id, include_private=is_user_in_group
) )
@ -254,14 +244,14 @@ class GroupsServerWorkerHandler(object):
entry = {"user_id": g_user_id} entry = {"user_id": g_user_id}
profile = yield self.profile_handler.get_profile_from_cache(g_user_id) profile = await self.profile_handler.get_profile_from_cache(g_user_id)
entry.update(profile) entry.update(profile)
entry["is_public"] = bool(is_public) entry["is_public"] = bool(is_public)
entry["is_privileged"] = bool(is_privileged) entry["is_privileged"] = bool(is_privileged)
if not self.is_mine_id(g_user_id): if not self.is_mine_id(g_user_id):
attestation = yield self.store.get_remote_attestation( attestation = await self.store.get_remote_attestation(
group_id, g_user_id group_id, g_user_id
) )
if not attestation: if not attestation:
@ -279,30 +269,29 @@ class GroupsServerWorkerHandler(object):
return {"chunk": chunk, "total_user_count_estimate": len(user_results)} return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
@defer.inlineCallbacks async def get_invited_users_in_group(self, group_id, requester_user_id):
def get_invited_users_in_group(self, group_id, requester_user_id):
"""Get the users that have been invited to a group as seen by requester_user_id. """Get the users that have been invited to a group as seen by requester_user_id.
The ordering is arbitrary at the moment The ordering is arbitrary at the moment
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
is_user_in_group = yield self.store.is_user_in_group( is_user_in_group = await self.store.is_user_in_group(
requester_user_id, group_id requester_user_id, group_id
) )
if not is_user_in_group: if not is_user_in_group:
raise SynapseError(403, "User not in group") raise SynapseError(403, "User not in group")
invited_users = yield self.store.get_invited_users_in_group(group_id) invited_users = await self.store.get_invited_users_in_group(group_id)
user_profiles = [] user_profiles = []
for user_id in invited_users: for user_id in invited_users:
user_profile = {"user_id": user_id} user_profile = {"user_id": user_id}
try: try:
profile = yield self.profile_handler.get_profile_from_cache(user_id) profile = await self.profile_handler.get_profile_from_cache(user_id)
user_profile.update(profile) user_profile.update(profile)
except Exception as e: except Exception as e:
logger.warning("Error getting profile for %s: %s", user_id, e) logger.warning("Error getting profile for %s: %s", user_id, e)
@ -310,20 +299,19 @@ class GroupsServerWorkerHandler(object):
return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)} return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
@defer.inlineCallbacks async def get_rooms_in_group(self, group_id, requester_user_id):
def get_rooms_in_group(self, group_id, requester_user_id):
"""Get the rooms in group as seen by requester_user_id """Get the rooms in group as seen by requester_user_id
This returns rooms in order of decreasing number of joined users This returns rooms in order of decreasing number of joined users
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
is_user_in_group = yield self.store.is_user_in_group( is_user_in_group = await self.store.is_user_in_group(
requester_user_id, group_id requester_user_id, group_id
) )
room_results = yield self.store.get_rooms_in_group( room_results = await self.store.get_rooms_in_group(
group_id, include_private=is_user_in_group group_id, include_private=is_user_in_group
) )
@ -331,8 +319,8 @@ class GroupsServerWorkerHandler(object):
for room_result in room_results: for room_result in room_results:
room_id = room_result["room_id"] room_id = room_result["room_id"]
joined_users = yield self.store.get_users_in_room(room_id) joined_users = await self.store.get_users_in_room(room_id)
entry = yield self.room_list_handler.generate_room_entry( entry = await self.room_list_handler.generate_room_entry(
room_id, len(joined_users), with_alias=False, allow_private=True room_id, len(joined_users), with_alias=False, allow_private=True
) )
@ -355,13 +343,12 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
# Ensure attestations get renewed # Ensure attestations get renewed
hs.get_groups_attestation_renewer() hs.get_groups_attestation_renewer()
@defer.inlineCallbacks async def update_group_summary_room(
def update_group_summary_room(
self, group_id, requester_user_id, room_id, category_id, content self, group_id, requester_user_id, room_id, category_id, content
): ):
"""Add/update a room to the group summary """Add/update a room to the group summary
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
@ -371,7 +358,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
is_public = _parse_visibility_from_contents(content) is_public = _parse_visibility_from_contents(content)
yield self.store.add_room_to_summary( await self.store.add_room_to_summary(
group_id=group_id, group_id=group_id,
room_id=room_id, room_id=room_id,
category_id=category_id, category_id=category_id,
@ -381,31 +368,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def delete_group_summary_room(
def delete_group_summary_room(
self, group_id, requester_user_id, room_id, category_id self, group_id, requester_user_id, room_id, category_id
): ):
"""Remove a room from the summary """Remove a room from the summary
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
yield self.store.remove_room_from_summary( await self.store.remove_room_from_summary(
group_id=group_id, room_id=room_id, category_id=category_id group_id=group_id, room_id=room_id, category_id=category_id
) )
return {} return {}
@defer.inlineCallbacks async def set_group_join_policy(self, group_id, requester_user_id, content):
def set_group_join_policy(self, group_id, requester_user_id, content):
"""Sets the group join policy. """Sets the group join policy.
Currently supported policies are: Currently supported policies are:
- "invite": an invite must be received and accepted in order to join. - "invite": an invite must be received and accepted in order to join.
- "open": anyone can join. - "open": anyone can join.
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
@ -413,22 +398,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
if join_policy is None: if join_policy is None:
raise SynapseError(400, "No value specified for 'm.join_policy'") raise SynapseError(400, "No value specified for 'm.join_policy'")
yield self.store.set_group_join_policy(group_id, join_policy=join_policy) await self.store.set_group_join_policy(group_id, join_policy=join_policy)
return {} return {}
@defer.inlineCallbacks async def update_group_category(
def update_group_category(self, group_id, requester_user_id, category_id, content): self, group_id, requester_user_id, category_id, content
):
"""Add/Update a group category """Add/Update a group category
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
is_public = _parse_visibility_from_contents(content) is_public = _parse_visibility_from_contents(content)
profile = content.get("profile") profile = content.get("profile")
yield self.store.upsert_group_category( await self.store.upsert_group_category(
group_id=group_id, group_id=group_id,
category_id=category_id, category_id=category_id,
is_public=is_public, is_public=is_public,
@ -437,25 +423,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def delete_group_category(self, group_id, requester_user_id, category_id):
def delete_group_category(self, group_id, requester_user_id, category_id):
"""Delete a group category """Delete a group category
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
yield self.store.remove_group_category( await self.store.remove_group_category(
group_id=group_id, category_id=category_id group_id=group_id, category_id=category_id
) )
return {} return {}
@defer.inlineCallbacks async def update_group_role(self, group_id, requester_user_id, role_id, content):
def update_group_role(self, group_id, requester_user_id, role_id, content):
"""Add/update a role in a group """Add/update a role in a group
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
@ -463,31 +447,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
profile = content.get("profile") profile = content.get("profile")
yield self.store.upsert_group_role( await self.store.upsert_group_role(
group_id=group_id, role_id=role_id, is_public=is_public, profile=profile group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
) )
return {} return {}
@defer.inlineCallbacks async def delete_group_role(self, group_id, requester_user_id, role_id):
def delete_group_role(self, group_id, requester_user_id, role_id):
"""Remove role from group """Remove role from group
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
yield self.store.remove_group_role(group_id=group_id, role_id=role_id) await self.store.remove_group_role(group_id=group_id, role_id=role_id)
return {} return {}
@defer.inlineCallbacks async def update_group_summary_user(
def update_group_summary_user(
self, group_id, requester_user_id, user_id, role_id, content self, group_id, requester_user_id, user_id, role_id, content
): ):
"""Add/update a users entry in the group summary """Add/update a users entry in the group summary
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
@ -495,7 +477,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
is_public = _parse_visibility_from_contents(content) is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_summary( await self.store.add_user_to_summary(
group_id=group_id, group_id=group_id,
user_id=user_id, user_id=user_id,
role_id=role_id, role_id=role_id,
@ -505,25 +487,25 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def delete_group_summary_user(
def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id): self, group_id, requester_user_id, user_id, role_id
):
"""Remove a user from the group summary """Remove a user from the group summary
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
yield self.store.remove_user_from_summary( await self.store.remove_user_from_summary(
group_id=group_id, user_id=user_id, role_id=role_id group_id=group_id, user_id=user_id, role_id=role_id
) )
return {} return {}
@defer.inlineCallbacks async def update_group_profile(self, group_id, requester_user_id, content):
def update_group_profile(self, group_id, requester_user_id, content):
"""Update the group profile """Update the group profile
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
@ -535,40 +517,38 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
raise SynapseError(400, "%r value is not a string" % (keyname,)) raise SynapseError(400, "%r value is not a string" % (keyname,))
profile[keyname] = value profile[keyname] = value
yield self.store.update_group_profile(group_id, profile) await self.store.update_group_profile(group_id, profile)
@defer.inlineCallbacks async def add_room_to_group(self, group_id, requester_user_id, room_id, content):
def add_room_to_group(self, group_id, requester_user_id, room_id, content):
"""Add room to group """Add room to group
""" """
RoomID.from_string(room_id) # Ensure valid room id RoomID.from_string(room_id) # Ensure valid room id
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
is_public = _parse_visibility_from_contents(content) is_public = _parse_visibility_from_contents(content)
yield self.store.add_room_to_group(group_id, room_id, is_public=is_public) await self.store.add_room_to_group(group_id, room_id, is_public=is_public)
return {} return {}
@defer.inlineCallbacks async def update_room_in_group(
def update_room_in_group(
self, group_id, requester_user_id, room_id, config_key, content self, group_id, requester_user_id, room_id, config_key, content
): ):
"""Update room in group """Update room in group
""" """
RoomID.from_string(room_id) # Ensure valid room id RoomID.from_string(room_id) # Ensure valid room id
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
if config_key == "m.visibility": if config_key == "m.visibility":
is_public = _parse_visibility_dict(content) is_public = _parse_visibility_dict(content)
yield self.store.update_room_in_group_visibility( await self.store.update_room_in_group_visibility(
group_id, room_id, is_public=is_public group_id, room_id, is_public=is_public
) )
else: else:
@ -576,36 +556,34 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def remove_room_from_group(self, group_id, requester_user_id, room_id):
def remove_room_from_group(self, group_id, requester_user_id, room_id):
"""Remove room from group """Remove room from group
""" """
yield self.check_group_is_ours( await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
yield self.store.remove_room_from_group(group_id, room_id) await self.store.remove_room_from_group(group_id, room_id)
return {} return {}
@defer.inlineCallbacks async def invite_to_group(self, group_id, user_id, requester_user_id, content):
def invite_to_group(self, group_id, user_id, requester_user_id, content):
"""Invite user to group """Invite user to group
""" """
group = yield self.check_group_is_ours( group = await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
) )
# TODO: Check if user knocked # TODO: Check if user knocked
invited_users = yield self.store.get_invited_users_in_group(group_id) invited_users = await self.store.get_invited_users_in_group(group_id)
if user_id in invited_users: if user_id in invited_users:
raise SynapseError( raise SynapseError(
400, "User already invited to group", errcode=Codes.BAD_STATE 400, "User already invited to group", errcode=Codes.BAD_STATE
) )
user_results = yield self.store.get_users_in_group( user_results = await self.store.get_users_in_group(
group_id, include_private=True group_id, include_private=True
) )
if user_id in (user_result["user_id"] for user_result in user_results): if user_id in (user_result["user_id"] for user_result in user_results):
@ -618,18 +596,18 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
if self.hs.is_mine_id(user_id): if self.hs.is_mine_id(user_id):
groups_local = self.hs.get_groups_local_handler() groups_local = self.hs.get_groups_local_handler()
res = yield groups_local.on_invite(group_id, user_id, content) res = await groups_local.on_invite(group_id, user_id, content)
local_attestation = None local_attestation = None
else: else:
local_attestation = self.attestations.create_attestation(group_id, user_id) local_attestation = self.attestations.create_attestation(group_id, user_id)
content.update({"attestation": local_attestation}) content.update({"attestation": local_attestation})
res = yield self.transport_client.invite_to_group_notification( res = await self.transport_client.invite_to_group_notification(
get_domain_from_id(user_id), group_id, user_id, content get_domain_from_id(user_id), group_id, user_id, content
) )
user_profile = res.get("user_profile", {}) user_profile = res.get("user_profile", {})
yield self.store.add_remote_profile_cache( await self.store.add_remote_profile_cache(
user_id, user_id,
displayname=user_profile.get("displayname"), displayname=user_profile.get("displayname"),
avatar_url=user_profile.get("avatar_url"), avatar_url=user_profile.get("avatar_url"),
@ -639,13 +617,13 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
if not self.hs.is_mine_id(user_id): if not self.hs.is_mine_id(user_id):
remote_attestation = res["attestation"] remote_attestation = res["attestation"]
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
remote_attestation, user_id=user_id, group_id=group_id remote_attestation, user_id=user_id, group_id=group_id
) )
else: else:
remote_attestation = None remote_attestation = None
yield self.store.add_user_to_group( await self.store.add_user_to_group(
group_id, group_id,
user_id, user_id,
is_admin=False, is_admin=False,
@ -654,15 +632,14 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
remote_attestation=remote_attestation, remote_attestation=remote_attestation,
) )
elif res["state"] == "invite": elif res["state"] == "invite":
yield self.store.add_group_invite(group_id, user_id) await self.store.add_group_invite(group_id, user_id)
return {"state": "invite"} return {"state": "invite"}
elif res["state"] == "reject": elif res["state"] == "reject":
return {"state": "reject"} return {"state": "reject"}
else: else:
raise SynapseError(502, "Unknown state returned by HS") raise SynapseError(502, "Unknown state returned by HS")
@defer.inlineCallbacks async def _add_user(self, group_id, user_id, content):
def _add_user(self, group_id, user_id, content):
"""Add a user to a group based on a content dict. """Add a user to a group based on a content dict.
See accept_invite, join_group. See accept_invite, join_group.
@ -672,7 +649,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
remote_attestation = content["attestation"] remote_attestation = content["attestation"]
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
remote_attestation, user_id=user_id, group_id=group_id remote_attestation, user_id=user_id, group_id=group_id
) )
else: else:
@ -681,7 +658,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
is_public = _parse_visibility_from_contents(content) is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_group( await self.store.add_user_to_group(
group_id, group_id,
user_id, user_id,
is_admin=False, is_admin=False,
@ -692,59 +669,55 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
return local_attestation return local_attestation
@defer.inlineCallbacks async def accept_invite(self, group_id, requester_user_id, content):
def accept_invite(self, group_id, requester_user_id, content):
"""User tries to accept an invite to the group. """User tries to accept an invite to the group.
This is different from them asking to join, and so should error if no This is different from them asking to join, and so should error if no
invite exists (and they're not a member of the group) invite exists (and they're not a member of the group)
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
is_invited = yield self.store.is_user_invited_to_local_group( is_invited = await self.store.is_user_invited_to_local_group(
group_id, requester_user_id group_id, requester_user_id
) )
if not is_invited: if not is_invited:
raise SynapseError(403, "User not invited to group") raise SynapseError(403, "User not invited to group")
local_attestation = yield self._add_user(group_id, requester_user_id, content) local_attestation = await self._add_user(group_id, requester_user_id, content)
return {"state": "join", "attestation": local_attestation} return {"state": "join", "attestation": local_attestation}
@defer.inlineCallbacks async def join_group(self, group_id, requester_user_id, content):
def join_group(self, group_id, requester_user_id, content):
"""User tries to join the group. """User tries to join the group.
This will error if the group requires an invite/knock to join This will error if the group requires an invite/knock to join
""" """
group_info = yield self.check_group_is_ours( group_info = await self.check_group_is_ours(
group_id, requester_user_id, and_exists=True group_id, requester_user_id, and_exists=True
) )
if group_info["join_policy"] != "open": if group_info["join_policy"] != "open":
raise SynapseError(403, "Group is not publicly joinable") raise SynapseError(403, "Group is not publicly joinable")
local_attestation = yield self._add_user(group_id, requester_user_id, content) local_attestation = await self._add_user(group_id, requester_user_id, content)
return {"state": "join", "attestation": local_attestation} return {"state": "join", "attestation": local_attestation}
@defer.inlineCallbacks async def knock(self, group_id, requester_user_id, content):
def knock(self, group_id, requester_user_id, content):
"""A user requests becoming a member of the group """A user requests becoming a member of the group
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
raise NotImplementedError() raise NotImplementedError()
@defer.inlineCallbacks async def accept_knock(self, group_id, requester_user_id, content):
def accept_knock(self, group_id, requester_user_id, content):
"""Accept a users knock to the room. """Accept a users knock to the room.
Errors if the user hasn't knocked, rather than inviting them. Errors if the user hasn't knocked, rather than inviting them.
""" """
yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
raise NotImplementedError() raise NotImplementedError()
@ -872,8 +845,6 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
group_id (str) group_id (str)
request_user_id (str) request_user_id (str)
Returns:
Deferred
""" """
await self.check_group_is_ours(group_id, requester_user_id, and_exists=True) await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)

View file

@ -18,8 +18,6 @@ import logging
from six import iteritems from six import iteritems
from twisted.internet import defer
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
@ -92,19 +90,18 @@ class GroupsLocalWorkerHandler(object):
get_group_role = _create_rerouter("get_group_role") get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles") get_group_roles = _create_rerouter("get_group_roles")
@defer.inlineCallbacks async def get_group_summary(self, group_id, requester_user_id):
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group. """Get the group summary for a group.
If the group is remote we check that the users have valid attestations. If the group is remote we check that the users have valid attestations.
""" """
if self.is_mine_id(group_id): if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_group_summary( res = await self.groups_server_handler.get_group_summary(
group_id, requester_user_id group_id, requester_user_id
) )
else: else:
try: try:
res = yield self.transport_client.get_group_summary( res = await self.transport_client.get_group_summary(
get_domain_from_id(group_id), group_id, requester_user_id get_domain_from_id(group_id), group_id, requester_user_id
) )
except HttpResponseException as e: except HttpResponseException as e:
@ -122,7 +119,7 @@ class GroupsLocalWorkerHandler(object):
attestation = entry.pop("attestation", {}) attestation = entry.pop("attestation", {})
try: try:
if get_domain_from_id(g_user_id) != group_server_name: if get_domain_from_id(g_user_id) != group_server_name:
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
attestation, attestation,
group_id=group_id, group_id=group_id,
user_id=g_user_id, user_id=g_user_id,
@ -139,19 +136,18 @@ class GroupsLocalWorkerHandler(object):
# Add `is_publicised` flag to indicate whether the user has publicised their # Add `is_publicised` flag to indicate whether the user has publicised their
# membership of the group on their profile # membership of the group on their profile
result = yield self.store.get_publicised_groups_for_user(requester_user_id) result = await self.store.get_publicised_groups_for_user(requester_user_id)
is_publicised = group_id in result is_publicised = group_id in result
res.setdefault("user", {})["is_publicised"] = is_publicised res.setdefault("user", {})["is_publicised"] = is_publicised
return res return res
@defer.inlineCallbacks async def get_users_in_group(self, group_id, requester_user_id):
def get_users_in_group(self, group_id, requester_user_id):
"""Get users in a group """Get users in a group
""" """
if self.is_mine_id(group_id): if self.is_mine_id(group_id):
res = yield self.groups_server_handler.get_users_in_group( res = await self.groups_server_handler.get_users_in_group(
group_id, requester_user_id group_id, requester_user_id
) )
return res return res
@ -159,7 +155,7 @@ class GroupsLocalWorkerHandler(object):
group_server_name = get_domain_from_id(group_id) group_server_name = get_domain_from_id(group_id)
try: try:
res = yield self.transport_client.get_users_in_group( res = await self.transport_client.get_users_in_group(
get_domain_from_id(group_id), group_id, requester_user_id get_domain_from_id(group_id), group_id, requester_user_id
) )
except HttpResponseException as e: except HttpResponseException as e:
@ -174,7 +170,7 @@ class GroupsLocalWorkerHandler(object):
attestation = entry.pop("attestation", {}) attestation = entry.pop("attestation", {})
try: try:
if get_domain_from_id(g_user_id) != group_server_name: if get_domain_from_id(g_user_id) != group_server_name:
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
attestation, attestation,
group_id=group_id, group_id=group_id,
user_id=g_user_id, user_id=g_user_id,
@ -188,15 +184,13 @@ class GroupsLocalWorkerHandler(object):
return res return res
@defer.inlineCallbacks async def get_joined_groups(self, user_id):
def get_joined_groups(self, user_id): group_ids = await self.store.get_joined_groups(user_id)
group_ids = yield self.store.get_joined_groups(user_id)
return {"groups": group_ids} return {"groups": group_ids}
@defer.inlineCallbacks async def get_publicised_groups_for_user(self, user_id):
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id): if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id) result = await self.store.get_publicised_groups_for_user(user_id)
# Check AS associated groups for this user - this depends on the # Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`) # RegExps in the AS registration file (under `users`)
@ -206,7 +200,7 @@ class GroupsLocalWorkerHandler(object):
return {"groups": result} return {"groups": result}
else: else:
try: try:
bulk_result = yield self.transport_client.bulk_get_publicised_groups( bulk_result = await self.transport_client.bulk_get_publicised_groups(
get_domain_from_id(user_id), [user_id] get_domain_from_id(user_id), [user_id]
) )
except HttpResponseException as e: except HttpResponseException as e:
@ -218,8 +212,7 @@ class GroupsLocalWorkerHandler(object):
# TODO: Verify attestations # TODO: Verify attestations
return {"groups": result} return {"groups": result}
@defer.inlineCallbacks async def bulk_get_publicised_groups(self, user_ids, proxy=True):
def bulk_get_publicised_groups(self, user_ids, proxy=True):
destinations = {} destinations = {}
local_users = set() local_users = set()
@ -236,7 +229,7 @@ class GroupsLocalWorkerHandler(object):
failed_results = [] failed_results = []
for destination, dest_user_ids in iteritems(destinations): for destination, dest_user_ids in iteritems(destinations):
try: try:
r = yield self.transport_client.bulk_get_publicised_groups( r = await self.transport_client.bulk_get_publicised_groups(
destination, list(dest_user_ids) destination, list(dest_user_ids)
) )
results.update(r["users"]) results.update(r["users"])
@ -244,7 +237,7 @@ class GroupsLocalWorkerHandler(object):
failed_results.extend(dest_user_ids) failed_results.extend(dest_user_ids)
for uid in local_users: for uid in local_users:
results[uid] = yield self.store.get_publicised_groups_for_user(uid) results[uid] = await self.store.get_publicised_groups_for_user(uid)
# Check AS associated groups for this user - this depends on the # Check AS associated groups for this user - this depends on the
# RegExps in the AS registration file (under `users`) # RegExps in the AS registration file (under `users`)
@ -333,12 +326,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return res return res
@defer.inlineCallbacks async def join_group(self, group_id, user_id, content):
def join_group(self, group_id, user_id, content):
"""Request to join a group """Request to join a group
""" """
if self.is_mine_id(group_id): if self.is_mine_id(group_id):
yield self.groups_server_handler.join_group(group_id, user_id, content) await self.groups_server_handler.join_group(group_id, user_id, content)
local_attestation = None local_attestation = None
remote_attestation = None remote_attestation = None
else: else:
@ -346,7 +338,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
content["attestation"] = local_attestation content["attestation"] = local_attestation
try: try:
res = yield self.transport_client.join_group( res = await self.transport_client.join_group(
get_domain_from_id(group_id), group_id, user_id, content get_domain_from_id(group_id), group_id, user_id, content
) )
except HttpResponseException as e: except HttpResponseException as e:
@ -356,7 +348,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
remote_attestation = res["attestation"] remote_attestation = res["attestation"]
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
remote_attestation, remote_attestation,
group_id=group_id, group_id=group_id,
user_id=user_id, user_id=user_id,
@ -366,7 +358,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
# TODO: Check that the group is public and we're being added publically # TODO: Check that the group is public and we're being added publically
is_publicised = content.get("publicise", False) is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership( token = await self.store.register_user_group_membership(
group_id, group_id,
user_id, user_id,
membership="join", membership="join",
@ -379,12 +371,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def accept_invite(self, group_id, user_id, content):
def accept_invite(self, group_id, user_id, content):
"""Accept an invite to a group """Accept an invite to a group
""" """
if self.is_mine_id(group_id): if self.is_mine_id(group_id):
yield self.groups_server_handler.accept_invite(group_id, user_id, content) await self.groups_server_handler.accept_invite(group_id, user_id, content)
local_attestation = None local_attestation = None
remote_attestation = None remote_attestation = None
else: else:
@ -392,7 +383,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
content["attestation"] = local_attestation content["attestation"] = local_attestation
try: try:
res = yield self.transport_client.accept_group_invite( res = await self.transport_client.accept_group_invite(
get_domain_from_id(group_id), group_id, user_id, content get_domain_from_id(group_id), group_id, user_id, content
) )
except HttpResponseException as e: except HttpResponseException as e:
@ -402,7 +393,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
remote_attestation = res["attestation"] remote_attestation = res["attestation"]
yield self.attestations.verify_attestation( await self.attestations.verify_attestation(
remote_attestation, remote_attestation,
group_id=group_id, group_id=group_id,
user_id=user_id, user_id=user_id,
@ -412,7 +403,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
# TODO: Check that the group is public and we're being added publically # TODO: Check that the group is public and we're being added publically
is_publicised = content.get("publicise", False) is_publicised = content.get("publicise", False)
token = yield self.store.register_user_group_membership( token = await self.store.register_user_group_membership(
group_id, group_id,
user_id, user_id,
membership="join", membership="join",
@ -425,18 +416,17 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return {} return {}
@defer.inlineCallbacks async def invite(self, group_id, user_id, requester_user_id, config):
def invite(self, group_id, user_id, requester_user_id, config):
"""Invite a user to a group """Invite a user to a group
""" """
content = {"requester_user_id": requester_user_id, "config": config} content = {"requester_user_id": requester_user_id, "config": config}
if self.is_mine_id(group_id): if self.is_mine_id(group_id):
res = yield self.groups_server_handler.invite_to_group( res = await self.groups_server_handler.invite_to_group(
group_id, user_id, requester_user_id, content group_id, user_id, requester_user_id, content
) )
else: else:
try: try:
res = yield self.transport_client.invite_to_group( res = await self.transport_client.invite_to_group(
get_domain_from_id(group_id), get_domain_from_id(group_id),
group_id, group_id,
user_id, user_id,
@ -450,8 +440,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return res return res
@defer.inlineCallbacks async def on_invite(self, group_id, user_id, content):
def on_invite(self, group_id, user_id, content):
"""One of our users were invited to a group """One of our users were invited to a group
""" """
# TODO: Support auto join and rejection # TODO: Support auto join and rejection
@ -466,7 +455,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
if "avatar_url" in content["profile"]: if "avatar_url" in content["profile"]:
local_profile["avatar_url"] = content["profile"]["avatar_url"] local_profile["avatar_url"] = content["profile"]["avatar_url"]
token = yield self.store.register_user_group_membership( token = await self.store.register_user_group_membership(
group_id, group_id,
user_id, user_id,
membership="invite", membership="invite",
@ -474,7 +463,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
) )
self.notifier.on_new_event("groups_key", token, users=[user_id]) self.notifier.on_new_event("groups_key", token, users=[user_id])
try: try:
user_profile = yield self.profile_handler.get_profile(user_id) user_profile = await self.profile_handler.get_profile(user_id)
except Exception as e: except Exception as e:
logger.warning("No profile for user %s: %s", user_id, e) logger.warning("No profile for user %s: %s", user_id, e)
user_profile = {} user_profile = {}
@ -516,12 +505,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
return res return res
@defer.inlineCallbacks async def user_removed_from_group(self, group_id, user_id, content):
def user_removed_from_group(self, group_id, user_id, content):
"""One of our users was removed/kicked from a group """One of our users was removed/kicked from a group
""" """
# TODO: Check if user in group # TODO: Check if user in group
token = yield self.store.register_user_group_membership( token = await self.store.register_user_group_membership(
group_id, user_id, membership="leave" group_id, user_id, membership="leave"
) )
self.notifier.on_new_event("groups_key", token, users=[user_id]) self.notifier.on_new_event("groups_key", token, users=[user_id])