forked from MirrorHub/synapse
Split the table in two
This commit is contained in:
parent
d5477c7afd
commit
21e255a8f1
4 changed files with 138 additions and 31 deletions
|
@ -50,6 +50,7 @@ class UserDirectoyHandler(object):
|
||||||
# When start up for the first time we need to populate the user_directory.
|
# When start up for the first time we need to populate the user_directory.
|
||||||
# This is a set of user_id's we've inserted already
|
# This is a set of user_id's we've inserted already
|
||||||
self.initially_handled_users = set()
|
self.initially_handled_users = set()
|
||||||
|
self.initially_handled_users_in_public = set()
|
||||||
|
|
||||||
# The current position in the current_state_delta stream
|
# The current position in the current_state_delta stream
|
||||||
self.pos = None
|
self.pos = None
|
||||||
|
@ -145,8 +146,6 @@ class UserDirectoyHandler(object):
|
||||||
return
|
return
|
||||||
|
|
||||||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
|
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
|
||||||
if not is_public:
|
|
||||||
return
|
|
||||||
|
|
||||||
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
||||||
unhandled_users = set(users_with_profile) - self.initially_handled_users
|
unhandled_users = set(users_with_profile) - self.initially_handled_users
|
||||||
|
@ -159,6 +158,13 @@ class UserDirectoyHandler(object):
|
||||||
|
|
||||||
self.initially_handled_users |= unhandled_users
|
self.initially_handled_users |= unhandled_users
|
||||||
|
|
||||||
|
if is_public:
|
||||||
|
yield self.store.add_users_to_public_room(
|
||||||
|
room_id,
|
||||||
|
user_ids=unhandled_users - self.initially_handled_users_in_public
|
||||||
|
)
|
||||||
|
self.initially_handled_users_in_public != unhandled_users
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _handle_deltas(self, deltas):
|
def _handle_deltas(self, deltas):
|
||||||
"""Called with the state deltas to process
|
"""Called with the state deltas to process
|
||||||
|
@ -206,14 +212,7 @@ class UserDirectoyHandler(object):
|
||||||
else:
|
else:
|
||||||
logger.debug("Server is still in room: %r", room_id)
|
logger.debug("Server is still in room: %r", room_id)
|
||||||
|
|
||||||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
|
|
||||||
room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if change: # The user joined
|
if change: # The user joined
|
||||||
if not is_public:
|
|
||||||
return
|
|
||||||
|
|
||||||
event = yield self.store.get_event(event_id)
|
event = yield self.store.get_event(event_id)
|
||||||
profile = ProfileInfo(
|
profile = ProfileInfo(
|
||||||
avatar_url=event.content.get("avatar_url"),
|
avatar_url=event.content.get("avatar_url"),
|
||||||
|
@ -276,11 +275,13 @@ class UserDirectoyHandler(object):
|
||||||
# ignore the change
|
# ignore the change
|
||||||
return
|
return
|
||||||
|
|
||||||
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
if change:
|
||||||
for user_id, profile in users_with_profile.iteritems():
|
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
||||||
if change:
|
for user_id, profile in users_with_profile.iteritems():
|
||||||
yield self._handle_new_user(room_id, user_id, profile)
|
yield self._handle_new_user(room_id, user_id, profile)
|
||||||
else:
|
else:
|
||||||
|
users = yield self.store.get_users_in_public_due_to_room(room_id)
|
||||||
|
for user_id in users:
|
||||||
yield self._handle_remove_user(room_id, user_id)
|
yield self._handle_remove_user(room_id, user_id)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -292,11 +293,21 @@ class UserDirectoyHandler(object):
|
||||||
user_id (str)
|
user_id (str)
|
||||||
"""
|
"""
|
||||||
logger.debug("Adding user to dir, %r", user_id)
|
logger.debug("Adding user to dir, %r", user_id)
|
||||||
|
|
||||||
row = yield self.store.get_user_in_directory(user_id)
|
row = yield self.store.get_user_in_directory(user_id)
|
||||||
if row:
|
if not row:
|
||||||
|
yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
|
||||||
|
|
||||||
|
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
|
||||||
|
room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
if not is_public:
|
||||||
return
|
return
|
||||||
|
|
||||||
yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
|
row = yield self.store.get_user_in_public_room(user_id)
|
||||||
|
if not row:
|
||||||
|
yield self.store.add_users_to_public_room(room_id, [user_id])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _handle_remove_user(self, room_id, user_id):
|
def _handle_remove_user(self, room_id, user_id):
|
||||||
|
@ -309,15 +320,20 @@ class UserDirectoyHandler(object):
|
||||||
logger.debug("Maybe removing user %r", user_id)
|
logger.debug("Maybe removing user %r", user_id)
|
||||||
|
|
||||||
row = yield self.store.get_user_in_directory(user_id)
|
row = yield self.store.get_user_in_directory(user_id)
|
||||||
if not row or row["room_id"] != room_id:
|
update_user_dir = row and row["room_id"] == room_id
|
||||||
# Either the user wasn't in directory or we're still in a room that
|
|
||||||
# is public (i.e. the room_id in the database)
|
row = yield self.store.get_user_in_public_room(user_id)
|
||||||
logger.debug("Not removing as row: %r", row)
|
update_user_in_public = row and row["room_id"] == room_id
|
||||||
|
|
||||||
|
if not update_user_in_public and not update_user_dir:
|
||||||
return
|
return
|
||||||
|
|
||||||
# XXX: Make this faster?
|
# XXX: Make this faster?
|
||||||
rooms = yield self.store.get_rooms_for_user(user_id)
|
rooms = yield self.store.get_rooms_for_user(user_id)
|
||||||
for j_room_id in rooms:
|
for j_room_id in rooms:
|
||||||
|
if not update_user_in_public and not update_user_dir:
|
||||||
|
break
|
||||||
|
|
||||||
is_in_room = yield self.state.get_is_host_in_room(
|
is_in_room = yield self.state.get_is_host_in_room(
|
||||||
j_room_id, self.server_name,
|
j_room_id, self.server_name,
|
||||||
)
|
)
|
||||||
|
@ -325,16 +341,23 @@ class UserDirectoyHandler(object):
|
||||||
if not is_in_room:
|
if not is_in_room:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
|
if update_user_dir:
|
||||||
j_room_id
|
update_user_dir = False
|
||||||
)
|
|
||||||
|
|
||||||
if is_public:
|
|
||||||
yield self.store.update_user_in_user_dir(user_id, j_room_id)
|
yield self.store.update_user_in_user_dir(user_id, j_room_id)
|
||||||
logger.debug("Not removing as found other public room: %r", j_room_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
yield self.store.remove_from_user_dir(user_id)
|
if update_user_in_public:
|
||||||
|
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
|
||||||
|
j_room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
if is_public:
|
||||||
|
yield self.store.update_user_in_public_user_list(user_id, j_room_id)
|
||||||
|
update_user_in_public = False
|
||||||
|
|
||||||
|
if update_user_dir:
|
||||||
|
yield self.store.remove_from_user_dir(user_id)
|
||||||
|
elif update_user_in_public:
|
||||||
|
yield self.store.remove_from_user_in_public_room(user_id)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
|
def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
|
||||||
|
|
|
@ -425,6 +425,11 @@ class SQLBaseStore(object):
|
||||||
|
|
||||||
txn.execute(sql, vals)
|
txn.execute(sql, vals)
|
||||||
|
|
||||||
|
def _simple_insert_many(self, table, values, desc):
|
||||||
|
return self.runInteraction(
|
||||||
|
desc, self._simple_insert_many_txn, table, values
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _simple_insert_many_txn(txn, table, values):
|
def _simple_insert_many_txn(txn, table, values):
|
||||||
if not values:
|
if not values:
|
||||||
|
|
|
@ -31,13 +31,21 @@ INSERT INTO user_directory_stream_pos (stream_id) VALUES (null);
|
||||||
|
|
||||||
CREATE TABLE user_directory (
|
CREATE TABLE user_directory (
|
||||||
user_id TEXT NOT NULL,
|
user_id TEXT NOT NULL,
|
||||||
room_id TEXT NOT NULL, -- A room_id that we know is public
|
room_id TEXT NOT NULL, -- A room_id that we know the user is joined to
|
||||||
display_name TEXT,
|
display_name TEXT,
|
||||||
avatar_url TEXT
|
avatar_url TEXT
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX user_directory_room_idx ON user_directory(room_id);
|
CREATE INDEX user_directory_room_idx ON user_directory(room_id);
|
||||||
CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
|
CREATE UNIQUE INDEX user_directory_user_idx ON user_directory(user_id);
|
||||||
|
|
||||||
|
CREATE TABLE users_in_pubic_room (
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
room_id TEXT NOT NULL -- A room_id that we know is public
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX users_in_pubic_room_room_idx ON users_in_pubic_room(room_id);
|
||||||
|
CREATE UNIQUE INDEX users_in_pubic_room_user_idx ON users_in_pubic_room(user_id);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -50,12 +50,34 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
|
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
def add_profiles_to_user_dir(self, room_id, users_with_profile):
|
@defer.inlineCallbacks
|
||||||
"""Add profiles to the user directory
|
def add_users_to_public_room(self, room_id, user_ids):
|
||||||
|
"""Add user to the list of users in public rooms
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id (str): A room_id that all users are in that is world_readable
|
room_id (str): A room_id that all users are in that is world_readable
|
||||||
or publically joinable
|
or publically joinable
|
||||||
|
user_ids (list(str)): Users to add
|
||||||
|
"""
|
||||||
|
yield self._simple_insert_many(
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
values=[
|
||||||
|
{
|
||||||
|
"user_id": user_id,
|
||||||
|
"room_id": room_id,
|
||||||
|
}
|
||||||
|
for user_id in user_ids
|
||||||
|
],
|
||||||
|
desc="add_users_to_public_room"
|
||||||
|
)
|
||||||
|
for user_id in user_ids:
|
||||||
|
self.get_user_in_public_room.invalidate((user_id,))
|
||||||
|
|
||||||
|
def add_profiles_to_user_dir(self, room_id, users_with_profile):
|
||||||
|
"""Add profiles to the user directory
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): A room_id that all users are joined to
|
||||||
users_with_profile (dict): Users to add to directory in the form of
|
users_with_profile (dict): Users to add to directory in the form of
|
||||||
mapping of user_id -> ProfileInfo
|
mapping of user_id -> ProfileInfo
|
||||||
"""
|
"""
|
||||||
|
@ -125,7 +147,15 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
updatevalues={"room_id": room_id},
|
updatevalues={"room_id": room_id},
|
||||||
desc="update_user_in_user_dir",
|
desc="update_user_in_user_dir",
|
||||||
)
|
)
|
||||||
self.get_user_in_directory.invalidate((user_id,))
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def update_user_in_public_user_list(self, user_id, room_id):
|
||||||
|
yield self._simple_update_one(
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
updatevalues={"room_id": room_id},
|
||||||
|
desc="update_user_in_public_user_list",
|
||||||
|
)
|
||||||
|
|
||||||
def remove_from_user_dir(self, user_id):
|
def remove_from_user_dir(self, user_id):
|
||||||
def _remove_from_user_dir_txn(txn):
|
def _remove_from_user_dir_txn(txn):
|
||||||
|
@ -139,13 +169,41 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
table="user_directory_search",
|
table="user_directory_search",
|
||||||
keyvalues={"user_id": user_id},
|
keyvalues={"user_id": user_id},
|
||||||
)
|
)
|
||||||
|
self._simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
)
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.get_user_in_directory.invalidate, (user_id,)
|
self.get_user_in_directory.invalidate, (user_id,)
|
||||||
)
|
)
|
||||||
|
txn.call_after(
|
||||||
|
self.get_user_in_public_room.invalidate, (user_id,)
|
||||||
|
)
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"remove_from_user_dir", _remove_from_user_dir_txn,
|
"remove_from_user_dir", _remove_from_user_dir_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def remove_from_user_in_public_room(self, user_id):
|
||||||
|
yield self._simple_delete(
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
desc="remove_from_user_in_public_room",
|
||||||
|
)
|
||||||
|
self.get_user_in_public_room.invalidate((user_id,))
|
||||||
|
|
||||||
|
def get_users_in_public_due_to_room(self, room_id):
|
||||||
|
"""Get all user_ids that are in the room directory becuase they're
|
||||||
|
in the given room_id
|
||||||
|
"""
|
||||||
|
return self._simple_select_onecol(
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
keyvalues={"room_id": room_id},
|
||||||
|
retcol="user_id",
|
||||||
|
desc="get_users_in_public_due_to_room",
|
||||||
|
)
|
||||||
|
|
||||||
def get_users_in_dir_due_to_room(self, room_id):
|
def get_users_in_dir_due_to_room(self, room_id):
|
||||||
"""Get all user_ids that are in the room directory becuase they're
|
"""Get all user_ids that are in the room directory becuase they're
|
||||||
in the given room_id
|
in the given room_id
|
||||||
|
@ -173,6 +231,7 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
def _delete_all_from_user_dir_txn(txn):
|
def _delete_all_from_user_dir_txn(txn):
|
||||||
txn.execute("DELETE FROM user_directory")
|
txn.execute("DELETE FROM user_directory")
|
||||||
txn.execute("DELETE FROM user_directory_search")
|
txn.execute("DELETE FROM user_directory_search")
|
||||||
|
txn.execute("DELETE FROM users_in_pubic_room")
|
||||||
txn.call_after(self.get_user_in_directory.invalidate_all)
|
txn.call_after(self.get_user_in_directory.invalidate_all)
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
|
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
|
||||||
|
@ -188,6 +247,16 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
desc="get_user_in_directory",
|
desc="get_user_in_directory",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@cached()
|
||||||
|
def get_user_in_public_room(self, user_id):
|
||||||
|
return self._simple_select_one(
|
||||||
|
table="users_in_pubic_room",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
retcols=("room_id",),
|
||||||
|
allow_none=True,
|
||||||
|
desc="get_user_in_public_room",
|
||||||
|
)
|
||||||
|
|
||||||
def get_user_directory_stream_pos(self):
|
def get_user_directory_stream_pos(self):
|
||||||
return self._simple_select_one_onecol(
|
return self._simple_select_one_onecol(
|
||||||
table="user_directory_stream_pos",
|
table="user_directory_stream_pos",
|
||||||
|
@ -282,6 +351,7 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
SELECT user_id, display_name, avatar_url
|
SELECT user_id, display_name, avatar_url
|
||||||
FROM user_directory_search
|
FROM user_directory_search
|
||||||
INNER JOIN user_directory USING (user_id)
|
INNER JOIN user_directory USING (user_id)
|
||||||
|
INNER JOIN users_in_pubic_room USING (user_id)
|
||||||
WHERE vector @@ to_tsquery('english', ?)
|
WHERE vector @@ to_tsquery('english', ?)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC,
|
ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC,
|
||||||
|
@ -295,6 +365,7 @@ class UserDirectoryStore(SQLBaseStore):
|
||||||
SELECT user_id, display_name, avatar_url
|
SELECT user_id, display_name, avatar_url
|
||||||
FROM user_directory_search
|
FROM user_directory_search
|
||||||
INNER JOIN user_directory USING (user_id)
|
INNER JOIN user_directory USING (user_id)
|
||||||
|
INNER JOIN users_in_pubic_room USING (user_id)
|
||||||
WHERE value MATCH ?
|
WHERE value MATCH ?
|
||||||
ORDER BY
|
ORDER BY
|
||||||
rank(matchinfo(user_directory)) DESC,
|
rank(matchinfo(user_directory)) DESC,
|
||||||
|
|
Loading…
Reference in a new issue