Limit AS transactions to 100 events (#8606)

* Limit AS transactions to 100 events

* Update changelog.d/8606.feature

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>

* Add tests

* Update synapse/appservice/scheduler.py

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
This commit is contained in:
Will Hunt 2020-10-21 15:36:53 +01:00 committed by GitHub
parent 20a67aa70d
commit 70259d8c8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 2 deletions

1
changelog.d/8606.feature Normal file
View file

@ -0,0 +1 @@
Limit appservice transactions to 100 persistent and 100 ephemeral events.

View file

@ -60,6 +60,13 @@ from synapse.types import JsonDict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Maximum number of events to provide in an AS transaction.
MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
class ApplicationServiceScheduler: class ApplicationServiceScheduler:
""" Public facing API for this module. Does the required DI to tie the """ Public facing API for this module. Does the required DI to tie the
components together. This also serves as the "event_pool", which in this components together. This also serves as the "event_pool", which in this
@ -136,10 +143,17 @@ class _ServiceQueuer:
self.requests_in_flight.add(service.id) self.requests_in_flight.add(service.id)
try: try:
while True: while True:
events = self.queued_events.pop(service.id, []) all_events = self.queued_events.get(service.id, [])
ephemeral = self.queued_ephemeral.pop(service.id, []) events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
all_events_ephemeral = self.queued_ephemeral.get(service.id, [])
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
if not events and not ephemeral: if not events and not ephemeral:
return return
try: try:
await self.txn_ctrl.send(service, events, ephemeral) await self.txn_ctrl.send(service, events, ephemeral)
except Exception: except Exception:

View file

@ -260,6 +260,31 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], []) self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
self.assertEquals(3, self.txn_ctrl.send.call_count) self.assertEquals(3, self.txn_ctrl.send.call_count)
def test_send_large_txns(self):
srv_1_defer = defer.Deferred()
srv_2_defer = defer.Deferred()
send_return_list = [srv_1_defer, srv_2_defer]
def do_send(x, y, z):
return make_deferred_yieldable(send_return_list.pop(0))
self.txn_ctrl.send = Mock(side_effect=do_send)
service = Mock(id=4, name="service")
event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)]
for event in event_list:
self.queuer.enqueue_event(service, event)
# Expect the first event to be sent immediately.
self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [])
srv_1_defer.callback(service)
# Then send the next 100 events
self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [])
srv_2_defer.callback(service)
# Then the final 99 events
self.txn_ctrl.send.assert_called_with(service, event_list[101:], [])
self.assertEquals(3, self.txn_ctrl.send.call_count)
def test_send_single_ephemeral_no_queue(self): def test_send_single_ephemeral_no_queue(self):
# Expect the event to be sent immediately. # Expect the event to be sent immediately.
service = Mock(id=4, name="service") service = Mock(id=4, name="service")
@ -296,3 +321,19 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
# Expect the queued events to be sent # Expect the queued events to be sent
self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3) self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
self.assertEquals(2, self.txn_ctrl.send.call_count) self.assertEquals(2, self.txn_ctrl.send.call_count)
def test_send_large_txns_ephemeral(self):
d = defer.Deferred()
self.txn_ctrl.send = Mock(
side_effect=lambda x, y, z: make_deferred_yieldable(d)
)
# Expect the event to be sent immediately.
service = Mock(id=4, name="service")
first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)]
second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)]
event_list = first_chunk + second_chunk
self.queuer.enqueue_ephemeral(service, event_list)
self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk)
d.callback(service)
self.txn_ctrl.send.assert_called_with(service, [], second_chunk)
self.assertEquals(2, self.txn_ctrl.send.call_count)