synapse/tests/replication/test_federation_sender_shard.py
Erik Johnston 2927921942
Clean up ShardedWorkerHandlingConfig (#9466)
* Split ShardedWorkerHandlingConfig

This is so that we have a type level understanding of when it is safe to
call `get_instance(..)` (as opposed to `should_handle(..)`).

* Remove special cases in ShardedWorkerHandlingConfig.

`ShardedWorkerHandlingConfig` tried to handle the various different ways
it was possible to configure federation senders and pushers. This led to
special cases that weren't hit during testing.

To fix this, the handling of the different cases is moved out of
`ShardedWorkerHandlingConfig` and `generic_worker` and into the worker
config class. This allows us to have the logic in one place and allows
the rest of the code to ignore the different cases.
2021-02-24 13:23:18 +00:00

234 lines
8.5 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from mock import Mock
from synapse.api.constants import EventTypes, Membership
from synapse.events.builder import EventBuilderFactory
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room
from synapse.types import UserID, create_requester
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.test_utils import make_awaitable
logger = logging.getLogger(__name__)
class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
    """Checks that events and typing EDUs are sent over federation by
    federation sender workers, both with a single worker and with two
    sharded workers.
    """

    servlets = [
        login.register_servlets,
        register_servlets_for_client_rest_resource,
        room.register_servlets,
    ]

    def default_config(self):
        """Return the parent class's default config with `send_federation`
        set to False.

        NOTE(review): presumably this stops the main test homeserver from
        sending federation traffic itself so that only the workers do -
        confirm against the base test case.
        """
        conf = super().default_config()
        conf["send_federation"] = False
        return conf

    def test_send_event_single_sender(self):
        """Test that using a single federation sender worker correctly sends a
        new event.
        """
        mock_client = Mock(spec=["put_json"])
        mock_client.put_json.return_value = make_awaitable({})

        # NOTE(review): the worker is configured with `send_federation: False`
        # yet is still expected to send; presumably running it as the
        # `synapse.app.federation_sender` app is what makes it a sender -
        # confirm against the worker config handling.
        self.make_worker_hs(
            "synapse.app.federation_sender",
            {"send_federation": False},
            federation_http_client=mock_client,
        )

        user = self.register_user("user", "pass")
        token = self.login("user", "pass")

        room_id = self.create_room_with_remote_server(user, token)

        # Discard any calls made while the room was being set up so that only
        # the event sent below is inspected.
        mock_client.put_json.reset_mock()

        self.create_and_send_event(room_id, UserID.from_string(user))
        self.replicate()

        # Assert that the event was sent out over federation.
        mock_client.put_json.assert_called()
        self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
        self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))

    def test_send_event_sharded(self):
        """Test that using two federation sender workers correctly sends
        new events.
        """
        clients = self._make_sharded_senders()

        user = self.register_user("user2", "pass")
        token = self.login("user2", "pass")

        def send_event(room_id):
            self.create_and_send_event(room_id, UserID.from_string(user))

        self._assert_sends_are_sharded(user, token, clients, send_event, "pdus")

    def test_send_typing_sharded(self):
        """Test that using two federation sender workers correctly sends
        new typing EDUs.
        """
        clients = self._make_sharded_senders()

        user = self.register_user("user3", "pass")
        token = self.login("user3", "pass")

        typing_handler = self.hs.get_typing_handler()

        def start_typing(room_id):
            self.get_success(
                typing_handler.started_typing(
                    target_user=UserID.from_string(user),
                    requester=create_requester(user),
                    room_id=room_id,
                    timeout=20000,
                )
            )

        self._assert_sends_are_sharded(user, token, clients, start_typing, "edus")

    def _make_sharded_senders(self):
        """Start the `sender1` and `sender2` federation sender workers.

        Both workers are given the same two-entry
        `federation_sender_instances` list and `send_federation: True`.

        Returns:
            A `(mock_client1, mock_client2)` tuple holding the mocked
            federation HTTP clients handed to `sender1` and `sender2`
            respectively.
        """
        instances = ["sender1", "sender2"]
        clients = []

        for worker_name in instances:
            mock_client = Mock(spec=["put_json"])
            mock_client.put_json.return_value = make_awaitable({})

            self.make_worker_hs(
                "synapse.app.federation_sender",
                {
                    "send_federation": True,
                    "worker_name": worker_name,
                    # Each worker gets its own copy of the instance list.
                    "federation_sender_instances": list(instances),
                },
                federation_http_client=mock_client,
            )
            clients.append(mock_client)

        return tuple(clients)

    def _assert_sends_are_sharded(
        self, user, token, clients, trigger_send, payload_key
    ):
        """Check that sends to different remote servers are split across the
        given sender workers, with exactly one worker sending each time.

        Up to 20 rooms are created, each shared with a distinct remote server
        (`other_server_0`, `other_server_1`, ...). After each send the loop
        stops early once every client has been seen sending at least once.

        Args:
            user: ID of the local user that creates the rooms.
            token: Access token of `user`.
            clients: The mocked federation HTTP clients, one per worker.
            trigger_send: Callable taking a room ID which causes something to
                be sent over federation for that room.
            payload_key: Key expected to hold a truthy value in the `data`
                keyword argument of the `put_json` call (e.g. "pdus").

        Raises:
            AssertionError: if no client sent, more than one client sent, the
                destination or payload is wrong, or some client never sent
                anything across all iterations.
        """
        sent_on = [False] * len(clients)

        for i in range(20):
            server_name = "other_server_%d" % (i,)
            room_id = self.create_room_with_remote_server(user, token, server_name)

            # Forget calls made during room setup so that only the send
            # triggered below is inspected.
            for client in clients:
                client.reset_mock()  # type: ignore[attr-defined]

            trigger_send(room_id)
            self.replicate()

            # Index of the first client that made a request, if any did.
            sender_idx = next(
                (
                    idx
                    for idx, client in enumerate(clients)
                    if client.put_json.called
                ),
                None,
            )
            if sender_idx is None:
                raise AssertionError(
                    "Expected send transaction from one or the other sender"
                )

            sent_on[sender_idx] = True
            sender = clients[sender_idx]

            # Every other worker must have stayed silent for this destination.
            for idx, client in enumerate(clients):
                if idx != sender_idx:
                    client.put_json.assert_not_called()

            self.assertEqual(sender.put_json.call_args[0][0], server_name)
            self.assertTrue(sender.put_json.call_args[1]["data"].get(payload_key))

            if all(sent_on):
                break

        for was_used in sent_on:
            self.assertTrue(was_used)

    def create_room_with_remote_server(self, user, token, remote_server="other_server"):
        """Create a room as `user` and join `@user:<remote_server>` to it.

        The remote join is simulated by building a membership event with an
        `EventBuilderFactory` whose hostname is overridden to `remote_server`
        and passing it to the federation handler's `on_send_join_request`.

        Args:
            user: ID of the local user that creates the room.
            token: Access token of `user`.
            remote_server: Name of the remote server to join to the room.

        Returns:
            The ID of the newly created room.
        """
        room_id = self.helper.create_room_as(user, tok=token)
        store = self.hs.get_datastore()
        federation = self.hs.get_federation_handler()

        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room_id))
        room_version = self.get_success(store.get_room_version(room_id))

        # Build the event as though it originated on the remote server.
        factory = EventBuilderFactory(self.hs)
        factory.hostname = remote_server

        user_id = UserID("user", remote_server).to_string()

        event_dict = {
            "type": EventTypes.Member,
            "state_key": user_id,
            "content": {"membership": Membership.JOIN},
            "sender": user_id,
            "room_id": room_id,
        }

        builder = factory.for_room_version(room_version, event_dict)
        join_event = self.get_success(builder.build(prev_event_ids, None))

        self.get_success(federation.on_send_join_request(remote_server, join_event))
        self.replicate()

        return room_id