forked from MirrorHub/synapse
Prune old typing notifications (#15332)
Rather than keeping them around forever in memory, slowing things down. Fixes #11750.
This commit is contained in:
parent
4fc85e5a92
commit
96f163d932
2 changed files with 26 additions and 0 deletions
1
changelog.d/15332.bugfix
Normal file
1
changelog.d/15332.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix bug in worker mode where on a rolling restart of workers the "typing" worker would consume 100% CPU until it got restarted.
|
|
@ -52,6 +52,11 @@ FEDERATION_TIMEOUT = 60 * 1000
|
||||||
FEDERATION_PING_INTERVAL = 40 * 1000
|
FEDERATION_PING_INTERVAL = 40 * 1000
|
||||||
|
|
||||||
|
|
||||||
|
# How long to remember a typing notification happened in a room before
|
||||||
|
# forgetting about it.
|
||||||
|
FORGET_TIMEOUT = 10 * 60 * 1000
|
||||||
|
|
||||||
|
|
||||||
class FollowerTypingHandler:
|
class FollowerTypingHandler:
|
||||||
"""A typing handler on a different process than the writer that is updated
|
"""A typing handler on a different process than the writer that is updated
|
||||||
via replication.
|
via replication.
|
||||||
|
@ -83,7 +88,10 @@ class FollowerTypingHandler:
|
||||||
self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000)
|
self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000)
|
||||||
self._latest_room_serial = 0
|
self._latest_room_serial = 0
|
||||||
|
|
||||||
|
self._rooms_updated: Set[str] = set()
|
||||||
|
|
||||||
self.clock.looping_call(self._handle_timeouts, 5000)
|
self.clock.looping_call(self._handle_timeouts, 5000)
|
||||||
|
self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT)
|
||||||
|
|
||||||
def _reset(self) -> None:
|
def _reset(self) -> None:
|
||||||
"""Reset the typing handler's data caches."""
|
"""Reset the typing handler's data caches."""
|
||||||
|
@ -92,6 +100,8 @@ class FollowerTypingHandler:
|
||||||
# map room IDs to sets of users currently typing
|
# map room IDs to sets of users currently typing
|
||||||
self._room_typing = {}
|
self._room_typing = {}
|
||||||
|
|
||||||
|
self._rooms_updated = set()
|
||||||
|
|
||||||
self._member_last_federation_poke = {}
|
self._member_last_federation_poke = {}
|
||||||
self.wheel_timer = WheelTimer(bucket_size=5000)
|
self.wheel_timer = WheelTimer(bucket_size=5000)
|
||||||
|
|
||||||
|
@ -178,6 +188,7 @@ class FollowerTypingHandler:
|
||||||
prev_typing = self._room_typing.get(row.room_id, set())
|
prev_typing = self._room_typing.get(row.room_id, set())
|
||||||
now_typing = set(row.user_ids)
|
now_typing = set(row.user_ids)
|
||||||
self._room_typing[row.room_id] = now_typing
|
self._room_typing[row.room_id] = now_typing
|
||||||
|
self._rooms_updated.add(row.room_id)
|
||||||
|
|
||||||
if self.federation:
|
if self.federation:
|
||||||
run_as_background_process(
|
run_as_background_process(
|
||||||
|
@ -209,6 +220,19 @@ class FollowerTypingHandler:
|
||||||
def get_current_token(self) -> int:
|
def get_current_token(self) -> int:
|
||||||
return self._latest_room_serial
|
return self._latest_room_serial
|
||||||
|
|
||||||
|
def _prune_old_typing(self) -> None:
|
||||||
|
"""Prune rooms that haven't seen typing updates since last time.
|
||||||
|
|
||||||
|
This is safe to do as clients should time out old typing notifications.
|
||||||
|
"""
|
||||||
|
stale_rooms = self._room_serials.keys() - self._rooms_updated
|
||||||
|
|
||||||
|
for room_id in stale_rooms:
|
||||||
|
self._room_serials.pop(room_id, None)
|
||||||
|
self._room_typing.pop(room_id, None)
|
||||||
|
|
||||||
|
self._rooms_updated = set()
|
||||||
|
|
||||||
|
|
||||||
class TypingWriterHandler(FollowerTypingHandler):
|
class TypingWriterHandler(FollowerTypingHandler):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
|
@ -388,6 +412,7 @@ class TypingWriterHandler(FollowerTypingHandler):
|
||||||
self._typing_stream_change_cache.entity_has_changed(
|
self._typing_stream_change_cache.entity_has_changed(
|
||||||
member.room_id, self._latest_room_serial
|
member.room_id, self._latest_room_serial
|
||||||
)
|
)
|
||||||
|
self._rooms_updated.add(member.room_id)
|
||||||
|
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
|
StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
|
||||||
|
|
Loading…
Reference in a new issue