Improve code documentation for the typing stream over replication. (#12211)

This commit is contained in:
reivilibre 2022-03-11 14:00:15 +00:00 committed by GitHub
parent 735e89bd3a
commit 4a53f35737
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 6 deletions

1
changelog.d/12211.misc Normal file
View file

@ -0,0 +1 @@
Improve code documentation for the typing stream over replication.

View file

@ -160,8 +160,9 @@ class FollowerTypingHandler:
"""Should be called whenever we receive updates for typing stream.""" """Should be called whenever we receive updates for typing stream."""
if self._latest_room_serial > token: if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just # The typing worker has gone backwards (e.g. it may have restarted).
# clear everything. # To prevent inconsistent data, just clear everything.
logger.info("Typing handler stream went backwards; resetting")
self._reset() self._reset()
# Set the latest serial token to whatever the server gave us. # Set the latest serial token to whatever the server gave us.

View file

@ -709,7 +709,7 @@ class ReplicationCommandHandler:
self.send_command(RemoteServerUpCommand(server)) self.send_command(RemoteServerUpCommand(server))
def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None: def stream_update(self, stream_name: str, token: Optional[int], data: Any) -> None:
"""Called when a new update is available to stream to clients. """Called when a new update is available to stream to Redis subscribers.
We need to check if the client is interested in the stream or not We need to check if the client is interested in the stream or not
""" """

View file

@ -67,8 +67,8 @@ class ReplicationStreamProtocolFactory(ServerFactory):
class ReplicationStreamer: class ReplicationStreamer:
"""Handles replication connections. """Handles replication connections.
This needs to be poked when new replication data may be available. When new This needs to be poked when new replication data may be available.
data is available it will propagate to all connected clients. When new data is available it will propagate to all Redis subscribers.
""" """
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
@ -109,7 +109,7 @@ class ReplicationStreamer:
def on_notifier_poke(self) -> None: def on_notifier_poke(self) -> None:
"""Checks if there is actually any new data and sends it to the """Checks if there is actually any new data and sends it to the
connections if there are. Redis subscribers if there are.
This should get called each time new data is available, even if it This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed is currently being executed, so that nothing gets missed

View file

@ -316,7 +316,19 @@ class PresenceFederationStream(Stream):
class TypingStream(Stream): class TypingStream(Stream):
@attr.s(slots=True, frozen=True, auto_attribs=True) @attr.s(slots=True, frozen=True, auto_attribs=True)
class TypingStreamRow: class TypingStreamRow:
"""
An entry in the typing stream.
Describes all the users that are 'typing' right now in one room.
When a user stops typing, it will be streamed as a new update with that
user absent; you can think of the `user_ids` list as overwriting the
entire list that was there previously.
"""
# The room that this update is for.
room_id: str room_id: str
# All the users that are 'typing' right now in the specified room.
user_ids: List[str] user_ids: List[str]
NAME = "typing" NAME = "typing"