diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index fc45123d0..20a026e77 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -60,14 +60,16 @@ class UserDirectoryHandler(object): self.update_user_directory = hs.config.update_user_directory self.search_all_users = hs.config.user_directory_search_all_users + # If we're a worker, don't sleep when doing the initial room work, as it + # won't monopolise the master's CPU. + if hs.config.worker_app: + self.INITIAL_ROOM_SLEEP_MS = 0 + self.INITIAL_USER_SLEEP_MS = 0 + # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already self.initially_handled_users = set() - self.register_background_update_handler( - "users_in_public_rooms_initial", self._populate_users_in_public_rooms - ) - # The current position in the current_state_delta stream self.pos = None @@ -81,41 +83,6 @@ class UserDirectoryHandler(object): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) - @defer.inlineCallbacks - def _populate_users_in_public_rooms(self, progress, batch_size): - """ - Populate the users_in_public_rooms table with the contents of the - users_who_share_public_rooms table. - """ - - def _fetch(txn): - sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms" - txn.execute(sql) - return txn.fetchall() - - users = yield self.store.runInteraction( - "populate_users_in_public_rooms_fetch", _fetch - ) - - if users: - - def _fill(txn): - self._simple_upsert_many_txn( - txn, - table="users_in_public_rooms", - key_names=["user_id"], - key_values=users, - value_names=(), - value_values=None, - ) - - users = yield self.store.runInteraction( - "populate_users_in_public_rooms_fill", _fill - ) - - yield self._end_background_update("users_in_public_rooms_initial") - defer.returnValue(1) - def search_users(self, user_id, search_term, limit): """Searches for users in directory diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a0333d530..7e3903859 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -767,18 +767,25 @@ class SQLBaseStore(object): """ allvalues = {} allvalues.update(keyvalues) - allvalues.update(values) allvalues.update(insertion_values) + if not values: + latter = "NOTHING" + else: + allvalues.update(values) + latter = ( + "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values) + ) + sql = ( "INSERT INTO %s (%s) VALUES (%s) " - "ON CONFLICT (%s) DO UPDATE SET %s" + "ON CONFLICT (%s) DO %s" ) % ( table, ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues), ", ".join(k for k in keyvalues), - ", ".join(k + "=EXCLUDED." + k for k in values), + latter ) txn.execute(sql, list(allvalues.values())) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index 8f40277b5..a15366a11 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -22,16 +22,57 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.state import StateFilter from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from ._base import SQLBaseStore - logger = logging.getLogger(__name__) -class UserDirectoryStore(SQLBaseStore): +class UserDirectoryStore(BackgroundUpdateStore): + def __init__(self, dbconn, hs): + super(UserDirectoryStore, self).__init__(dbconn, hs) + + self.register_background_update_handler( + "users_in_public_rooms_initial", self._populate_users_in_public_rooms + ) + + + @defer.inlineCallbacks + def _populate_users_in_public_rooms(self, progress, batch_size): + """ + Populate the users_in_public_rooms table with the contents of the + users_who_share_public_rooms table. + """ + + def _fetch(txn): + sql = "SELECT DISTINCT other_user_id FROM users_who_share_public_rooms" + txn.execute(sql) + return txn.fetchall() + + users = yield self.runInteraction( + "populate_users_in_public_rooms_fetch", _fetch + ) + + if users: + def _fill(txn): + self._simple_upsert_many_txn( + txn, + table="users_in_public_rooms", + key_names=["user_id"], + key_values=users, + value_names=(), + value_values=None, + ) + + users = yield self.runInteraction( + "populate_users_in_public_rooms_fill", _fill + ) + + yield self._end_background_update("users_in_public_rooms_initial") + defer.returnValue(1) + @defer.inlineCallbacks def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable @@ -353,8 +394,7 @@ class UserDirectoryStore(SQLBaseStore): txn, "users_in_public_rooms", keyvalues={"user_id": user_id}, - values={}, - desc="add_user_as_in_public_room", + values=None, ) for user_id, other_user_id in user_id_tuples: @@ -603,7 +643,7 @@ class UserDirectoryStore(SQLBaseStore): else: join_clause = """ LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_public_rooms + SELECT user_id FROM users_in_public_rooms UNION SELECT other_user_id AS user_id FROM users_who_share_private_rooms WHERE user_id = ? diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 0e0ac0a48..7a78451a6 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -116,12 +116,13 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): # Check we have populated the database correctly. shares_public = self.get_users_who_share_public_rooms() shares_private = self.get_users_who_share_private_rooms() + public_users = self.get_users_in_public_rooms() self.assertEqual(shares_public, []) self.assertEqual( self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) ) - self.assertEqual(set(public_users), set([u1, u2])) + self.assertEqual(public_users, []) # We get one search result when searching for user2 by user1. s = self.get_success(self.handler.search_users(u1, "user2", 10)) @@ -145,7 +146,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self.assertEqual(shares_public, []) self.assertEqual(self._compress_shared(shares_private), set()) - self.assertEqual(public_users, [u1]) + self.assertEqual(public_users, []) # User1 now gets no search results for any of the other users. s = self.get_success(self.handler.search_users(u1, "user2", 10)) @@ -165,10 +166,10 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): def get_users_in_public_rooms(self): return self.get_success( - self.store._simple_select_list( + self.store._simple_select_onecol( "users_in_public_rooms", None, - ["user_id"], + "user_id", ) ) @@ -214,9 +215,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): shares_public = self.get_users_who_share_public_rooms() shares_private = self.get_users_who_share_private_rooms() + public_users = self.get_users_in_public_rooms() + # Nothing updated yet self.assertEqual(shares_private, []) self.assertEqual(shares_public, []) + self.assertEqual(public_users, []) # Reset the handled users caches self.handler.initially_handled_users = set() @@ -233,6 +237,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): shares_public = self.get_users_who_share_public_rooms() shares_private = self.get_users_who_share_private_rooms() + public_users = self.get_users_in_public_rooms() # User 1 and User 2 share public rooms self.assertEqual( @@ -245,6 +250,9 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): set([(u1, u3, private_room), (u3, u1, private_room)]), ) + # User 1 and 2 are in public rooms + self.assertEqual(set(public_users), set([u1, u2])) + def test_search_all_users(self): """ Search all users = True means that a user does not have to share a