forked from MirrorHub/synapse
Use the ignored_users table to test event visibility & sync. (#12225)
Instead of fetching the raw account data and re-parsing it. The ignored_users table is a denormalised version of the account data for quick searching.
This commit is contained in:
parent
dea577998f
commit
dda9b7fc4d
6 changed files with 62 additions and 47 deletions
1
changelog.d/12225.misc
Normal file
1
changelog.d/12225.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Use the `ignored_users` table in additional places instead of re-parsing the account data.
|
|
@ -28,7 +28,7 @@ from typing import (
|
||||||
import attr
|
import attr
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
|
from synapse.api.constants import EventTypes, Membership, ReceiptTypes
|
||||||
from synapse.api.filtering import FilterCollection
|
from synapse.api.filtering import FilterCollection
|
||||||
from synapse.api.presence import UserPresenceState
|
from synapse.api.presence import UserPresenceState
|
||||||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
|
||||||
|
@ -1601,7 +1601,7 @@ class SyncHandler:
|
||||||
return set(), set(), set(), set()
|
return set(), set(), set(), set()
|
||||||
|
|
||||||
# 3. Work out which rooms need reporting in the sync response.
|
# 3. Work out which rooms need reporting in the sync response.
|
||||||
ignored_users = await self._get_ignored_users(user_id)
|
ignored_users = await self.store.ignored_users(user_id)
|
||||||
if since_token:
|
if since_token:
|
||||||
room_changes = await self._get_rooms_changed(
|
room_changes = await self._get_rooms_changed(
|
||||||
sync_result_builder, ignored_users
|
sync_result_builder, ignored_users
|
||||||
|
@ -1627,7 +1627,6 @@ class SyncHandler:
|
||||||
logger.debug("Generating room entry for %s", room_entry.room_id)
|
logger.debug("Generating room entry for %s", room_entry.room_id)
|
||||||
await self._generate_room_entry(
|
await self._generate_room_entry(
|
||||||
sync_result_builder,
|
sync_result_builder,
|
||||||
ignored_users,
|
|
||||||
room_entry,
|
room_entry,
|
||||||
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
|
||||||
tags=tags_by_room.get(room_entry.room_id),
|
tags=tags_by_room.get(room_entry.room_id),
|
||||||
|
@ -1657,29 +1656,6 @@ class SyncHandler:
|
||||||
newly_left_users,
|
newly_left_users,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]:
|
|
||||||
"""Retrieve the users ignored by the given user from their global account_data.
|
|
||||||
|
|
||||||
Returns an empty set if
|
|
||||||
- there is no global account_data entry for ignored_users
|
|
||||||
- there is such an entry, but it's not a JSON object.
|
|
||||||
"""
|
|
||||||
# TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
|
|
||||||
ignored_account_data = (
|
|
||||||
await self.store.get_global_account_data_by_type_for_user(
|
|
||||||
user_id=user_id, data_type=AccountDataTypes.IGNORED_USER_LIST
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# If there is ignored users account data and it matches the proper type,
|
|
||||||
# then use it.
|
|
||||||
ignored_users: FrozenSet[str] = frozenset()
|
|
||||||
if ignored_account_data:
|
|
||||||
ignored_users_data = ignored_account_data.get("ignored_users", {})
|
|
||||||
if isinstance(ignored_users_data, dict):
|
|
||||||
ignored_users = frozenset(ignored_users_data.keys())
|
|
||||||
return ignored_users
|
|
||||||
|
|
||||||
async def _have_rooms_changed(
|
async def _have_rooms_changed(
|
||||||
self, sync_result_builder: "SyncResultBuilder"
|
self, sync_result_builder: "SyncResultBuilder"
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
@ -2022,7 +1998,6 @@ class SyncHandler:
|
||||||
async def _generate_room_entry(
|
async def _generate_room_entry(
|
||||||
self,
|
self,
|
||||||
sync_result_builder: "SyncResultBuilder",
|
sync_result_builder: "SyncResultBuilder",
|
||||||
ignored_users: FrozenSet[str],
|
|
||||||
room_builder: "RoomSyncResultBuilder",
|
room_builder: "RoomSyncResultBuilder",
|
||||||
ephemeral: List[JsonDict],
|
ephemeral: List[JsonDict],
|
||||||
tags: Optional[Dict[str, Dict[str, Any]]],
|
tags: Optional[Dict[str, Dict[str, Any]]],
|
||||||
|
@ -2051,7 +2026,6 @@ class SyncHandler:
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sync_result_builder
|
sync_result_builder
|
||||||
ignored_users: Set of users ignored by user.
|
|
||||||
room_builder
|
room_builder
|
||||||
ephemeral: List of new ephemeral events for room
|
ephemeral: List of new ephemeral events for room
|
||||||
tags: List of *all* tags for room, or None if there has been
|
tags: List of *all* tags for room, or None if there has been
|
||||||
|
|
|
@ -213,7 +213,7 @@ class BulkPushRuleEvaluator:
|
||||||
if not event.is_state():
|
if not event.is_state():
|
||||||
ignorers = await self.store.ignored_by(event.sender)
|
ignorers = await self.store.ignored_by(event.sender)
|
||||||
else:
|
else:
|
||||||
ignorers = set()
|
ignorers = frozenset()
|
||||||
|
|
||||||
for uid, rules in rules_by_user.items():
|
for uid, rules in rules_by_user.items():
|
||||||
if event.sender == uid:
|
if event.sender == uid:
|
||||||
|
|
|
@ -14,7 +14,17 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, cast
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
Any,
|
||||||
|
Dict,
|
||||||
|
FrozenSet,
|
||||||
|
Iterable,
|
||||||
|
List,
|
||||||
|
Optional,
|
||||||
|
Tuple,
|
||||||
|
cast,
|
||||||
|
)
|
||||||
|
|
||||||
from synapse.api.constants import AccountDataTypes
|
from synapse.api.constants import AccountDataTypes
|
||||||
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
|
||||||
|
@ -365,7 +375,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
)
|
)
|
||||||
|
|
||||||
@cached(max_entries=5000, iterable=True)
|
@cached(max_entries=5000, iterable=True)
|
||||||
async def ignored_by(self, user_id: str) -> Set[str]:
|
async def ignored_by(self, user_id: str) -> FrozenSet[str]:
|
||||||
"""
|
"""
|
||||||
Get users which ignore the given user.
|
Get users which ignore the given user.
|
||||||
|
|
||||||
|
@ -375,7 +385,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
Return:
|
Return:
|
||||||
The user IDs which ignore the given user.
|
The user IDs which ignore the given user.
|
||||||
"""
|
"""
|
||||||
return set(
|
return frozenset(
|
||||||
await self.db_pool.simple_select_onecol(
|
await self.db_pool.simple_select_onecol(
|
||||||
table="ignored_users",
|
table="ignored_users",
|
||||||
keyvalues={"ignored_user_id": user_id},
|
keyvalues={"ignored_user_id": user_id},
|
||||||
|
@ -384,6 +394,26 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@cached(max_entries=5000, iterable=True)
|
||||||
|
async def ignored_users(self, user_id: str) -> FrozenSet[str]:
|
||||||
|
"""
|
||||||
|
Get users which the given user ignores.
|
||||||
|
|
||||||
|
Params:
|
||||||
|
user_id: The user ID which is making the request.
|
||||||
|
|
||||||
|
Return:
|
||||||
|
The user IDs which are ignored by the given user.
|
||||||
|
"""
|
||||||
|
return frozenset(
|
||||||
|
await self.db_pool.simple_select_onecol(
|
||||||
|
table="ignored_users",
|
||||||
|
keyvalues={"ignorer_user_id": user_id},
|
||||||
|
retcol="ignored_user_id",
|
||||||
|
desc="ignored_users",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def process_replication_rows(
|
def process_replication_rows(
|
||||||
self,
|
self,
|
||||||
stream_name: str,
|
stream_name: str,
|
||||||
|
@ -529,6 +559,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
else:
|
else:
|
||||||
currently_ignored_users = set()
|
currently_ignored_users = set()
|
||||||
|
|
||||||
|
# If the data has not changed, nothing to do.
|
||||||
|
if previously_ignored_users == currently_ignored_users:
|
||||||
|
return
|
||||||
|
|
||||||
# Delete entries which are no longer ignored.
|
# Delete entries which are no longer ignored.
|
||||||
self.db_pool.simple_delete_many_txn(
|
self.db_pool.simple_delete_many_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -551,6 +585,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
|
||||||
# Invalidate the cache for any ignored users which were added or removed.
|
# Invalidate the cache for any ignored users which were added or removed.
|
||||||
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
|
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
|
||||||
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
|
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
|
||||||
|
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
|
||||||
|
|
||||||
async def purge_account_data_for_user(self, user_id: str) -> None:
|
async def purge_account_data_for_user(self, user_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -14,12 +14,7 @@
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, FrozenSet, List, Optional
|
from typing import Dict, FrozenSet, List, Optional
|
||||||
|
|
||||||
from synapse.api.constants import (
|
from synapse.api.constants import EventTypes, HistoryVisibility, Membership
|
||||||
AccountDataTypes,
|
|
||||||
EventTypes,
|
|
||||||
HistoryVisibility,
|
|
||||||
Membership,
|
|
||||||
)
|
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
from synapse.storage import Storage
|
from synapse.storage import Storage
|
||||||
|
@ -87,15 +82,8 @@ async def filter_events_for_client(
|
||||||
state_filter=StateFilter.from_types(types),
|
state_filter=StateFilter.from_types(types),
|
||||||
)
|
)
|
||||||
|
|
||||||
ignore_dict_content = await storage.main.get_global_account_data_by_type_for_user(
|
# Get the users who are ignored by the requesting user.
|
||||||
user_id, AccountDataTypes.IGNORED_USER_LIST
|
ignore_list = await storage.main.ignored_users(user_id)
|
||||||
)
|
|
||||||
|
|
||||||
ignore_list: FrozenSet[str] = frozenset()
|
|
||||||
if ignore_dict_content:
|
|
||||||
ignored_users_dict = ignore_dict_content.get("ignored_users", {})
|
|
||||||
if isinstance(ignored_users_dict, dict):
|
|
||||||
ignore_list = frozenset(ignored_users_dict.keys())
|
|
||||||
|
|
||||||
erased_senders = await storage.main.are_users_erased(e.sender for e in events)
|
erased_senders = await storage.main.are_users_erased(e.sender for e in events)
|
||||||
|
|
||||||
|
|
|
@ -47,9 +47,18 @@ class IgnoredUsersTestCase(unittest.HomeserverTestCase):
|
||||||
expected_ignorer_user_ids,
|
expected_ignorer_user_ids,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def assert_ignored(
|
||||||
|
self, ignorer_user_id: str, expected_ignored_user_ids: Set[str]
|
||||||
|
) -> None:
|
||||||
|
self.assertEqual(
|
||||||
|
self.get_success(self.store.ignored_users(ignorer_user_id)),
|
||||||
|
expected_ignored_user_ids,
|
||||||
|
)
|
||||||
|
|
||||||
def test_ignoring_users(self):
|
def test_ignoring_users(self):
|
||||||
"""Basic adding/removing of users from the ignore list."""
|
"""Basic adding/removing of users from the ignore list."""
|
||||||
self._update_ignore_list("@other:test", "@another:remote")
|
self._update_ignore_list("@other:test", "@another:remote")
|
||||||
|
self.assert_ignored(self.user, {"@other:test", "@another:remote"})
|
||||||
|
|
||||||
# Check a user which no one ignores.
|
# Check a user which no one ignores.
|
||||||
self.assert_ignorers("@user:test", set())
|
self.assert_ignorers("@user:test", set())
|
||||||
|
@ -62,6 +71,7 @@ class IgnoredUsersTestCase(unittest.HomeserverTestCase):
|
||||||
|
|
||||||
# Add one user, remove one user, and leave one user.
|
# Add one user, remove one user, and leave one user.
|
||||||
self._update_ignore_list("@foo:test", "@another:remote")
|
self._update_ignore_list("@foo:test", "@another:remote")
|
||||||
|
self.assert_ignored(self.user, {"@foo:test", "@another:remote"})
|
||||||
|
|
||||||
# Check the removed user.
|
# Check the removed user.
|
||||||
self.assert_ignorers("@other:test", set())
|
self.assert_ignorers("@other:test", set())
|
||||||
|
@ -76,20 +86,24 @@ class IgnoredUsersTestCase(unittest.HomeserverTestCase):
|
||||||
"""Ensure that caching works properly between different users."""
|
"""Ensure that caching works properly between different users."""
|
||||||
# The first user ignores a user.
|
# The first user ignores a user.
|
||||||
self._update_ignore_list("@other:test")
|
self._update_ignore_list("@other:test")
|
||||||
|
self.assert_ignored(self.user, {"@other:test"})
|
||||||
self.assert_ignorers("@other:test", {self.user})
|
self.assert_ignorers("@other:test", {self.user})
|
||||||
|
|
||||||
# The second user ignores them.
|
# The second user ignores them.
|
||||||
self._update_ignore_list("@other:test", ignorer_user_id="@second:test")
|
self._update_ignore_list("@other:test", ignorer_user_id="@second:test")
|
||||||
|
self.assert_ignored("@second:test", {"@other:test"})
|
||||||
self.assert_ignorers("@other:test", {self.user, "@second:test"})
|
self.assert_ignorers("@other:test", {self.user, "@second:test"})
|
||||||
|
|
||||||
# The first user un-ignores them.
|
# The first user un-ignores them.
|
||||||
self._update_ignore_list()
|
self._update_ignore_list()
|
||||||
|
self.assert_ignored(self.user, set())
|
||||||
self.assert_ignorers("@other:test", {"@second:test"})
|
self.assert_ignorers("@other:test", {"@second:test"})
|
||||||
|
|
||||||
def test_invalid_data(self):
|
def test_invalid_data(self):
|
||||||
"""Invalid data ends up clearing out the ignored users list."""
|
"""Invalid data ends up clearing out the ignored users list."""
|
||||||
# Add some data and ensure it is there.
|
# Add some data and ensure it is there.
|
||||||
self._update_ignore_list("@other:test")
|
self._update_ignore_list("@other:test")
|
||||||
|
self.assert_ignored(self.user, {"@other:test"})
|
||||||
self.assert_ignorers("@other:test", {self.user})
|
self.assert_ignorers("@other:test", {self.user})
|
||||||
|
|
||||||
# No ignored_users key.
|
# No ignored_users key.
|
||||||
|
@ -102,10 +116,12 @@ class IgnoredUsersTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
# No one ignores the user now.
|
# No one ignores the user now.
|
||||||
|
self.assert_ignored(self.user, set())
|
||||||
self.assert_ignorers("@other:test", set())
|
self.assert_ignorers("@other:test", set())
|
||||||
|
|
||||||
# Add some data and ensure it is there.
|
# Add some data and ensure it is there.
|
||||||
self._update_ignore_list("@other:test")
|
self._update_ignore_list("@other:test")
|
||||||
|
self.assert_ignored(self.user, {"@other:test"})
|
||||||
self.assert_ignorers("@other:test", {self.user})
|
self.assert_ignorers("@other:test", {self.user})
|
||||||
|
|
||||||
# Invalid data.
|
# Invalid data.
|
||||||
|
@ -118,4 +134,5 @@ class IgnoredUsersTestCase(unittest.HomeserverTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
# No one ignores the user now.
|
# No one ignores the user now.
|
||||||
|
self.assert_ignored(self.user, set())
|
||||||
self.assert_ignorers("@other:test", set())
|
self.assert_ignorers("@other:test", set())
|
||||||
|
|
Loading…
Reference in a new issue