forked from MirrorHub/synapse
Remove remaining pieces of groups code. (#12966)
* Remove an unused stream ID generator. * Remove the now unused remote profile cache.
This commit is contained in:
parent
44de53bb79
commit
f7baffd8ec
8 changed files with 6 additions and 209 deletions
1
changelog.d/12966.removal
Normal file
1
changelog.d/12966.removal
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Remove support for the non-standard groups/communities feature from Synapse.
|
|
@ -182,6 +182,7 @@ IGNORED_TABLES = {
|
||||||
"groups",
|
"groups",
|
||||||
"local_group_membership",
|
"local_group_membership",
|
||||||
"local_group_updates",
|
"local_group_updates",
|
||||||
|
"remote_profile_cache",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -23,14 +23,7 @@ from synapse.api.errors import (
|
||||||
StoreError,
|
StoreError,
|
||||||
SynapseError,
|
SynapseError,
|
||||||
)
|
)
|
||||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
from synapse.types import JsonDict, Requester, UserID, create_requester
|
||||||
from synapse.types import (
|
|
||||||
JsonDict,
|
|
||||||
Requester,
|
|
||||||
UserID,
|
|
||||||
create_requester,
|
|
||||||
get_domain_from_id,
|
|
||||||
)
|
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.util.stringutils import parse_and_validate_mxc_uri
|
from synapse.util.stringutils import parse_and_validate_mxc_uri
|
||||||
|
|
||||||
|
@ -50,9 +43,6 @@ class ProfileHandler:
|
||||||
delegate to master when necessary.
|
delegate to master when necessary.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
PROFILE_UPDATE_MS = 60 * 1000
|
|
||||||
PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
|
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.store = hs.get_datastores().main
|
self.store = hs.get_datastores().main
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
@ -73,11 +63,6 @@ class ProfileHandler:
|
||||||
|
|
||||||
self._third_party_rules = hs.get_third_party_event_rules()
|
self._third_party_rules = hs.get_third_party_event_rules()
|
||||||
|
|
||||||
if hs.config.worker.run_background_tasks:
|
|
||||||
self.clock.looping_call(
|
|
||||||
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS
|
|
||||||
)
|
|
||||||
|
|
||||||
async def get_profile(self, user_id: str) -> JsonDict:
|
async def get_profile(self, user_id: str) -> JsonDict:
|
||||||
target_user = UserID.from_string(user_id)
|
target_user = UserID.from_string(user_id)
|
||||||
|
|
||||||
|
@ -116,30 +101,6 @@ class ProfileHandler:
|
||||||
raise SynapseError(502, "Failed to fetch profile")
|
raise SynapseError(502, "Failed to fetch profile")
|
||||||
raise e.to_synapse_error()
|
raise e.to_synapse_error()
|
||||||
|
|
||||||
async def get_profile_from_cache(self, user_id: str) -> JsonDict:
|
|
||||||
"""Get the profile information from our local cache. If the user is
|
|
||||||
ours then the profile information will always be correct. Otherwise,
|
|
||||||
it may be out of date/missing.
|
|
||||||
"""
|
|
||||||
target_user = UserID.from_string(user_id)
|
|
||||||
if self.hs.is_mine(target_user):
|
|
||||||
try:
|
|
||||||
displayname = await self.store.get_profile_displayname(
|
|
||||||
target_user.localpart
|
|
||||||
)
|
|
||||||
avatar_url = await self.store.get_profile_avatar_url(
|
|
||||||
target_user.localpart
|
|
||||||
)
|
|
||||||
except StoreError as e:
|
|
||||||
if e.code == 404:
|
|
||||||
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
|
|
||||||
raise
|
|
||||||
|
|
||||||
return {"displayname": displayname, "avatar_url": avatar_url}
|
|
||||||
else:
|
|
||||||
profile = await self.store.get_from_remote_profile_cache(user_id)
|
|
||||||
return profile or {}
|
|
||||||
|
|
||||||
async def get_displayname(self, target_user: UserID) -> Optional[str]:
|
async def get_displayname(self, target_user: UserID) -> Optional[str]:
|
||||||
if self.hs.is_mine(target_user):
|
if self.hs.is_mine(target_user):
|
||||||
try:
|
try:
|
||||||
|
@ -509,45 +470,3 @@ class ProfileHandler:
|
||||||
# so we act as if we couldn't find the profile.
|
# so we act as if we couldn't find the profile.
|
||||||
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
|
raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@wrap_as_background_process("Update remote profile")
|
|
||||||
async def _update_remote_profile_cache(self) -> None:
|
|
||||||
"""Called periodically to check profiles of remote users we haven't
|
|
||||||
checked in a while.
|
|
||||||
"""
|
|
||||||
entries = await self.store.get_remote_profile_cache_entries_that_expire(
|
|
||||||
last_checked=self.clock.time_msec() - self.PROFILE_UPDATE_EVERY_MS
|
|
||||||
)
|
|
||||||
|
|
||||||
for user_id, displayname, avatar_url in entries:
|
|
||||||
is_subscribed = await self.store.is_subscribed_remote_profile_for_user(
|
|
||||||
user_id
|
|
||||||
)
|
|
||||||
if not is_subscribed:
|
|
||||||
await self.store.maybe_delete_remote_profile_cache(user_id)
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
profile = await self.federation.make_query(
|
|
||||||
destination=get_domain_from_id(user_id),
|
|
||||||
query_type="profile",
|
|
||||||
args={"user_id": user_id},
|
|
||||||
ignore_backoff=True,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to get avatar_url")
|
|
||||||
|
|
||||||
await self.store.update_remote_profile_cache(
|
|
||||||
user_id, displayname, avatar_url
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
new_name = profile.get("displayname")
|
|
||||||
if not isinstance(new_name, str):
|
|
||||||
new_name = None
|
|
||||||
new_avatar = profile.get("avatar_url")
|
|
||||||
if not isinstance(new_avatar, str):
|
|
||||||
new_avatar = None
|
|
||||||
|
|
||||||
# We always hit update to update the last_check timestamp
|
|
||||||
await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
|
|
||||||
|
|
|
@ -151,10 +151,6 @@ class DataStore(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
self._group_updates_id_gen = StreamIdGenerator(
|
|
||||||
db_conn, "local_group_updates", "stream_id"
|
|
||||||
)
|
|
||||||
|
|
||||||
self._cache_id_gen: Optional[MultiWriterIdGenerator]
|
self._cache_id_gen: Optional[MultiWriterIdGenerator]
|
||||||
if isinstance(self.database_engine, PostgresEngine):
|
if isinstance(self.database_engine, PostgresEngine):
|
||||||
# We set the `writers` to an empty list here as we don't care about
|
# We set the `writers` to an empty list here as we don't care about
|
||||||
|
@ -197,20 +193,6 @@ class DataStore(
|
||||||
prefilled_cache=curr_state_delta_prefill,
|
prefilled_cache=curr_state_delta_prefill,
|
||||||
)
|
)
|
||||||
|
|
||||||
_group_updates_prefill, min_group_updates_id = self.db_pool.get_cache_dict(
|
|
||||||
db_conn,
|
|
||||||
"local_group_updates",
|
|
||||||
entity_column="user_id",
|
|
||||||
stream_column="stream_id",
|
|
||||||
max_value=self._group_updates_id_gen.get_current_token(),
|
|
||||||
limit=1000,
|
|
||||||
)
|
|
||||||
self._group_updates_stream_cache = StreamChangeCache(
|
|
||||||
"_group_updates_stream_cache",
|
|
||||||
min_group_updates_id,
|
|
||||||
prefilled_cache=_group_updates_prefill,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||||
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||||
|
|
||||||
|
|
|
@ -11,11 +11,10 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Optional
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import LoggingTransaction
|
|
||||||
from synapse.storage.databases.main.roommember import ProfileInfo
|
from synapse.storage.databases.main.roommember import ProfileInfo
|
||||||
|
|
||||||
|
|
||||||
|
@ -55,17 +54,6 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||||
desc="get_profile_avatar_url",
|
desc="get_profile_avatar_url",
|
||||||
)
|
)
|
||||||
|
|
||||||
async def get_from_remote_profile_cache(
|
|
||||||
self, user_id: str
|
|
||||||
) -> Optional[Dict[str, Any]]:
|
|
||||||
return await self.db_pool.simple_select_one(
|
|
||||||
table="remote_profile_cache",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
retcols=("displayname", "avatar_url"),
|
|
||||||
allow_none=True,
|
|
||||||
desc="get_from_remote_profile_cache",
|
|
||||||
)
|
|
||||||
|
|
||||||
async def create_profile(self, user_localpart: str) -> None:
|
async def create_profile(self, user_localpart: str) -> None:
|
||||||
await self.db_pool.simple_insert(
|
await self.db_pool.simple_insert(
|
||||||
table="profiles", values={"user_id": user_localpart}, desc="create_profile"
|
table="profiles", values={"user_id": user_localpart}, desc="create_profile"
|
||||||
|
@ -91,97 +79,6 @@ class ProfileWorkerStore(SQLBaseStore):
|
||||||
desc="set_profile_avatar_url",
|
desc="set_profile_avatar_url",
|
||||||
)
|
)
|
||||||
|
|
||||||
async def update_remote_profile_cache(
|
|
||||||
self, user_id: str, displayname: Optional[str], avatar_url: Optional[str]
|
|
||||||
) -> int:
|
|
||||||
return await self.db_pool.simple_update(
|
|
||||||
table="remote_profile_cache",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
updatevalues={
|
|
||||||
"displayname": displayname,
|
|
||||||
"avatar_url": avatar_url,
|
|
||||||
"last_check": self._clock.time_msec(),
|
|
||||||
},
|
|
||||||
desc="update_remote_profile_cache",
|
|
||||||
)
|
|
||||||
|
|
||||||
async def maybe_delete_remote_profile_cache(self, user_id: str) -> None:
|
|
||||||
"""Check if we still care about the remote user's profile, and if we
|
|
||||||
don't then remove their profile from the cache
|
|
||||||
"""
|
|
||||||
subscribed = await self.is_subscribed_remote_profile_for_user(user_id)
|
|
||||||
if not subscribed:
|
|
||||||
await self.db_pool.simple_delete(
|
|
||||||
table="remote_profile_cache",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
desc="delete_remote_profile_cache",
|
|
||||||
)
|
|
||||||
|
|
||||||
async def is_subscribed_remote_profile_for_user(self, user_id: str) -> bool:
|
|
||||||
"""Check whether we are interested in a remote user's profile."""
|
|
||||||
res: Optional[str] = await self.db_pool.simple_select_one_onecol(
|
|
||||||
table="group_users",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
retcol="user_id",
|
|
||||||
allow_none=True,
|
|
||||||
desc="should_update_remote_profile_cache_for_user",
|
|
||||||
)
|
|
||||||
|
|
||||||
if res:
|
|
||||||
return True
|
|
||||||
|
|
||||||
res = await self.db_pool.simple_select_one_onecol(
|
|
||||||
table="group_invites",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
retcol="user_id",
|
|
||||||
allow_none=True,
|
|
||||||
desc="should_update_remote_profile_cache_for_user",
|
|
||||||
)
|
|
||||||
|
|
||||||
if res:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def get_remote_profile_cache_entries_that_expire(
|
|
||||||
self, last_checked: int
|
|
||||||
) -> List[Dict[str, str]]:
|
|
||||||
"""Get all users who haven't been checked since `last_checked`"""
|
|
||||||
|
|
||||||
def _get_remote_profile_cache_entries_that_expire_txn(
|
|
||||||
txn: LoggingTransaction,
|
|
||||||
) -> List[Dict[str, str]]:
|
|
||||||
sql = """
|
|
||||||
SELECT user_id, displayname, avatar_url
|
|
||||||
FROM remote_profile_cache
|
|
||||||
WHERE last_check < ?
|
|
||||||
"""
|
|
||||||
|
|
||||||
txn.execute(sql, (last_checked,))
|
|
||||||
|
|
||||||
return self.db_pool.cursor_to_dict(txn)
|
|
||||||
|
|
||||||
return await self.db_pool.runInteraction(
|
|
||||||
"get_remote_profile_cache_entries_that_expire",
|
|
||||||
_get_remote_profile_cache_entries_that_expire_txn,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ProfileStore(ProfileWorkerStore):
|
class ProfileStore(ProfileWorkerStore):
|
||||||
async def add_remote_profile_cache(
|
pass
|
||||||
self, user_id: str, displayname: str, avatar_url: str
|
|
||||||
) -> None:
|
|
||||||
"""Ensure we are caching the remote user's profiles.
|
|
||||||
|
|
||||||
This should only be called when `is_subscribed_remote_profile_for_user`
|
|
||||||
would return true for the user.
|
|
||||||
"""
|
|
||||||
await self.db_pool.simple_upsert(
|
|
||||||
table="remote_profile_cache",
|
|
||||||
keyvalues={"user_id": user_id},
|
|
||||||
values={
|
|
||||||
"displayname": displayname,
|
|
||||||
"avatar_url": avatar_url,
|
|
||||||
"last_check": self._clock.time_msec(),
|
|
||||||
},
|
|
||||||
desc="add_remote_profile_cache",
|
|
||||||
)
|
|
||||||
|
|
|
@ -393,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
"partial_state_events",
|
"partial_state_events",
|
||||||
"events",
|
"events",
|
||||||
"federation_inbound_events_staging",
|
"federation_inbound_events_staging",
|
||||||
"group_rooms",
|
|
||||||
"local_current_membership",
|
"local_current_membership",
|
||||||
"partial_state_rooms_servers",
|
"partial_state_rooms_servers",
|
||||||
"partial_state_rooms",
|
"partial_state_rooms",
|
||||||
|
@ -413,7 +412,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||||
"e2e_room_keys",
|
"e2e_room_keys",
|
||||||
"event_push_summary",
|
"event_push_summary",
|
||||||
"pusher_throttle",
|
"pusher_throttle",
|
||||||
"group_summary_rooms",
|
|
||||||
"room_account_data",
|
"room_account_data",
|
||||||
"room_tags",
|
"room_tags",
|
||||||
# "rooms" happens last, to keep the foreign keys in the other tables
|
# "rooms" happens last, to keep the foreign keys in the other tables
|
||||||
|
|
|
@ -70,6 +70,7 @@ Changes in SCHEMA_VERSION = 70:
|
||||||
|
|
||||||
Changes in SCHEMA_VERSION = 71:
|
Changes in SCHEMA_VERSION = 71:
|
||||||
- event_edges.room_id is no longer read from.
|
- event_edges.room_id is no longer read from.
|
||||||
|
- Tables related to groups are no longer accessed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2467,7 +2467,6 @@ PURGE_TABLES = [
|
||||||
"event_push_actions",
|
"event_push_actions",
|
||||||
"event_search",
|
"event_search",
|
||||||
"events",
|
"events",
|
||||||
"group_rooms",
|
|
||||||
"receipts_graph",
|
"receipts_graph",
|
||||||
"receipts_linearized",
|
"receipts_linearized",
|
||||||
"room_aliases",
|
"room_aliases",
|
||||||
|
@ -2484,7 +2483,6 @@ PURGE_TABLES = [
|
||||||
"e2e_room_keys",
|
"e2e_room_keys",
|
||||||
"event_push_summary",
|
"event_push_summary",
|
||||||
"pusher_throttle",
|
"pusher_throttle",
|
||||||
"group_summary_rooms",
|
|
||||||
"room_account_data",
|
"room_account_data",
|
||||||
"room_tags",
|
"room_tags",
|
||||||
# "state_groups", # Current impl leaves orphaned state groups around.
|
# "state_groups", # Current impl leaves orphaned state groups around.
|
||||||
|
|
Loading…
Reference in a new issue