Add basic tests for sync/pagination with vector clock tokens. (#8488)

These are tests for #8439
This commit is contained in:
Erik Johnston 2020-10-14 13:53:20 +01:00 committed by GitHub
parent 921a3f8a59
commit 1264c8ac89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 249 additions and 1 deletions

1
changelog.d/8488.misc Normal file
View file

@ -0,0 +1 @@
Allow events to be sent to clients sooner when using sharded event persisters.

View file

@ -14,8 +14,12 @@
# limitations under the License. # limitations under the License.
import logging import logging
from mock import patch
from synapse.api.room_versions import RoomVersion
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client.v1 import login, room from synapse.rest.client.v1 import login, room
from synapse.rest.client.v2_alpha import sync
from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import USE_POSTGRES_FOR_TESTS from tests.utils import USE_POSTGRES_FOR_TESTS
@ -36,6 +40,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
admin.register_servlets_for_client_rest_resource, admin.register_servlets_for_client_rest_resource,
room.register_servlets, room.register_servlets,
login.register_servlets, login.register_servlets,
sync.register_servlets,
] ]
def prepare(self, reactor, clock, hs): def prepare(self, reactor, clock, hs):
@ -43,6 +48,9 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
self.other_user_id = self.register_user("otheruser", "pass") self.other_user_id = self.register_user("otheruser", "pass")
self.other_access_token = self.login("otheruser", "pass") self.other_access_token = self.login("otheruser", "pass")
self.room_creator = self.hs.get_room_creation_handler()
self.store = hs.get_datastore()
def default_config(self): def default_config(self):
conf = super().default_config() conf = super().default_config()
conf["redis"] = {"enabled": "true"} conf["redis"] = {"enabled": "true"}
@ -53,6 +61,29 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
} }
return conf return conf
def _create_room(self, room_id: str, user_id: str, tok: str):
"""Create a room with given room_id
"""
# We control the room ID generation by patching out the
# `_generate_room_id` method
async def generate_room(
creator_id: str, is_public: bool, room_version: RoomVersion
):
await self.store.store_room(
room_id=room_id,
room_creator_user_id=creator_id,
is_public=is_public,
room_version=room_version,
)
return room_id
with patch(
"synapse.handlers.room.RoomCreationHandler._generate_room_id"
) as mock:
mock.side_effect = generate_room
self.helper.create_room_as(user_id, tok=tok)
def test_basic(self): def test_basic(self):
"""Simple test to ensure that multiple rooms can be created and joined, """Simple test to ensure that multiple rooms can be created and joined,
and that different rooms get handled by different instances. and that different rooms get handled by different instances.
@ -100,3 +131,189 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
self.assertTrue(persisted_on_1) self.assertTrue(persisted_on_1)
self.assertTrue(persisted_on_2) self.assertTrue(persisted_on_2)
def test_vector_clock_token(self):
"""Tests that using a stream token with a vector clock component works
correctly with basic /sync and /messages usage.
"""
self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "worker1"},
)
worker_hs2 = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "worker2"},
)
sync_hs = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "sync"},
)
# Specially selected room IDs that get persisted on different workers.
room_id1 = "!foo:test"
room_id2 = "!baz:test"
self.assertEqual(
self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
)
self.assertEqual(
self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
)
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")
store = self.hs.get_datastore()
# Create two room on the different workers.
self._create_room(room_id1, user_id, access_token)
self._create_room(room_id2, user_id, access_token)
# The other user joins
self.helper.join(
room=room_id1, user=self.other_user_id, tok=self.other_access_token
)
self.helper.join(
room=room_id2, user=self.other_user_id, tok=self.other_access_token
)
# Do an initial sync so that we're up to date.
request, channel = self.make_request("GET", "/sync", access_token=access_token)
self.render_on_worker(sync_hs, request)
next_batch = channel.json_body["next_batch"]
# We now gut wrench into the events stream MultiWriterIdGenerator on
# worker2 to mimic it getting stuck persisting an event. This ensures
# that when we send an event on worker1 we end up in a state where
# worker2 events stream position lags that on worker1, resulting in a
# RoomStreamToken with a non-empty instance map component.
#
# Worker2's event stream position will not advance until we call
# __aexit__ again.
actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
self.get_success(actx.__aenter__())
response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
first_event_in_room1 = response["event_id"]
# Assert that the current stream token has an instance map component, as
# we are trying to test vector clock tokens.
room_stream_token = store.get_room_max_token()
self.assertNotEqual(len(room_stream_token.instance_map), 0)
# Check that syncing still gets the new event, despite the gap in the
# stream IDs.
request, channel = self.make_request(
"GET", "/sync?since={}".format(next_batch), access_token=access_token
)
self.render_on_worker(sync_hs, request)
# We should only see the new event and nothing else
self.assertIn(room_id1, channel.json_body["rooms"]["join"])
self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
self.assertListEqual(
[first_event_in_room1], [event["event_id"] for event in events]
)
# Get the next batch and makes sure its a vector clock style token.
vector_clock_token = channel.json_body["next_batch"]
self.assertTrue(vector_clock_token.startswith("m"))
# Now that we've got a vector clock token we finish the fake persisting
# an event we started above.
self.get_success(actx.__aexit__(None, None, None))
# Now try and send an event to the other rooom so that we can test that
# the vector clock style token works as a `since` token.
response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
first_event_in_room2 = response["event_id"]
request, channel = self.make_request(
"GET",
"/sync?since={}".format(vector_clock_token),
access_token=access_token,
)
self.render_on_worker(sync_hs, request)
self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
self.assertIn(room_id2, channel.json_body["rooms"]["join"])
events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
self.assertListEqual(
[first_event_in_room2], [event["event_id"] for event in events]
)
next_batch = channel.json_body["next_batch"]
# We also want to test that the vector clock style token works with
# pagination. We do this by sending a couple of new events into the room
# and syncing again to get a prev_batch token for each room, then
# paginating from there back to the vector clock token.
self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
request, channel = self.make_request(
"GET", "/sync?since={}".format(next_batch), access_token=access_token
)
self.render_on_worker(sync_hs, request)
prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
"prev_batch"
]
prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
"prev_batch"
]
# Paginating back in the first room should not produce any results, as
# no events have happened in it. This tests that we are correctly
# filtering results based on the vector clock portion.
request, channel = self.make_request(
"GET",
"/rooms/{}/messages?from={}&to={}&dir=b".format(
room_id1, prev_batch1, vector_clock_token
),
access_token=access_token,
)
self.render_on_worker(sync_hs, request)
self.assertListEqual([], channel.json_body["chunk"])
# Paginating back on the second room should produce the first event
# again. This tests that pagination isn't completely broken.
request, channel = self.make_request(
"GET",
"/rooms/{}/messages?from={}&to={}&dir=b".format(
room_id2, prev_batch2, vector_clock_token
),
access_token=access_token,
)
self.render_on_worker(sync_hs, request)
self.assertEqual(len(channel.json_body["chunk"]), 1)
self.assertEqual(
channel.json_body["chunk"][0]["event_id"], first_event_in_room2
)
# Paginating forwards should give the same results
request, channel = self.make_request(
"GET",
"/rooms/{}/messages?from={}&to={}&dir=f".format(
room_id1, vector_clock_token, prev_batch1
),
access_token=access_token,
)
self.render_on_worker(sync_hs, request)
self.assertListEqual([], channel.json_body["chunk"])
request, channel = self.make_request(
"GET",
"/rooms/{}/messages?from={}&to={}&dir=f".format(
room_id2, vector_clock_token, prev_batch2,
),
access_token=access_token,
)
self.render_on_worker(sync_hs, request)
self.assertEqual(len(channel.json_body["chunk"]), 1)
self.assertEqual(
channel.json_body["chunk"][0]["event_id"], first_event_in_room2
)

View file

@ -20,7 +20,7 @@ import hmac
import inspect import inspect
import logging import logging
import time import time
from typing import Optional, Tuple, Type, TypeVar, Union from typing import Optional, Tuple, Type, TypeVar, Union, overload
from mock import Mock, patch from mock import Mock, patch
@ -364,6 +364,36 @@ class HomeserverTestCase(TestCase):
Function to optionally be overridden in subclasses. Function to optionally be overridden in subclasses.
""" """
# Annoyingly mypy doesn't seem to pick up the fact that T is SynapseRequest
# when the `request` arg isn't given, so we define an explicit override to
# cover that case.
@overload
def make_request(
self,
method: Union[bytes, str],
path: Union[bytes, str],
content: Union[bytes, dict] = b"",
access_token: Optional[str] = None,
shorthand: bool = True,
federation_auth_origin: str = None,
content_is_form: bool = False,
) -> Tuple[SynapseRequest, FakeChannel]:
...
@overload
def make_request(
self,
method: Union[bytes, str],
path: Union[bytes, str],
content: Union[bytes, dict] = b"",
access_token: Optional[str] = None,
request: Type[T] = SynapseRequest,
shorthand: bool = True,
federation_auth_origin: str = None,
content_is_form: bool = False,
) -> Tuple[T, FakeChannel]:
...
def make_request( def make_request(
self, self,
method: Union[bytes, str], method: Union[bytes, str],