mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-13 21:41:30 +01:00
Bound ephemeral events by key (#12544)
Co-authored-by: Brad Murray <bradtgmurray@gmail.com> Co-authored-by: Andrew Morgan <andrewm@element.io>
This commit is contained in:
parent
9986621bc8
commit
63ba9ba38b
4 changed files with 87 additions and 4 deletions
1
changelog.d/12544.bugfix
Normal file
1
changelog.d/12544.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.
|
|
@ -416,7 +416,7 @@ class ApplicationServicesHandler:
|
||||||
return typing
|
return typing
|
||||||
|
|
||||||
async def _handle_receipts(
|
async def _handle_receipts(
|
||||||
self, service: ApplicationService, new_token: Optional[int]
|
self, service: ApplicationService, new_token: int
|
||||||
) -> List[JsonDict]:
|
) -> List[JsonDict]:
|
||||||
"""
|
"""
|
||||||
Return the latest read receipts that the given application service should receive.
|
Return the latest read receipts that the given application service should receive.
|
||||||
|
@ -447,7 +447,7 @@ class ApplicationServicesHandler:
|
||||||
|
|
||||||
receipts_source = self.event_sources.sources.receipt
|
receipts_source = self.event_sources.sources.receipt
|
||||||
receipts, _ = await receipts_source.get_new_events_as(
|
receipts, _ = await receipts_source.get_new_events_as(
|
||||||
service=service, from_key=from_key
|
service=service, from_key=from_key, to_key=new_token
|
||||||
)
|
)
|
||||||
return receipts
|
return receipts
|
||||||
|
|
||||||
|
|
|
@ -239,13 +239,14 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||||
return events, to_key
|
return events, to_key
|
||||||
|
|
||||||
async def get_new_events_as(
|
async def get_new_events_as(
|
||||||
self, from_key: int, service: ApplicationService
|
self, from_key: int, to_key: int, service: ApplicationService
|
||||||
) -> Tuple[List[JsonDict], int]:
|
) -> Tuple[List[JsonDict], int]:
|
||||||
"""Returns a set of new read receipt events that an appservice
|
"""Returns a set of new read receipt events that an appservice
|
||||||
may be interested in.
|
may be interested in.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
from_key: the stream position at which events should be fetched from
|
from_key: the stream position at which events should be fetched from
|
||||||
|
to_key: the stream position up to which events should be fetched to
|
||||||
service: The appservice which may be interested
|
service: The appservice which may be interested
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
|
@ -255,7 +256,6 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
|
||||||
* The current read receipt stream token.
|
* The current read receipt stream token.
|
||||||
"""
|
"""
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
to_key = self.get_current_key()
|
|
||||||
|
|
||||||
if from_key == to_key:
|
if from_key == to_key:
|
||||||
return [], to_key
|
return [], to_key
|
||||||
|
|
|
@ -411,6 +411,88 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
|
||||||
"exclusive_as_user", "password", self.exclusive_as_user_device_id
|
"exclusive_as_user", "password", self.exclusive_as_user_device_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_sending_read_receipt_batches_to_application_services(self):
|
||||||
|
"""Tests that a large batch of read receipts are sent correctly to
|
||||||
|
interested application services.
|
||||||
|
"""
|
||||||
|
# Register an application service that's interested in a certain user
|
||||||
|
# and room prefix
|
||||||
|
interested_appservice = self._register_application_service(
|
||||||
|
namespaces={
|
||||||
|
ApplicationService.NS_USERS: [
|
||||||
|
{
|
||||||
|
"regex": "@exclusive_as_user:.+",
|
||||||
|
"exclusive": True,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
ApplicationService.NS_ROOMS: [
|
||||||
|
{
|
||||||
|
"regex": "!fakeroom_.*",
|
||||||
|
"exclusive": True,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# "Complete" a transaction.
|
||||||
|
# All this really does for us is make an entry in the application_services_state
|
||||||
|
# database table, which tracks the current stream_token per stream ID per AS.
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_datastores().main.complete_appservice_txn(
|
||||||
|
0,
|
||||||
|
interested_appservice,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now, pretend that we receive a large burst of read receipts (300 total) that
|
||||||
|
# all come in at once.
|
||||||
|
for i in range(300):
|
||||||
|
self.get_success(
|
||||||
|
# Insert a fake read receipt into the database
|
||||||
|
self.hs.get_datastores().main.insert_receipt(
|
||||||
|
# We have to use unique room ID + user ID combinations here, as the db query
|
||||||
|
# is an upsert.
|
||||||
|
room_id=f"!fakeroom_{i}:test",
|
||||||
|
receipt_type="m.read",
|
||||||
|
user_id=self.local_user,
|
||||||
|
event_ids=[f"$eventid_{i}"],
|
||||||
|
data={},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Now notify the appservice handler that 300 read receipts have all arrived
|
||||||
|
# at once. What will it do!
|
||||||
|
# note: stream tokens start at 2
|
||||||
|
for stream_token in range(2, 303):
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
|
||||||
|
services=[interested_appservice],
|
||||||
|
stream_key="receipt_key",
|
||||||
|
new_token=stream_token,
|
||||||
|
users=[self.exclusive_as_user],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Using our txn send mock, we can see what the AS received. After iterating over every
|
||||||
|
# transaction, we'd like to see all 300 read receipts accounted for.
|
||||||
|
# No more, no less.
|
||||||
|
all_ephemeral_events = []
|
||||||
|
for call in self.send_mock.call_args_list:
|
||||||
|
ephemeral_events = call[0][2]
|
||||||
|
all_ephemeral_events += ephemeral_events
|
||||||
|
|
||||||
|
# Ensure that no duplicate events were sent
|
||||||
|
self.assertEqual(len(all_ephemeral_events), 300)
|
||||||
|
|
||||||
|
# Check that the ephemeral event is a read receipt with the expected structure
|
||||||
|
latest_read_receipt = all_ephemeral_events[-1]
|
||||||
|
self.assertEqual(latest_read_receipt["type"], "m.receipt")
|
||||||
|
|
||||||
|
event_id = list(latest_read_receipt["content"].keys())[0]
|
||||||
|
self.assertEqual(
|
||||||
|
latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
|
||||||
|
)
|
||||||
|
|
||||||
@unittest.override_config(
|
@unittest.override_config(
|
||||||
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
|
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue