forked from MirrorHub/synapse
Don't drop user dir deltas when server leaves room (#10982)
Fix a long-standing bug where a batch of user directory changes would be silently dropped if the server left a room early in the batch. * Pull out `wait_for_background_update` in tests Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com> Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
parent
38b7db5885
commit
370bca32e6
11 changed files with 63 additions and 79 deletions
1
changelog.d/10982.bugfix
Normal file
1
changelog.d/10982.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug where the remainder of a batch of user directory changes would be silently dropped if the server left a room early in the batch.
|
|
@ -220,7 +220,7 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
|
|
||||||
for user_id in user_ids:
|
for user_id in user_ids:
|
||||||
await self._handle_remove_user(room_id, user_id)
|
await self._handle_remove_user(room_id, user_id)
|
||||||
return
|
continue
|
||||||
else:
|
else:
|
||||||
logger.debug("Server is still in room: %r", room_id)
|
logger.debug("Server is still in room: %r", room_id)
|
||||||
|
|
||||||
|
|
|
@ -103,12 +103,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||||
# Do the initial population of the stats via the background update
|
# Do the initial population of the stats via the background update
|
||||||
self._add_background_updates()
|
self._add_background_updates()
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_initial_room(self):
|
def test_initial_room(self):
|
||||||
"""
|
"""
|
||||||
|
@ -140,12 +135,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||||
# Do the initial population of the user directory via the background update
|
# Do the initial population of the user directory via the background update
|
||||||
self._add_background_updates()
|
self._add_background_updates()
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
r = self.get_success(self.get_all_room_state())
|
r = self.get_success(self.get_all_room_state())
|
||||||
|
|
||||||
|
@ -568,12 +558,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
r1stats_complete = self._get_current_stats("room", r1)
|
r1stats_complete = self._get_current_stats("room", r1)
|
||||||
u1stats_complete = self._get_current_stats("user", u1)
|
u1stats_complete = self._get_current_stats("user", u1)
|
||||||
|
|
|
@ -363,6 +363,45 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
|
||||||
self.assertEqual(len(s["results"]), 1)
|
self.assertEqual(len(s["results"]), 1)
|
||||||
self.assertEqual(s["results"][0]["user_id"], user)
|
self.assertEqual(s["results"][0]["user_id"], user)
|
||||||
|
|
||||||
|
def test_process_join_after_server_leaves_room(self) -> None:
|
||||||
|
alice = self.register_user("alice", "pass")
|
||||||
|
alice_token = self.login(alice, "pass")
|
||||||
|
bob = self.register_user("bob", "pass")
|
||||||
|
bob_token = self.login(bob, "pass")
|
||||||
|
|
||||||
|
# Alice makes two rooms. Bob joins one of them.
|
||||||
|
room1 = self.helper.create_room_as(alice, tok=alice_token)
|
||||||
|
room2 = self.helper.create_room_as(alice, tok=alice_token)
|
||||||
|
print("room1=", room1)
|
||||||
|
print("room2=", room2)
|
||||||
|
self.helper.join(room1, bob, tok=bob_token)
|
||||||
|
|
||||||
|
# The user sharing tables should have been updated.
|
||||||
|
public1 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
|
||||||
|
self.assertEqual(set(public1), {(alice, room1), (alice, room2), (bob, room1)})
|
||||||
|
|
||||||
|
# Alice leaves room1. The user sharing tables should be updated.
|
||||||
|
self.helper.leave(room1, alice, tok=alice_token)
|
||||||
|
public2 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
|
||||||
|
self.assertEqual(set(public2), {(alice, room2), (bob, room1)})
|
||||||
|
|
||||||
|
# Pause the processing of new events.
|
||||||
|
dir_handler = self.hs.get_user_directory_handler()
|
||||||
|
dir_handler.update_user_directory = False
|
||||||
|
|
||||||
|
# Bob leaves one room and joins the other.
|
||||||
|
self.helper.leave(room1, bob, tok=bob_token)
|
||||||
|
self.helper.join(room2, bob, tok=bob_token)
|
||||||
|
|
||||||
|
# Process the leave and join in one go.
|
||||||
|
dir_handler.update_user_directory = True
|
||||||
|
dir_handler.notify_new_event()
|
||||||
|
self.wait_for_background_updates()
|
||||||
|
|
||||||
|
# The user sharing tables should have been updated.
|
||||||
|
public3 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
|
||||||
|
self.assertEqual(set(public3), {(alice, room2), (bob, room2)})
|
||||||
|
|
||||||
def test_private_room(self) -> None:
|
def test_private_room(self) -> None:
|
||||||
"""
|
"""
|
||||||
A user can be searched for only by people that are either in a public
|
A user can be searched for only by people that are either in a public
|
||||||
|
|
|
@ -79,12 +79,7 @@ class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
# Now let's actually drive the updates to completion
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Make sure the background update filled in the room creator
|
# Make sure the background update filled in the room creator
|
||||||
room_creator_after = self.get_success(
|
room_creator_after = self.get_success(
|
||||||
|
|
|
@ -66,12 +66,7 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
# Ugh, have to reset this flag
|
# Ugh, have to reset this flag
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_soft_failed_extremities_handled_correctly(self):
|
def test_soft_failed_extremities_handled_correctly(self):
|
||||||
"""Test that extremities are correctly calculated in the presence of
|
"""Test that extremities are correctly calculated in the presence of
|
||||||
|
|
|
@ -242,12 +242,7 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
def test_devices_last_seen_bg_update(self):
|
def test_devices_last_seen_bg_update(self):
|
||||||
# First make sure we have completed all updates.
|
# First make sure we have completed all updates.
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
user_id = "@user:id"
|
user_id = "@user:id"
|
||||||
device_id = "MY_DEVICE"
|
device_id = "MY_DEVICE"
|
||||||
|
@ -311,12 +306,7 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
# Now let's actually drive the updates to completion
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
# We should now get the correct result again
|
# We should now get the correct result again
|
||||||
result = self.get_success(
|
result = self.get_success(
|
||||||
|
@ -337,12 +327,7 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
def test_old_user_ips_pruned(self):
|
def test_old_user_ips_pruned(self):
|
||||||
# First make sure we have completed all updates.
|
# First make sure we have completed all updates.
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
user_id = "@user:id"
|
user_id = "@user:id"
|
||||||
device_id = "MY_DEVICE"
|
device_id = "MY_DEVICE"
|
||||||
|
|
|
@ -578,12 +578,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
||||||
# Ugh, have to reset this flag
|
# Ugh, have to reset this flag
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Test that the `has_auth_chain_index` has been set
|
# Test that the `has_auth_chain_index` has been set
|
||||||
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id)))
|
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id)))
|
||||||
|
@ -619,12 +614,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
|
||||||
# Ugh, have to reset this flag
|
# Ugh, have to reset this flag
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Test that the `has_auth_chain_index` has been set
|
# Test that the `has_auth_chain_index` has been set
|
||||||
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id1)))
|
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id1)))
|
||||||
|
|
|
@ -169,12 +169,7 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
def test_can_rerun_update(self):
|
def test_can_rerun_update(self):
|
||||||
# First make sure we have completed all updates.
|
# First make sure we have completed all updates.
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now let's create a room, which will insert a membership
|
# Now let's create a room, which will insert a membership
|
||||||
user = UserID("alice", "test")
|
user = UserID("alice", "test")
|
||||||
|
@ -197,9 +192,4 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
|
||||||
self.store.db_pool.updates._all_done = False
|
self.store.db_pool.updates._all_done = False
|
||||||
|
|
||||||
# Now let's actually drive the updates to completion
|
# Now let's actually drive the updates to completion
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
|
@ -212,12 +212,7 @@ class UserDirectoryInitialPopulationTestcase(HomeserverTestCase):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
while not self.get_success(
|
self.wait_for_background_updates()
|
||||||
self.store.db_pool.updates.has_completed_background_updates()
|
|
||||||
):
|
|
||||||
self.get_success(
|
|
||||||
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_initial(self) -> None:
|
def test_initial(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -317,6 +317,15 @@ class HomeserverTestCase(TestCase):
|
||||||
self.reactor.advance(0.01)
|
self.reactor.advance(0.01)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
def wait_for_background_updates(self) -> None:
|
||||||
|
"""Block until all background database updates have completed."""
|
||||||
|
while not self.get_success(
|
||||||
|
self.store.db_pool.updates.has_completed_background_updates()
|
||||||
|
):
|
||||||
|
self.get_success(
|
||||||
|
self.store.db_pool.updates.do_next_background_update(100), by=0.1
|
||||||
|
)
|
||||||
|
|
||||||
def make_homeserver(self, reactor, clock):
|
def make_homeserver(self, reactor, clock):
|
||||||
"""
|
"""
|
||||||
Make and return a homeserver.
|
Make and return a homeserver.
|
||||||
|
|
Loading…
Reference in a new issue