Split MessageHandler into read only and writers

This will let us call the read only parts from workers, and so be able
to move some APIs off of master, e.g. the `/state` API.
This commit is contained in:
Erik Johnston 2018-07-18 15:22:02 +01:00
parent dab00faa83
commit 8cb8df55e9
5 changed files with 176 additions and 149 deletions

View file

@ -17,7 +17,6 @@ from .admin import AdminHandler
from .directory import DirectoryHandler from .directory import DirectoryHandler
from .federation import FederationHandler from .federation import FederationHandler
from .identity import IdentityHandler from .identity import IdentityHandler
from .message import MessageHandler
from .register import RegistrationHandler from .register import RegistrationHandler
from .room import RoomContextHandler from .room import RoomContextHandler
from .search import SearchHandler from .search import SearchHandler
@ -44,7 +43,6 @@ class Handlers(object):
def __init__(self, hs): def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs) self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs) self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs) self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs) self.admin_handler = AdminHandler(hs)

View file

@ -75,12 +75,159 @@ class PurgeStatus(object):
} }
class MessageHandler(BaseHandler): class MessageHandler(object):
"""Contains some read only APIs to get state about a room
"""
def __init__(self, hs): def __init__(self, hs):
super(MessageHandler, self).__init__(hs) self.auth = hs.get_auth()
self.hs = hs self.clock = hs.get_clock()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
event_type=None, state_key="", is_guest=False):
""" Get data from a room.
Args:
event : The room path event
Returns:
The path data content.
Raises:
SynapseError if something went wrong.
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
[membership_event_id], [key]
)
data = room_state[membership_event_id].get(key)
defer.returnValue(data)
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
return
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
)
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
[serialize_event(c, now) for c in room_state.values()]
)
@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
"""Get all the joined members in the room and their profile information.
If the user has left the room return the state events from when they left.
Args:
requester(Requester): The user requesting state events.
room_id(str): The room ID to get all state events from.
Returns:
A dict of user_id to profile info
"""
user_id = requester.user.to_string()
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
raise NotImplementedError(
"Getting joined members after leaving is not implemented"
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
else:
# Loop fell through, AS has no interested users in room
raise AuthError(403, "Appservice not in room")
defer.returnValue({
user_id: {
"avatar_url": profile.avatar_url,
"display_name": profile.display_name,
}
for user_id, profile in iteritems(users_with_profile)
})
class PaginationHandler(MessageHandler):
"""Handles pagination and purge history requests.
These are in the same handler due to the fact we need to block clients
paginating during a purge.
This subclasses MessageHandler to get at _check_in_room_or_world_readable
"""
def __init__(self, hs):
super(PaginationHandler, self).__init__(hs)
self.hs = hs
self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock() self.pagination_lock = ReadWriteLock()
@ -274,134 +421,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk) defer.returnValue(chunk)
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
event_type=None, state_key="", is_guest=False):
""" Get data from a room.
Args:
event : The room path event
Returns:
The path data content.
Raises:
SynapseError if something went wrong.
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
data = yield self.state_handler.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
[membership_event_id], [key]
)
data = room_state[membership_event_id].get(key)
defer.returnValue(data)
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
return
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
left the room return the state events from when they left.
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
room_state = yield self.state_handler.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
)
room_state = room_state[membership_event_id]
now = self.clock.time_msec()
defer.returnValue(
[serialize_event(c, now) for c in room_state.values()]
)
@defer.inlineCallbacks
def get_joined_members(self, requester, room_id):
"""Get all the joined members in the room and their profile information.
If the user has left the room return the state events from when they left.
Args:
requester(Requester): The user requesting state events.
room_id(str): The room ID to get all state events from.
Returns:
A dict of user_id to profile info
"""
user_id = requester.user.to_string()
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
raise NotImplementedError(
"Getting joined members after leaving is not implemented"
)
users_with_profile = yield self.state.get_current_user_in_room(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
else:
# Loop fell through, AS has no interested users in room
raise AuthError(403, "Appservice not in room")
defer.returnValue({
user_id: {
"avatar_url": profile.avatar_url,
"display_name": profile.display_name,
}
for user_id, profile in iteritems(users_with_profile)
})
class EventCreationHandler(object): class EventCreationHandler(object):
def __init__(self, hs): def __init__(self, hs):

View file

@ -123,7 +123,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer) hs (synapse.server.HomeServer)
""" """
super(PurgeHistoryRestServlet, self).__init__(hs) super(PurgeHistoryRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore() self.store = hs.get_datastore()
@defer.inlineCallbacks @defer.inlineCallbacks
@ -198,7 +198,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON, errcode=Codes.BAD_JSON,
) )
purge_id = yield self.handlers.message_handler.start_purge_history( purge_id = yield self.pagination_handler.start_purge_history(
room_id, token, room_id, token,
delete_local_events=delete_local_events, delete_local_events=delete_local_events,
) )
@ -220,7 +220,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer) hs (synapse.server.HomeServer)
""" """
super(PurgeHistoryStatusRestServlet, self).__init__(hs) super(PurgeHistoryStatusRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, purge_id): def on_GET(self, request, purge_id):
@ -230,7 +230,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin: if not is_admin:
raise AuthError(403, "You are not a server admin") raise AuthError(403, "You are not a server admin")
purge_status = self.handlers.message_handler.get_purge_status(purge_id) purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None: if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id) raise NotFoundError("purge id '%s' not found" % purge_id)

View file

@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers() self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler() self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler() self.room_member_handler = hs.get_room_member_handler()
self.message_handler = hs.get_message_handler()
def register(self, http_server): def register(self, http_server):
# /room/$roomid/state/$eventtype # /room/$roomid/state/$eventtype
@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content", format = parse_string(request, "format", default="content",
allowed_values=["content", "event"]) allowed_values=["content", "event"])
msg_handler = self.handlers.message_handler msg_handler = self.message_handler
data = yield msg_handler.get_room_data( data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
room_id=room_id, room_id=room_id,
@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs) super(RoomMemberListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens) # TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
handler = self.handlers.message_handler events = yield self.message_handler.get_state_events(
events = yield handler.get_state_events(
room_id=room_id, room_id=room_id,
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
) )
@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs) super(JoinedRoomMemberListRestServlet, self).__init__(hs)
self.message_handler = hs.get_handlers().message_handler self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs) super(RoomMessageListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json)) event_filter = Filter(json.loads(filter_json))
else: else:
event_filter = None event_filter = None
handler = self.handlers.message_handler msgs = yield self.pagination_handler.get_messages(
msgs = yield handler.get_messages(
room_id=room_id, room_id=room_id,
requester=requester, requester=requester,
pagin_config=pagination_config, pagin_config=pagination_config,
@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs) super(RoomStateRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers() self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id): def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True) requester = yield self.auth.get_user_by_req(request, allow_guest=True)
handler = self.handlers.message_handler
# Get all the current state for this room # Get all the current state for this room
events = yield handler.get_state_events( events = yield self.message_handler.get_state_events(
room_id=room_id, room_id=room_id,
user_id=requester.user.to_string(), user_id=requester.user.to_string(),
is_guest=requester.is_guest, is_guest=requester.is_guest,

View file

@ -52,7 +52,11 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler from synapse.handlers.message import (
EventCreationHandler,
MessageHandler,
PaginationHandler,
)
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler from synapse.handlers.read_marker import ReadMarkerHandler
@ -163,6 +167,8 @@ class HomeServer(object):
'federation_registry', 'federation_registry',
'server_notices_manager', 'server_notices_manager',
'server_notices_sender', 'server_notices_sender',
'message_handler',
'pagination_handler',
] ]
def __init__(self, hostname, reactor=None, **kwargs): def __init__(self, hostname, reactor=None, **kwargs):
@ -426,6 +432,12 @@ class HomeServer(object):
return WorkerServerNoticesSender(self) return WorkerServerNoticesSender(self)
return ServerNoticesSender(self) return ServerNoticesSender(self)
def build_message_handler(self):
return MessageHandler(self)
def build_pagination_handler(self):
return PaginationHandler(self)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)