mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 08:23:47 +01:00
parent
d64b70ada2
commit
034db2ba21
5 changed files with 270 additions and 43 deletions
1
changelog.d/6053.bugfix
Normal file
1
changelog.d/6053.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Prevent exceptions being logged when extremity-cleanup events fail due to lack of user consent to the terms of service.
|
|
@ -222,6 +222,13 @@ class MessageHandler(object):
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# The duration (in ms) after which rooms should be removed
|
||||||
|
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
|
||||||
|
# to generate a dummy event for them once more)
|
||||||
|
#
|
||||||
|
_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
|
||||||
|
|
||||||
|
|
||||||
class EventCreationHandler(object):
|
class EventCreationHandler(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
@ -258,6 +265,13 @@ class EventCreationHandler(object):
|
||||||
self.config.block_events_without_consent_error
|
self.config.block_events_without_consent_error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Rooms which should be excluded from dummy insertion. (For instance,
|
||||||
|
# those without local users who can send events into the room).
|
||||||
|
#
|
||||||
|
# map from room id to time-of-last-attempt.
|
||||||
|
#
|
||||||
|
self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
|
||||||
|
|
||||||
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
|
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
|
||||||
# config options, but *only* if we have a configuration for which we are
|
# config options, but *only* if we have a configuration for which we are
|
||||||
# going to need it.
|
# going to need it.
|
||||||
|
@ -888,9 +902,11 @@ class EventCreationHandler(object):
|
||||||
"""Background task to send dummy events into rooms that have a large
|
"""Background task to send dummy events into rooms that have a large
|
||||||
number of extremities
|
number of extremities
|
||||||
"""
|
"""
|
||||||
|
self._expire_rooms_to_exclude_from_dummy_event_insertion()
|
||||||
room_ids = yield self.store.get_rooms_with_many_extremities(
|
room_ids = yield self.store.get_rooms_with_many_extremities(
|
||||||
min_count=10, limit=5
|
min_count=10,
|
||||||
|
limit=5,
|
||||||
|
room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
|
||||||
)
|
)
|
||||||
|
|
||||||
for room_id in room_ids:
|
for room_id in room_ids:
|
||||||
|
@ -904,32 +920,61 @@ class EventCreationHandler(object):
|
||||||
members = yield self.state.get_current_users_in_room(
|
members = yield self.state.get_current_users_in_room(
|
||||||
room_id, latest_event_ids=latest_event_ids
|
room_id, latest_event_ids=latest_event_ids
|
||||||
)
|
)
|
||||||
|
dummy_event_sent = False
|
||||||
|
for user_id in members:
|
||||||
|
if not self.hs.is_mine_id(user_id):
|
||||||
|
continue
|
||||||
|
requester = create_requester(user_id)
|
||||||
|
try:
|
||||||
|
event, context = yield self.create_event(
|
||||||
|
requester,
|
||||||
|
{
|
||||||
|
"type": "org.matrix.dummy_event",
|
||||||
|
"content": {},
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": user_id,
|
||||||
|
},
|
||||||
|
prev_events_and_hashes=prev_events_and_hashes,
|
||||||
|
)
|
||||||
|
|
||||||
user_id = None
|
event.internal_metadata.proactively_send = False
|
||||||
for member in members:
|
|
||||||
if self.hs.is_mine_id(member):
|
yield self.send_nonmember_event(
|
||||||
user_id = member
|
requester, event, context, ratelimit=False
|
||||||
|
)
|
||||||
|
dummy_event_sent = True
|
||||||
break
|
break
|
||||||
|
except ConsentNotGivenError:
|
||||||
|
logger.info(
|
||||||
|
"Failed to send dummy event into room %s for user %s due to "
|
||||||
|
"lack of consent. Will try another user" % (room_id, user_id)
|
||||||
|
)
|
||||||
|
except AuthError:
|
||||||
|
logger.info(
|
||||||
|
"Failed to send dummy event into room %s for user %s due to "
|
||||||
|
"lack of power. Will try another user" % (room_id, user_id)
|
||||||
|
)
|
||||||
|
|
||||||
if not user_id:
|
if not dummy_event_sent:
|
||||||
# We don't have a joined user.
|
# Did not find a valid user in the room, so remove from future attempts
|
||||||
# TODO: We should do something here to stop the room from
|
# Exclusion is time limited, so the room will be rechecked in the future
|
||||||
# appearing next time.
|
# dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
|
||||||
continue
|
logger.info(
|
||||||
|
"Failed to send dummy event into room %s. Will exclude it from "
|
||||||
|
"future attempts until cache expires" % (room_id,)
|
||||||
|
)
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
|
||||||
|
|
||||||
requester = create_requester(user_id)
|
def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
|
||||||
|
expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
|
||||||
event, context = yield self.create_event(
|
to_expire = set()
|
||||||
requester,
|
for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
|
||||||
{
|
if time < expire_before:
|
||||||
"type": "org.matrix.dummy_event",
|
to_expire.add(room_id)
|
||||||
"content": {},
|
for room_id in to_expire:
|
||||||
"room_id": room_id,
|
logger.debug(
|
||||||
"sender": user_id,
|
"Expiring room id %s from dummy event insertion exclusion cache",
|
||||||
},
|
room_id,
|
||||||
prev_events_and_hashes=prev_events_and_hashes,
|
|
||||||
)
|
)
|
||||||
|
del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
|
||||||
event.internal_metadata.proactively_send = False
|
|
||||||
|
|
||||||
yield self.send_nonmember_event(requester, event, context, ratelimit=False)
|
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
@ -190,12 +191,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_rooms_with_many_extremities(self, min_count, limit):
|
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
|
||||||
"""Get the top rooms with at least N extremities.
|
"""Get the top rooms with at least N extremities.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
min_count (int): The minimum number of extremities
|
min_count (int): The minimum number of extremities
|
||||||
limit (int): The maximum number of rooms to return.
|
limit (int): The maximum number of rooms to return.
|
||||||
|
room_id_filter (iterable[str]): room_ids to exclude from the results
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[list]: At most `limit` room IDs that have at least
|
Deferred[list]: At most `limit` room IDs that have at least
|
||||||
|
@ -203,15 +205,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _get_rooms_with_many_extremities_txn(txn):
|
def _get_rooms_with_many_extremities_txn(txn):
|
||||||
|
where_clause = "1=1"
|
||||||
|
if room_id_filter:
|
||||||
|
where_clause = "room_id NOT IN (%s)" % (
|
||||||
|
",".join("?" for _ in room_id_filter),
|
||||||
|
)
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT room_id FROM event_forward_extremities
|
SELECT room_id FROM event_forward_extremities
|
||||||
|
WHERE %s
|
||||||
GROUP BY room_id
|
GROUP BY room_id
|
||||||
HAVING count(*) > ?
|
HAVING count(*) > ?
|
||||||
ORDER BY count(*) DESC
|
ORDER BY count(*) DESC
|
||||||
LIMIT ?
|
LIMIT ?
|
||||||
"""
|
""" % (
|
||||||
|
where_clause,
|
||||||
|
)
|
||||||
|
|
||||||
txn.execute(sql, (min_count, limit))
|
query_args = list(itertools.chain(room_id_filter, [min_count, limit]))
|
||||||
|
txn.execute(sql, query_args)
|
||||||
return [room_id for room_id, in txn]
|
return [room_id for room_id, in txn]
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
|
|
@ -14,7 +14,13 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import os.path
|
import os.path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from mock import Mock
|
||||||
|
|
||||||
|
import synapse.rest.admin
|
||||||
|
from synapse.api.constants import EventTypes
|
||||||
|
from synapse.rest.client.v1 import login, room
|
||||||
from synapse.storage import prepare_database
|
from synapse.storage import prepare_database
|
||||||
from synapse.types import Requester, UserID
|
from synapse.types import Requester, UserID
|
||||||
|
|
||||||
|
@ -225,6 +231,14 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||||
|
|
||||||
|
|
||||||
class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
|
class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
|
||||||
|
CONSENT_VERSION = "1"
|
||||||
|
EXTREMITIES_COUNT = 50
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets_for_client_rest_resource,
|
||||||
|
login.register_servlets,
|
||||||
|
room.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
def make_homeserver(self, reactor, clock):
|
def make_homeserver(self, reactor, clock):
|
||||||
config = self.default_config()
|
config = self.default_config()
|
||||||
config["cleanup_extremities_with_dummy_events"] = True
|
config["cleanup_extremities_with_dummy_events"] = True
|
||||||
|
@ -233,27 +247,19 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
|
||||||
def prepare(self, reactor, clock, homeserver):
|
def prepare(self, reactor, clock, homeserver):
|
||||||
self.store = homeserver.get_datastore()
|
self.store = homeserver.get_datastore()
|
||||||
self.room_creator = homeserver.get_room_creation_handler()
|
self.room_creator = homeserver.get_room_creation_handler()
|
||||||
|
self.event_creator_handler = homeserver.get_event_creation_handler()
|
||||||
|
|
||||||
# Create a test user and room
|
# Create a test user and room
|
||||||
self.user = UserID("alice", "test")
|
self.user = UserID.from_string(self.register_user("user1", "password"))
|
||||||
|
self.token1 = self.login("user1", "password")
|
||||||
self.requester = Requester(self.user, None, False, None, None)
|
self.requester = Requester(self.user, None, False, None, None)
|
||||||
info = self.get_success(self.room_creator.create_room(self.requester, {}))
|
info = self.get_success(self.room_creator.create_room(self.requester, {}))
|
||||||
self.room_id = info["room_id"]
|
self.room_id = info["room_id"]
|
||||||
|
self.event_creator = homeserver.get_event_creation_handler()
|
||||||
|
homeserver.config.user_consent_version = self.CONSENT_VERSION
|
||||||
|
|
||||||
def test_send_dummy_event(self):
|
def test_send_dummy_event(self):
|
||||||
# Create a bushy graph with 50 extremities.
|
self._create_extremity_rich_graph()
|
||||||
|
|
||||||
event_id_start = self.create_and_send_event(self.room_id, self.user)
|
|
||||||
|
|
||||||
for _ in range(50):
|
|
||||||
self.create_and_send_event(
|
|
||||||
self.room_id, self.user, prev_event_ids=[event_id_start]
|
|
||||||
)
|
|
||||||
|
|
||||||
latest_event_ids = self.get_success(
|
|
||||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
|
||||||
)
|
|
||||||
self.assertEqual(len(latest_event_ids), 50)
|
|
||||||
|
|
||||||
# Pump the reactor repeatedly so that the background updates have a
|
# Pump the reactor repeatedly so that the background updates have a
|
||||||
# chance to run.
|
# chance to run.
|
||||||
|
@ -263,3 +269,126 @@ class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
|
||||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
)
|
)
|
||||||
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
|
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
|
||||||
|
|
||||||
|
@patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
|
||||||
|
def test_send_dummy_events_when_insufficient_power(self):
|
||||||
|
self._create_extremity_rich_graph()
|
||||||
|
# Criple power levels
|
||||||
|
self.helper.send_state(
|
||||||
|
self.room_id,
|
||||||
|
EventTypes.PowerLevels,
|
||||||
|
body={"users": {str(self.user): -1}},
|
||||||
|
tok=self.token1,
|
||||||
|
)
|
||||||
|
# Pump the reactor repeatedly so that the background updates have a
|
||||||
|
# chance to run.
|
||||||
|
self.pump(10 * 60)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
# Check that the room has not been pruned
|
||||||
|
self.assertTrue(len(latest_event_ids) > 10)
|
||||||
|
|
||||||
|
# New user with regular levels
|
||||||
|
user2 = self.register_user("user2", "password")
|
||||||
|
token2 = self.login("user2", "password")
|
||||||
|
self.helper.join(self.room_id, user2, tok=token2)
|
||||||
|
self.pump(10 * 60)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
|
||||||
|
|
||||||
|
@patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=0)
|
||||||
|
def test_send_dummy_event_without_consent(self):
|
||||||
|
self._create_extremity_rich_graph()
|
||||||
|
self._enable_consent_checking()
|
||||||
|
|
||||||
|
# Pump the reactor repeatedly so that the background updates have a
|
||||||
|
# chance to run. Attempt to add dummy event with user that has not consented
|
||||||
|
# Check that dummy event send fails.
|
||||||
|
self.pump(10 * 60)
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertTrue(len(latest_event_ids) == self.EXTREMITIES_COUNT)
|
||||||
|
|
||||||
|
# Create new user, and add consent
|
||||||
|
user2 = self.register_user("user2", "password")
|
||||||
|
token2 = self.login("user2", "password")
|
||||||
|
self.get_success(
|
||||||
|
self.store.user_set_consent_version(user2, self.CONSENT_VERSION)
|
||||||
|
)
|
||||||
|
self.helper.join(self.room_id, user2, tok=token2)
|
||||||
|
|
||||||
|
# Background updates should now cause a dummy event to be added to the graph
|
||||||
|
self.pump(10 * 60)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
|
||||||
|
|
||||||
|
@patch("synapse.handlers.message._DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY", new=250)
|
||||||
|
def test_expiry_logic(self):
|
||||||
|
"""Simple test to ensure that _expire_rooms_to_exclude_from_dummy_event_insertion()
|
||||||
|
expires old entries correctly.
|
||||||
|
"""
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
|
||||||
|
"1"
|
||||||
|
] = 100000
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
|
||||||
|
"2"
|
||||||
|
] = 200000
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion[
|
||||||
|
"3"
|
||||||
|
] = 300000
|
||||||
|
self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
|
||||||
|
# All entries within time frame
|
||||||
|
self.assertEqual(
|
||||||
|
len(
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
|
||||||
|
),
|
||||||
|
3,
|
||||||
|
)
|
||||||
|
# Oldest room to expire
|
||||||
|
self.pump(1)
|
||||||
|
self.event_creator_handler._expire_rooms_to_exclude_from_dummy_event_insertion()
|
||||||
|
self.assertEqual(
|
||||||
|
len(
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
|
||||||
|
),
|
||||||
|
2,
|
||||||
|
)
|
||||||
|
# All rooms to expire
|
||||||
|
self.pump(2)
|
||||||
|
self.assertEqual(
|
||||||
|
len(
|
||||||
|
self.event_creator_handler._rooms_to_exclude_from_dummy_event_insertion
|
||||||
|
),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _create_extremity_rich_graph(self):
|
||||||
|
"""Helper method to create bushy graph on demand"""
|
||||||
|
|
||||||
|
event_id_start = self.create_and_send_event(self.room_id, self.user)
|
||||||
|
|
||||||
|
for _ in range(self.EXTREMITIES_COUNT):
|
||||||
|
self.create_and_send_event(
|
||||||
|
self.room_id, self.user, prev_event_ids=[event_id_start]
|
||||||
|
)
|
||||||
|
|
||||||
|
latest_event_ids = self.get_success(
|
||||||
|
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||||
|
)
|
||||||
|
self.assertEqual(len(latest_event_ids), 50)
|
||||||
|
|
||||||
|
def _enable_consent_checking(self):
|
||||||
|
"""Helper method to enable consent checking"""
|
||||||
|
self.event_creator._block_events_without_consent_error = "No consent from user"
|
||||||
|
consent_uri_builder = Mock()
|
||||||
|
consent_uri_builder.build_user_consent_uri.return_value = "http://example.com"
|
||||||
|
self.event_creator._consent_uri_builder = consent_uri_builder
|
||||||
|
|
|
@ -75,3 +75,43 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
|
||||||
el = r[i]
|
el = r[i]
|
||||||
depth = el[2]
|
depth = el[2]
|
||||||
self.assertLessEqual(5, depth)
|
self.assertLessEqual(5, depth)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_get_rooms_with_many_extremities(self):
|
||||||
|
room1 = "#room1"
|
||||||
|
room2 = "#room2"
|
||||||
|
room3 = "#room3"
|
||||||
|
|
||||||
|
def insert_event(txn, i, room_id):
|
||||||
|
event_id = "$event_%i:local" % i
|
||||||
|
txn.execute(
|
||||||
|
(
|
||||||
|
"INSERT INTO event_forward_extremities (room_id, event_id) "
|
||||||
|
"VALUES (?, ?)"
|
||||||
|
),
|
||||||
|
(room_id, event_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
for i in range(0, 20):
|
||||||
|
yield self.store.runInteraction("insert", insert_event, i, room1)
|
||||||
|
yield self.store.runInteraction("insert", insert_event, i, room2)
|
||||||
|
yield self.store.runInteraction("insert", insert_event, i, room3)
|
||||||
|
|
||||||
|
# Test simple case
|
||||||
|
r = yield self.store.get_rooms_with_many_extremities(5, 5, [])
|
||||||
|
self.assertEqual(len(r), 3)
|
||||||
|
|
||||||
|
# Does filter work?
|
||||||
|
|
||||||
|
r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1])
|
||||||
|
self.assertTrue(room2 in r)
|
||||||
|
self.assertTrue(room3 in r)
|
||||||
|
self.assertEqual(len(r), 2)
|
||||||
|
|
||||||
|
r = yield self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
|
||||||
|
self.assertEqual(r, [room3])
|
||||||
|
|
||||||
|
# Does filter and limit work?
|
||||||
|
|
||||||
|
r = yield self.store.get_rooms_with_many_extremities(5, 1, [room1])
|
||||||
|
self.assertTrue(r == [room2] or r == [room3])
|
||||||
|
|
Loading…
Reference in a new issue