diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py deleted file mode 100644 index ce98f4f94a..0000000000 --- a/synapse/federation/handler.py +++ /dev/null @@ -1,156 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from .pdu_codec import PduCodec - -from synapse.api.errors import AuthError -from synapse.util.logutils import log_function - -import logging - - -logger = logging.getLogger(__name__) - - -class FederationEventHandler(object): - """ Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the home server (including auth and state conflict resoultion) - b) converting events that were produced by local clients that may need - to be sent to remote home servers. - """ - - def __init__(self, hs): - self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() - self.state_handler = hs.get_state_handler() - # self.auth_handler = gs.get_auth_handler() - self.event_handler = hs.get_handlers().federation_handler - self.server_name = hs.hostname - - self.lock_manager = hs.get_room_lock_manager() - - self.replication_layer.set_handler(self) - - self.pdu_codec = PduCodec(hs) - - @log_function - @defer.inlineCallbacks - def handle_new_event(self, event, snapshot): - """ Takes in an event from the client to server side, that has already - been authed and handled by the state module, and sends it to any - remote home servers that may be interested. - - Args: - event - snapshot (.storage.Snapshot): THe snapshot the event happened after - - Returns: - Deferred: Resolved when it has successfully been queued for - processing. - """ - yield self.fill_out_prev_events(event, snapshot) - - pdu = self.pdu_codec.pdu_from_event(event) - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) - - @log_function - @defer.inlineCallbacks - def backfill(self, dest, room_id, limit): - pdus = yield self.replication_layer.backfill(dest, room_id, limit) - - if not pdus: - defer.returnValue([]) - - events = [ - self.pdu_codec.event_from_pdu(pdu) - for pdu in pdus - ] - - defer.returnValue(events) - - @log_function - def get_state_for_room(self, destination, room_id): - return self.replication_layer.get_state_for_context( - destination, room_id - ) - - @log_function - @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it throught the StateHandler. - """ - event = self.pdu_codec.event_from_pdu(pdu) - - try: - with (yield self.lock_manager.lock(pdu.context)): - if event.is_state and not backfilled: - is_new_state = yield self.state_handler.handle_new_state( - pdu - ) - if not is_new_state: - return - else: - is_new_state = False - - yield self.event_handler.on_receive(event, is_new_state, backfilled) - - except AuthError: - # TODO: Implement something in federation that allows us to - # respond to PDU. - raise - - return - - @defer.inlineCallbacks - def _on_new_state(self, pdu, new_state_event): - # TODO: Do any store stuff here. Notifiy C2S about this new - # state. - - yield self.store.update_current_state( - pdu_id=pdu.pdu_id, - origin=pdu.origin, - context=pdu.context, - pdu_type=pdu.pdu_type, - state_key=pdu.state_key - ) - - yield self.event_handler.on_receive(new_state_event) - - @defer.inlineCallbacks - def fill_out_prev_events(self, event, snapshot): - if hasattr(event, "prev_events"): - return - - results = snapshot.prev_pdus - - es = [ - "%s@%s" % (p_id, origin) for p_id, origin, _ in results - ] - - event.prev_events = [e for e in es if e != event.event_id] - - if results: - event.depth = max([int(v) for _, _, v in results]) + 1 - else: - event.depth = 0 diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 32c0d6b8aa..00da47bb5d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -43,4 +43,5 @@ class BaseRoomHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) - yield self.hs.get_federation().handle_new_event(event, snapshot) + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bfc1ab86f2..a2e935add0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -20,6 +20,9 @@ from ._base import BaseHandler from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent from synapse.api.constants import Membership from synapse.util.logutils import log_function +from synapse.federation.pdu_codec import PduCodec + +from synapse.api.errors import AuthError from twisted.internet import defer @@ -30,8 +33,14 @@ logger = logging.getLogger(__name__) class FederationHandler(BaseHandler): + """Handles events that originated from federation. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the home server (including auth and state conflict resoultion) + b) converting events that were produced by local clients that may need + to be sent to remote home servers. + """ - """Handles events that originated from federation.""" def __init__(self, hs): super(FederationHandler, self).__init__(hs) @@ -42,6 +51,112 @@ class FederationHandler(BaseHandler): self.waiting_for_join_list = {} + self.store = hs.get_datastore() + self.replication_layer = hs.get_replication_layer() + self.state_handler = hs.get_state_handler() + # self.auth_handler = gs.get_auth_handler() + self.server_name = hs.hostname + + self.lock_manager = hs.get_room_lock_manager() + + self.replication_layer.set_handler(self) + + self.pdu_codec = PduCodec(hs) + + @log_function + @defer.inlineCallbacks + def handle_new_event(self, event, snapshot): + """ Takes in an event from the client to server side, that has already + been authed and handled by the state module, and sends it to any + remote home servers that may be interested. + + Args: + event + snapshot (.storage.Snapshot): THe snapshot the event happened after + + Returns: + Deferred: Resolved when it has successfully been queued for + processing. + """ + yield self.fill_out_prev_events(event, snapshot) + + pdu = self.pdu_codec.pdu_from_event(event) + + if not hasattr(pdu, "destinations") or not pdu.destinations: + pdu.destinations = [] + + yield self.replication_layer.send_pdu(pdu) + + + @log_function + def get_state_for_room(self, destination, room_id): + return self.replication_layer.get_state_for_context( + destination, room_id + ) + + @log_function + @defer.inlineCallbacks + def on_receive_pdu(self, pdu, backfilled): + """ Called by the ReplicationLayer when we have a new pdu. We need to + do auth checks and put it throught the StateHandler. + """ + event = self.pdu_codec.event_from_pdu(pdu) + + try: + with (yield self.lock_manager.lock(pdu.context)): + if event.is_state and not backfilled: + is_new_state = yield self.state_handler.handle_new_state( + pdu + ) + if not is_new_state: + return + else: + is_new_state = False + + yield self.on_receive(event, is_new_state, backfilled) + + except AuthError: + # TODO: Implement something in federation that allows us to + # respond to PDU. + raise + + return + + @defer.inlineCallbacks + def _on_new_state(self, pdu, new_state_event): + # TODO: Do any store stuff here. Notifiy C2S about this new + # state. + + yield self.store.update_current_state( + pdu_id=pdu.pdu_id, + origin=pdu.origin, + context=pdu.context, + pdu_type=pdu.pdu_type, + state_key=pdu.state_key + ) + + yield self.on_receive(new_state_event) + + @defer.inlineCallbacks + def fill_out_prev_events(self, event, snapshot): + if hasattr(event, "prev_events"): + return + + results = snapshot.prev_pdus + + es = [ + "%s@%s" % (p_id, origin) for p_id, origin, _ in results + ] + + event.prev_events = [e for e in es if e != event.event_id] + + if results: + event.depth = max([int(v) for _, _, v in results]) + 1 + else: + event.depth = 0 + + + @log_function @defer.inlineCallbacks def on_receive(self, event, is_new_state, backfilled): @@ -86,8 +201,7 @@ class FederationHandler(BaseHandler): if not room: # Huh, let's try and get the current state try: - federation = self.hs.get_federation() - yield federation.get_state_for_room( + yield self.get_state_for_room( event.origin, event.room_id ) @@ -119,11 +233,10 @@ class FederationHandler(BaseHandler): "user_joined_room", user=user, room_id=event.room_id ) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit): - events = yield self.hs.get_federation().backfill(dest, room_id, limit) + events = yield self._backfill(dest, room_id, limit) for event in events: try: @@ -133,10 +246,23 @@ class FederationHandler(BaseHandler): defer.returnValue(events) + @defer.inlineCallbacks + def _backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) + @log_function @defer.inlineCallbacks def do_invite_join(self, target_host, room_id, joinee, content): - federation = self.hs.get_federation() hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -146,7 +272,7 @@ class FederationHandler(BaseHandler): # First get current state to see if we are already joined. try: - yield federation.get_state_for_room(target_host, room_id) + yield self.get_state_for_room(target_host, room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id) if self.hs.hostname in hosts: @@ -166,7 +292,7 @@ class FederationHandler(BaseHandler): new_event.destinations = [target_host] - yield federation.handle_new_event(new_event) + yield self.handle_new_event(new_event) # TODO (erikj): Time out here. d = defer.Deferred() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 4797f8be0c..6abfa00c5c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -368,7 +368,8 @@ class RoomCreationHandler(BaseRoomHandler): yield self.state_handler.handle_new_event(config_event) # store_id = persist... - yield self.hs.get_federation().handle_new_event(config_event) + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.handle_new_event(config_event) # self.notifier.on_new_room_event(event, store_id) content = {"membership": Membership.JOIN} diff --git a/synapse/server.py b/synapse/server.py index c5b0a32757..b825917748 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -20,7 +20,6 @@ # Imports required for the default HomeServer() implementation from synapse.federation import initialize_http_replication -from synapse.federation.handler import FederationEventHandler from synapse.api.events.factory import EventFactory from synapse.api.notifier import Notifier from synapse.api.auth import Auth @@ -58,7 +57,6 @@ class BaseHomeServer(object): 'http_client', 'db_pool', 'persistence_service', - 'federation', 'replication_layer', 'datastore', 'event_factory', @@ -152,9 +150,6 @@ class HomeServer(BaseHomeServer): def build_replication_layer(self): return initialize_http_replication(self) - def build_federation(self): - return FederationEventHandler(self) - def build_datastore(self): return DataStore(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a726b7346b..130387184f 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -60,7 +60,9 @@ class DataStore(RoomMemberStore, RoomStore, def persist_event(self, event=None, backfilled=False, pdu=None): # FIXME (erikj): This should be removed when we start amalgamating # event and pdu storage - yield self.hs.get_federation().fill_out_prev_events(event) + if event is not None: + federation_handler = self.hs.get_handlers().federation_handler + yield federation_handler.fill_out_prev_events(event) stream_ordering = None if backfilled: diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index a1ab8dde68..613f5c307e 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -53,23 +53,26 @@ class RoomMemberHandlerTestCase(unittest.TestCase): handlers=NonCallableMock(spec_set=[ "room_member_handler", "profile_handler", + "federation_handler", ]), auth=NonCallableMock(spec_set=["check"]), - federation=NonCallableMock(spec_set=[ - "handle_new_event", - "get_state_for_room", - ]), state_handler=NonCallableMock(spec_set=["handle_new_event"]), ) + self.federation = NonCallableMock(spec_set=[ + "handle_new_event", + "get_state_for_room", + ]) + self.datastore = hs.get_datastore() self.handlers = hs.get_handlers() self.notifier = hs.get_notifier() - self.federation = hs.get_federation() self.state_handler = hs.get_state_handler() self.distributor = hs.get_distributor() self.hs = hs + self.handlers.federation_handler = self.federation + self.distributor.declare("collect_presencelike_data") self.handlers.room_member_handler = RoomMemberHandler(self.hs) @@ -333,21 +336,24 @@ class RoomCreationTest(unittest.TestCase): handlers=NonCallableMock(spec_set=[ "room_creation_handler", "room_member_handler", + "federation_handler", ]), auth=NonCallableMock(spec_set=["check"]), - federation=NonCallableMock(spec_set=[ - "handle_new_event", - ]), state_handler=NonCallableMock(spec_set=["handle_new_event"]), ) + self.federation = NonCallableMock(spec_set=[ + "handle_new_event", + ]) + self.datastore = hs.get_datastore() self.handlers = hs.get_handlers() self.notifier = hs.get_notifier() - self.federation = hs.get_federation() self.state_handler = hs.get_state_handler() self.hs = hs + self.handlers.federation_handler = self.federation + self.handlers.room_creation_handler = RoomCreationHandler(self.hs) self.room_creation_handler = self.handlers.room_creation_handler diff --git a/tests/rest/test_events.py b/tests/rest/test_events.py index 4025e14581..cb641e0232 100644 --- a/tests/rest/test_events.py +++ b/tests/rest/test_events.py @@ -128,9 +128,9 @@ class EventStreamPermissionsTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), replication_layer=Mock(), state_handler=state_handler, + datastore=MemoryDataStore(), persistence_service=persistence_service, clock=Mock(spec=[ "call_later", @@ -139,9 +139,10 @@ class EventStreamPermissionsTestCase(RestTestCase): ]), ) + hs.get_handlers().federation_handler = Mock() + hs.get_clock().time_msec.return_value = 1000000 - hs.datastore = MemoryDataStore() synapse.rest.register.register_servlets(hs, self.mock_resource) synapse.rest.events.register_servlets(hs, self.mock_resource) synapse.rest.room.register_servlets(hs, self.mock_resource) diff --git a/tests/rest/test_rooms.py b/tests/rest/test_rooms.py index 8514d6ba21..83e5d0c129 100644 --- a/tests/rest/test_rooms.py +++ b/tests/rest/test_rooms.py @@ -54,12 +54,12 @@ class RoomPermissionsTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() def _get_user_by_token(token=None): return hs.parse_userid(self.auth_user_id) @@ -403,12 +403,12 @@ class RoomsMemberListTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() self.auth_user_id = self.user_id @@ -484,12 +484,12 @@ class RoomsCreateTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() def _get_user_by_token(token=None): return hs.parse_userid(self.auth_user_id) @@ -626,12 +626,12 @@ class RoomTopicTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() def _get_user_by_token(token=None): return hs.parse_userid(self.auth_user_id) @@ -729,12 +729,12 @@ class RoomMemberStateTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() def _get_user_by_token(token=None): return hs.parse_userid(self.auth_user_id) @@ -855,12 +855,12 @@ class RoomMessagesTestCase(RestTestCase): "test", db_pool=None, http_client=None, - federation=Mock(), datastore=MemoryDataStore(), replication_layer=Mock(), state_handler=state_handler, persistence_service=persistence_service, ) + hs.get_handlers().federation_handler = Mock() def _get_user_by_token(token=None): return hs.parse_userid(self.auth_user_id)