forked from MirrorHub/synapse
Preparatory work to fix the user directory assuming that any remote membership state events represent a profile change. [rei:userdirpriv] (#14755)
* Remove special-case method for new memberships only, use more generic method
* Only collect profiles from state events in public rooms
* Add a table to track stale remote user profiles
* Add store methods to set and delete rows in this new table
* Mark remote profiles as stale when a member state event comes in to a private room
* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

* Simplify by removing Optionality of `event_id`
* Replace names and avatars with None if they're set to dodgy things

  I think this makes more sense anyway.
* Move schema delta to 74 (I missed the boat?)
* Turns out these can be None after all

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
This commit is contained in:
parent
3bf973edc7
commit
f54f877f27
4 changed files with 127 additions and 34 deletions
1
changelog.d/14755.bugfix
Normal file
1
changelog.d/14755.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
|
|
@ -28,6 +28,11 @@ if TYPE_CHECKING:
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Don't refresh a stale user directory entry, using a Federation /profile request,
|
||||||
|
# for 60 seconds. This gives time for other state events to arrive (which will
|
||||||
|
# then be coalesced such that only one /profile request is made).
|
||||||
|
USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
|
||||||
|
|
||||||
|
|
||||||
class UserDirectoryHandler(StateDeltasHandler):
|
class UserDirectoryHandler(StateDeltasHandler):
|
||||||
"""Handles queries and updates for the user_directory.
|
"""Handles queries and updates for the user_directory.
|
||||||
|
@ -200,8 +205,8 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
typ = delta["type"]
|
typ = delta["type"]
|
||||||
state_key = delta["state_key"]
|
state_key = delta["state_key"]
|
||||||
room_id = delta["room_id"]
|
room_id = delta["room_id"]
|
||||||
event_id = delta["event_id"]
|
event_id: Optional[str] = delta["event_id"]
|
||||||
prev_event_id = delta["prev_event_id"]
|
prev_event_id: Optional[str] = delta["prev_event_id"]
|
||||||
|
|
||||||
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
|
||||||
|
|
||||||
|
@ -297,8 +302,8 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
async def _handle_room_membership_event(
|
async def _handle_room_membership_event(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
prev_event_id: str,
|
prev_event_id: Optional[str],
|
||||||
event_id: str,
|
event_id: Optional[str],
|
||||||
state_key: str,
|
state_key: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process a single room membership event.
|
"""Process a single room membership event.
|
||||||
|
@ -348,7 +353,8 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
# Handle any profile changes for remote users.
|
# Handle any profile changes for remote users.
|
||||||
# (For local users the rest of the application calls
|
# (For local users the rest of the application calls
|
||||||
# `handle_local_profile_change`.)
|
# `handle_local_profile_change`.)
|
||||||
if is_remote:
|
# Only process if there is an event_id.
|
||||||
|
if is_remote and event_id is not None:
|
||||||
await self._handle_possible_remote_profile_change(
|
await self._handle_possible_remote_profile_change(
|
||||||
state_key, room_id, prev_event_id, event_id
|
state_key, room_id, prev_event_id, event_id
|
||||||
)
|
)
|
||||||
|
@ -356,28 +362,12 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
# This may be the first time we've seen a remote user. If
|
# This may be the first time we've seen a remote user. If
|
||||||
# so, ensure we have a directory entry for them. (For local users,
|
# so, ensure we have a directory entry for them. (For local users,
|
||||||
# the rest of the application calls `handle_local_profile_change`.)
|
# the rest of the application calls `handle_local_profile_change`.)
|
||||||
if is_remote:
|
# Only process if there is an event_id.
|
||||||
await self._upsert_directory_entry_for_remote_user(state_key, event_id)
|
if is_remote and event_id is not None:
|
||||||
await self._track_user_joined_room(room_id, state_key)
|
await self._handle_possible_remote_profile_change(
|
||||||
|
state_key, room_id, None, event_id
|
||||||
async def _upsert_directory_entry_for_remote_user(
|
|
||||||
self, user_id: str, event_id: str
|
|
||||||
) -> None:
|
|
||||||
"""A remote user has just joined a room. Ensure they have an entry in
|
|
||||||
the user directory. The caller is responsible for making sure they're
|
|
||||||
remote.
|
|
||||||
"""
|
|
||||||
event = await self.store.get_event(event_id, allow_none=True)
|
|
||||||
# It isn't expected for this event to not exist, but we
|
|
||||||
# don't want the entire background process to break.
|
|
||||||
if event is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.debug("Adding new user to dir, %r", user_id)
|
|
||||||
|
|
||||||
await self.store.update_profile_in_user_dir(
|
|
||||||
user_id, event.content.get("displayname"), event.content.get("avatar_url")
|
|
||||||
)
|
)
|
||||||
|
await self._track_user_joined_room(room_id, state_key)
|
||||||
|
|
||||||
async def _track_user_joined_room(self, room_id: str, joining_user_id: str) -> None:
|
async def _track_user_joined_room(self, room_id: str, joining_user_id: str) -> None:
|
||||||
"""Someone's just joined a room. Update `users_in_public_rooms` or
|
"""Someone's just joined a room. Update `users_in_public_rooms` or
|
||||||
|
@ -460,14 +450,17 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
user_id: str,
|
user_id: str,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
prev_event_id: Optional[str],
|
prev_event_id: Optional[str],
|
||||||
event_id: Optional[str],
|
event_id: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Check member event changes for any profile changes and update the
|
"""Check member event changes for any profile changes and update the
|
||||||
database if there are. This is intended for remote users only. The caller
|
database if there are. This is intended for remote users only. The caller
|
||||||
is responsible for checking that the given user is remote.
|
is responsible for checking that the given user is remote.
|
||||||
"""
|
"""
|
||||||
if not prev_event_id or not event_id:
|
|
||||||
return
|
if not prev_event_id:
|
||||||
|
# If we don't have an older event to fall back on, just fetch the same
|
||||||
|
# event itself.
|
||||||
|
prev_event_id = event_id
|
||||||
|
|
||||||
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
|
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
|
||||||
event = await self.store.get_event(event_id, allow_none=True)
|
event = await self.store.get_event(event_id, allow_none=True)
|
||||||
|
@ -478,17 +471,37 @@ class UserDirectoryHandler(StateDeltasHandler):
|
||||||
if event.membership != Membership.JOIN:
|
if event.membership != Membership.JOIN:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
is_public = await self.store.is_room_world_readable_or_publicly_joinable(
|
||||||
|
room_id
|
||||||
|
)
|
||||||
|
if not is_public:
|
||||||
|
# Don't collect user profiles from private rooms as they are not guaranteed
|
||||||
|
# to be the same as the user's global profile.
|
||||||
|
now_ts = self.clock.time_msec()
|
||||||
|
await self.store.set_remote_user_profile_in_user_dir_stale(
|
||||||
|
user_id,
|
||||||
|
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
|
||||||
|
retry_counter=0,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
prev_name = prev_event.content.get("displayname")
|
prev_name = prev_event.content.get("displayname")
|
||||||
new_name = event.content.get("displayname")
|
new_name = event.content.get("displayname")
|
||||||
# If the new name is an unexpected form, do not update the directory.
|
# If the new name is an unexpected form, replace with None.
|
||||||
if not isinstance(new_name, str):
|
if not isinstance(new_name, str):
|
||||||
new_name = prev_name
|
new_name = None
|
||||||
|
|
||||||
prev_avatar = prev_event.content.get("avatar_url")
|
prev_avatar = prev_event.content.get("avatar_url")
|
||||||
new_avatar = event.content.get("avatar_url")
|
new_avatar = event.content.get("avatar_url")
|
||||||
# If the new avatar is an unexpected form, do not update the directory.
|
# If the new avatar is an unexpected form, replace with None.
|
||||||
if not isinstance(new_avatar, str):
|
if not isinstance(new_avatar, str):
|
||||||
new_avatar = prev_avatar
|
new_avatar = None
|
||||||
|
|
||||||
if prev_name != new_name or prev_avatar != new_avatar:
|
if (
|
||||||
|
prev_name != new_name
|
||||||
|
or prev_avatar != new_avatar
|
||||||
|
or prev_event_id == event_id
|
||||||
|
):
|
||||||
|
# Only update if something has changed, or we didn't have a previous event
|
||||||
|
# in the first place.
|
||||||
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
|
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
|
||||||
|
|
|
@ -54,6 +54,7 @@ from synapse.storage.databases.main.state_deltas import StateDeltasStore
|
||||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
JsonDict,
|
JsonDict,
|
||||||
|
UserID,
|
||||||
UserProfile,
|
UserProfile,
|
||||||
get_domain_from_id,
|
get_domain_from_id,
|
||||||
get_localpart_from_id,
|
get_localpart_from_id,
|
||||||
|
@ -473,11 +474,42 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
async def set_remote_user_profile_in_user_dir_stale(
|
||||||
|
self, user_id: str, next_try_at_ms: int, retry_counter: int
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Marks a remote user as having a possibly-stale user directory profile.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: the remote user who may have a stale profile on this server.
|
||||||
|
next_try_at_ms: timestamp in ms after which the user directory profile can be
|
||||||
|
refreshed.
|
||||||
|
retry_counter: number of failures in refreshing the profile so far. Used for
|
||||||
|
exponential backoff calculations.
|
||||||
|
"""
|
||||||
|
assert not self.hs.is_mine_id(
|
||||||
|
user_id
|
||||||
|
), "Can't mark a local user as a stale remote user."
|
||||||
|
|
||||||
|
server_name = UserID.from_string(user_id).domain
|
||||||
|
|
||||||
|
await self.db_pool.simple_upsert(
|
||||||
|
table="user_directory_stale_remote_users",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
values={
|
||||||
|
"next_try_at_ts": next_try_at_ms,
|
||||||
|
"retry_counter": retry_counter,
|
||||||
|
"user_server_name": server_name,
|
||||||
|
},
|
||||||
|
desc="set_remote_user_profile_in_user_dir_stale",
|
||||||
|
)
|
||||||
|
|
||||||
async def update_profile_in_user_dir(
|
async def update_profile_in_user_dir(
|
||||||
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
|
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Update or add a user's profile in the user directory.
|
Update or add a user's profile in the user directory.
|
||||||
|
If the user is remote, the profile will be marked as not stale.
|
||||||
"""
|
"""
|
||||||
# If the display name or avatar URL are unexpected types, replace with None.
|
# If the display name or avatar URL are unexpected types, replace with None.
|
||||||
display_name = non_null_str_or_none(display_name)
|
display_name = non_null_str_or_none(display_name)
|
||||||
|
@ -491,6 +523,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
||||||
values={"display_name": display_name, "avatar_url": avatar_url},
|
values={"display_name": display_name, "avatar_url": avatar_url},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not self.hs.is_mine_id(user_id):
|
||||||
|
# Remote users: Make sure the profile is not marked as stale anymore.
|
||||||
|
self.db_pool.simple_delete_txn(
|
||||||
|
txn,
|
||||||
|
table="user_directory_stale_remote_users",
|
||||||
|
keyvalues={"user_id": user_id},
|
||||||
|
)
|
||||||
|
|
||||||
# The display name that goes into the database index.
|
# The display name that goes into the database index.
|
||||||
index_display_name = display_name
|
index_display_name = display_name
|
||||||
if index_display_name is not None:
|
if index_display_name is not None:
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
-- Table containing a list of remote users whose profiles may have changed
|
||||||
|
-- since their last update in the user directory.
|
||||||
|
CREATE TABLE user_directory_stale_remote_users (
|
||||||
|
-- The User ID of the remote user whose profile may be stale.
|
||||||
|
user_id TEXT NOT NULL PRIMARY KEY,
|
||||||
|
|
||||||
|
-- The server name of the user.
|
||||||
|
user_server_name TEXT NOT NULL,
|
||||||
|
|
||||||
|
-- The timestamp (in ms) after which we should next try to request the user's
|
||||||
|
-- latest profile.
|
||||||
|
next_try_at_ts BIGINT NOT NULL,
|
||||||
|
|
||||||
|
-- The number of retries so far.
|
||||||
|
-- 0 means we have not yet attempted to refresh the profile.
|
||||||
|
-- Used for calculating exponential backoff.
|
||||||
|
retry_counter INTEGER NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Create an index so we can easily query upcoming servers to try.
|
||||||
|
CREATE INDEX user_directory_stale_remote_users_next_try_idx ON user_directory_stale_remote_users(next_try_at_ts, user_server_name);
|
||||||
|
|
||||||
|
-- Create an index so we can easily query upcoming users to try for a particular server.
|
||||||
|
CREATE INDEX user_directory_stale_remote_users_next_try_by_server_idx ON user_directory_stale_remote_users(user_server_name, next_try_at_ts);
|
Loading…
Reference in a new issue