Add experimental option to reduce extremities.

Adds new config option `cleanup_extremities_with_dummy_events` which
periodically sends dummy events to rooms with more than 10 extremities.

THIS IS REALLY EXPERIMENTAL.
This commit is contained in:
Erik Johnston 2019-06-17 18:04:42 +01:00
parent 6840ebeef8
commit b42f90470f
6 changed files with 162 additions and 1 deletions

View file

@ -317,6 +317,12 @@ class ServerConfig(Config):
_check_resource_config(self.listeners) _check_resource_config(self.listeners)
# An experimental option to try and periodically clean up extremities
# by sending dummy events.
self.cleanup_extremities_with_dummy_events = config.get(
"cleanup_extremities_with_dummy_events", False,
)
def has_tls_listener(self): def has_tls_listener(self):
return any(l["tls"] for l in self.listeners) return any(l["tls"] for l in self.listeners)

View file

@ -92,6 +92,18 @@ class _EventInternalMetadata(object):
""" """
return getattr(self, "soft_failed", False) return getattr(self, "soft_failed", False)
def should_proactively_send(self):
"""Whether the eventm, if ours, should be sent to other clients and
servers.
This is used for sending dummy events internally. Servers and clients
can still explicitly fetch the event.
Returns:
bool
"""
return getattr(self, "proactively_send", True)
def _event_dict_property(key): def _event_dict_property(key):
# We want to be able to use hasattr with the event dict properties. # We want to be able to use hasattr with the event dict properties.

View file

@ -168,6 +168,9 @@ class FederationSender(object):
if not is_mine and send_on_behalf_of is None: if not is_mine and send_on_behalf_of is None:
return return
if not event.internal_metadata.should_proactively_send():
return
try: try:
# Get the state from before the event. # Get the state from before the event.
# We need to make sure that this is the state from before # We need to make sure that this is the state from before

View file

@ -36,7 +36,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
@ -261,6 +261,16 @@ class EventCreationHandler(object):
if self._block_events_without_consent_error: if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config) self._consent_uri_builder = ConsentURIBuilder(self.config)
if (
not self.config.worker_app
and self.config.cleanup_extremities_with_dummy_events
):
# XXX: Send dummy events.
self.clock.looping_call(
self._send_dummy_events_to_fill_extremities,
5 * 60 * 1000,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None, def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None, require_consent=True): prev_events_and_hashes=None, require_consent=True):
@ -874,3 +884,63 @@ class EventCreationHandler(object):
yield presence.bump_presence_active_time(user) yield presence.bump_presence_active_time(user)
except Exception: except Exception:
logger.exception("Error bumping presence active time") logger.exception("Error bumping presence active time")
@defer.inlineCallbacks
def _send_dummy_events_to_fill_extremities(self):
"""Background task to send dummy events into rooms that have a large
number of extremities
"""
room_ids = yield self.store.get_rooms_with_many_extremities(
min_count=10, limit=5,
)
for room_id in room_ids:
# For each room we need to find a joined member we can use to send
# the dummy event with.
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
room_id,
)
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids,
)
user_id = None
for member in members:
if self.hs.is_mine_id(member):
user_id = member
break
if not user_id:
# We don't have a joined user.
# TODO: We should do something here to stop the room from
# appearing next time.
continue
requester = create_requester(user_id)
event, context = yield self.create_event(
requester,
{
"type": "org.matrix.dummy_event",
"content": {},
"room_id": room_id,
"sender": user_id,
},
prev_events_and_hashes=prev_events_and_hashes,
)
event.internal_metadata.proactively_send = False
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=False,
)

View file

@ -190,6 +190,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id, room_id,
) )
def get_rooms_with_many_extremities(self, min_count, limit):
"""Get the top rooms with at least N extremities.
Args:
min_count (int): The minimum number of extremities
limit (int): The maximum number of rooms to return.
Returns:
Deferred[list]: At most `limit` room IDs that have at least
`min_count` extremities, sorted by extremity count.
"""
def _get_rooms_with_many_extremities_txn(txn):
sql = """
SELECT room_id FROM event_forward_extremities
GROUP BY room_id
HAVING count(*) > ?
ORDER BY count(*) DESC
LIMIT ?
"""
txn.execute(sql, (min_count, limit))
return [room_id for room_id, in txn]
return self.runInteraction(
"get_rooms_with_many_extremities",
_get_rooms_with_many_extremities_txn,
)
@cached(max_entries=5000, iterable=True) @cached(max_entries=5000, iterable=True)
def get_latest_event_ids_in_room(self, room_id): def get_latest_event_ids_in_room(self, room_id):
return self._simple_select_onecol( return self._simple_select_onecol(

View file

@ -222,3 +222,44 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
self.store.get_latest_event_ids_in_room(self.room_id) self.store.get_latest_event_ids_in_room(self.room_id)
) )
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c])) self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
config = self.default_config()
config["cleanup_extremities_with_dummy_events"] = True
return self.setup_test_homeserver(config=config)
def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore()
self.room_creator = homeserver.get_room_creation_handler()
# Create a test user and room
self.user = UserID("alice", "test")
self.requester = Requester(self.user, None, False, None, None)
info = self.get_success(self.room_creator.create_room(self.requester, {}))
self.room_id = info["room_id"]
def test_send_dummy_event(self):
# Create a bushy graph with 50 extremities.
event_id_start = self.create_and_send_event(self.room_id, self.user)
for _ in range(50):
self.create_and_send_event(
self.room_id, self.user, prev_event_ids=[event_id_start]
)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(len(latest_event_ids), 50)
# Bump the reacto repeatedly so that the background updates have a
# chance to run.
self.pump(10 * 60)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))