forked from MirrorHub/synapse
Don't send normal presence updates over federation replication stream (#9828)
This commit is contained in:
parent
c571736c6c
commit
2b7dd21655
5 changed files with 75 additions and 183 deletions
1
changelog.d/9828.feature
Normal file
1
changelog.d/9828.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add experimental support for handling presence on a worker.
|
|
@ -76,9 +76,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
# Pending presence map user_id -> UserPresenceState
|
# Pending presence map user_id -> UserPresenceState
|
||||||
self.presence_map = {} # type: Dict[str, UserPresenceState]
|
self.presence_map = {} # type: Dict[str, UserPresenceState]
|
||||||
|
|
||||||
# Stream position -> list[user_id]
|
|
||||||
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]
|
|
||||||
|
|
||||||
# Stores the destinations we need to explicitly send presence to about a
|
# Stores the destinations we need to explicitly send presence to about a
|
||||||
# given user.
|
# given user.
|
||||||
# Stream position -> (user_id, destinations)
|
# Stream position -> (user_id, destinations)
|
||||||
|
@ -96,7 +93,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
|
|
||||||
self.edus = SortedDict() # type: SortedDict[int, Edu]
|
self.edus = SortedDict() # type: SortedDict[int, Edu]
|
||||||
|
|
||||||
# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
|
# stream ID for the next entry into keyed_edu_changed/edus.
|
||||||
self.pos = 1
|
self.pos = 1
|
||||||
|
|
||||||
# map from stream ID to the time that stream entry was generated, so that we
|
# map from stream ID to the time that stream entry was generated, so that we
|
||||||
|
@ -117,7 +114,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
|
|
||||||
for queue_name in [
|
for queue_name in [
|
||||||
"presence_map",
|
"presence_map",
|
||||||
"presence_changed",
|
|
||||||
"keyed_edu",
|
"keyed_edu",
|
||||||
"keyed_edu_changed",
|
"keyed_edu_changed",
|
||||||
"edus",
|
"edus",
|
||||||
|
@ -155,23 +151,12 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
"""Clear all the queues from before a given position"""
|
"""Clear all the queues from before a given position"""
|
||||||
with Measure(self.clock, "send_queue._clear"):
|
with Measure(self.clock, "send_queue._clear"):
|
||||||
# Delete things out of presence maps
|
# Delete things out of presence maps
|
||||||
keys = self.presence_changed.keys()
|
|
||||||
i = self.presence_changed.bisect_left(position_to_delete)
|
|
||||||
for key in keys[:i]:
|
|
||||||
del self.presence_changed[key]
|
|
||||||
|
|
||||||
user_ids = {
|
|
||||||
user_id for uids in self.presence_changed.values() for user_id in uids
|
|
||||||
}
|
|
||||||
|
|
||||||
keys = self.presence_destinations.keys()
|
keys = self.presence_destinations.keys()
|
||||||
i = self.presence_destinations.bisect_left(position_to_delete)
|
i = self.presence_destinations.bisect_left(position_to_delete)
|
||||||
for key in keys[:i]:
|
for key in keys[:i]:
|
||||||
del self.presence_destinations[key]
|
del self.presence_destinations[key]
|
||||||
|
|
||||||
user_ids.update(
|
user_ids = {user_id for user_id, _ in self.presence_destinations.values()}
|
||||||
user_id for user_id, _ in self.presence_destinations.values()
|
|
||||||
)
|
|
||||||
|
|
||||||
to_del = [
|
to_del = [
|
||||||
user_id for user_id in self.presence_map if user_id not in user_ids
|
user_id for user_id in self.presence_map if user_id not in user_ids
|
||||||
|
@ -244,23 +229,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
"""
|
"""
|
||||||
# nothing to do here: the replication listener will handle it.
|
# nothing to do here: the replication listener will handle it.
|
||||||
|
|
||||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
|
||||||
"""As per FederationSender
|
|
||||||
|
|
||||||
Args:
|
|
||||||
states
|
|
||||||
"""
|
|
||||||
pos = self._next_pos()
|
|
||||||
|
|
||||||
# We only want to send presence for our own users, so lets always just
|
|
||||||
# filter here just in case.
|
|
||||||
local_states = [s for s in states if self.is_mine_id(s.user_id)]
|
|
||||||
|
|
||||||
self.presence_map.update({state.user_id: state for state in local_states})
|
|
||||||
self.presence_changed[pos] = [state.user_id for state in local_states]
|
|
||||||
|
|
||||||
self.notifier.on_new_replication_data()
|
|
||||||
|
|
||||||
def send_presence_to_destinations(
|
def send_presence_to_destinations(
|
||||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -325,18 +293,6 @@ class FederationRemoteSendQueue(AbstractFederationSender):
|
||||||
# of the federation stream.
|
# of the federation stream.
|
||||||
rows = [] # type: List[Tuple[int, BaseFederationRow]]
|
rows = [] # type: List[Tuple[int, BaseFederationRow]]
|
||||||
|
|
||||||
# Fetch changed presence
|
|
||||||
i = self.presence_changed.bisect_right(from_token)
|
|
||||||
j = self.presence_changed.bisect_right(to_token) + 1
|
|
||||||
dest_user_ids = [
|
|
||||||
(pos, user_id)
|
|
||||||
for pos, user_id_list in self.presence_changed.items()[i:j]
|
|
||||||
for user_id in user_id_list
|
|
||||||
]
|
|
||||||
|
|
||||||
for (key, user_id) in dest_user_ids:
|
|
||||||
rows.append((key, PresenceRow(state=self.presence_map[user_id])))
|
|
||||||
|
|
||||||
# Fetch presence to send to destinations
|
# Fetch presence to send to destinations
|
||||||
i = self.presence_destinations.bisect_right(from_token)
|
i = self.presence_destinations.bisect_right(from_token)
|
||||||
j = self.presence_destinations.bisect_right(to_token) + 1
|
j = self.presence_destinations.bisect_right(to_token) + 1
|
||||||
|
@ -427,22 +383,6 @@ class BaseFederationRow:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
class PresenceRow(
|
|
||||||
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
|
|
||||||
):
|
|
||||||
TypeId = "p"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_data(data):
|
|
||||||
return PresenceRow(state=UserPresenceState.from_dict(data))
|
|
||||||
|
|
||||||
def to_data(self):
|
|
||||||
return self.state.as_dict()
|
|
||||||
|
|
||||||
def add_to_buffer(self, buff):
|
|
||||||
buff.presence.append(self.state)
|
|
||||||
|
|
||||||
|
|
||||||
class PresenceDestinationsRow(
|
class PresenceDestinationsRow(
|
||||||
BaseFederationRow,
|
BaseFederationRow,
|
||||||
namedtuple(
|
namedtuple(
|
||||||
|
@ -506,7 +446,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
|
||||||
|
|
||||||
|
|
||||||
_rowtypes = (
|
_rowtypes = (
|
||||||
PresenceRow,
|
|
||||||
PresenceDestinationsRow,
|
PresenceDestinationsRow,
|
||||||
KeyedEduRow,
|
KeyedEduRow,
|
||||||
EduRow,
|
EduRow,
|
||||||
|
@ -518,7 +457,6 @@ TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
|
||||||
ParsedFederationStreamData = namedtuple(
|
ParsedFederationStreamData = namedtuple(
|
||||||
"ParsedFederationStreamData",
|
"ParsedFederationStreamData",
|
||||||
(
|
(
|
||||||
"presence", # list(UserPresenceState)
|
|
||||||
"presence_destinations", # list of tuples of UserPresenceState and destinations
|
"presence_destinations", # list of tuples of UserPresenceState and destinations
|
||||||
"keyed_edus", # dict of destination -> { key -> Edu }
|
"keyed_edus", # dict of destination -> { key -> Edu }
|
||||||
"edus", # dict of destination -> [Edu]
|
"edus", # dict of destination -> [Edu]
|
||||||
|
@ -543,7 +481,6 @@ def process_rows_for_federation(
|
||||||
# them into the appropriate collection and then send them off.
|
# them into the appropriate collection and then send them off.
|
||||||
|
|
||||||
buff = ParsedFederationStreamData(
|
buff = ParsedFederationStreamData(
|
||||||
presence=[],
|
|
||||||
presence_destinations=[],
|
presence_destinations=[],
|
||||||
keyed_edus={},
|
keyed_edus={},
|
||||||
edus={},
|
edus={},
|
||||||
|
@ -559,9 +496,6 @@ def process_rows_for_federation(
|
||||||
parsed_row = RowType.from_data(row.data)
|
parsed_row = RowType.from_data(row.data)
|
||||||
parsed_row.add_to_buffer(buff)
|
parsed_row.add_to_buffer(buff)
|
||||||
|
|
||||||
if buff.presence:
|
|
||||||
transaction_queue.send_presence(buff.presence)
|
|
||||||
|
|
||||||
for state, destinations in buff.presence_destinations:
|
for state, destinations in buff.presence_destinations:
|
||||||
transaction_queue.send_presence_to_destinations(
|
transaction_queue.send_presence_to_destinations(
|
||||||
states=[state], destinations=destinations
|
states=[state], destinations=destinations
|
||||||
|
|
|
@ -24,8 +24,6 @@ from synapse.events import EventBase
|
||||||
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
|
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
|
||||||
from synapse.federation.sender.transaction_manager import TransactionManager
|
from synapse.federation.sender.transaction_manager import TransactionManager
|
||||||
from synapse.federation.units import Edu
|
from synapse.federation.units import Edu
|
||||||
from synapse.handlers.presence import get_interested_remotes
|
|
||||||
from synapse.logging.context import preserve_fn
|
|
||||||
from synapse.metrics import (
|
from synapse.metrics import (
|
||||||
LaterGauge,
|
LaterGauge,
|
||||||
event_processing_loop_counter,
|
event_processing_loop_counter,
|
||||||
|
@ -34,7 +32,7 @@ from synapse.metrics import (
|
||||||
)
|
)
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
|
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from synapse.events.presence_router import PresenceRouter
|
from synapse.events.presence_router import PresenceRouter
|
||||||
|
@ -79,15 +77,6 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def send_presence(self, states: List[UserPresenceState]) -> None:
|
|
||||||
"""Send the new presence states to the appropriate destinations.
|
|
||||||
|
|
||||||
This actually queues up the presence states ready for sending and
|
|
||||||
triggers a background task to process them and send out the transactions.
|
|
||||||
"""
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def send_presence_to_destinations(
|
def send_presence_to_destinations(
|
||||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||||
|
@ -176,11 +165,6 @@ class FederationSender(AbstractFederationSender):
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Map of user_id -> UserPresenceState for all the pending presence
|
|
||||||
# to be sent out by user_id. Entries here get processed and put in
|
|
||||||
# pending_presence_by_dest
|
|
||||||
self.pending_presence = {} # type: Dict[str, UserPresenceState]
|
|
||||||
|
|
||||||
LaterGauge(
|
LaterGauge(
|
||||||
"synapse_federation_transaction_queue_pending_pdus",
|
"synapse_federation_transaction_queue_pending_pdus",
|
||||||
"",
|
"",
|
||||||
|
@ -201,8 +185,6 @@ class FederationSender(AbstractFederationSender):
|
||||||
self._is_processing = False
|
self._is_processing = False
|
||||||
self._last_poked_id = -1
|
self._last_poked_id = -1
|
||||||
|
|
||||||
self._processing_pending_presence = False
|
|
||||||
|
|
||||||
# map from room_id to a set of PerDestinationQueues which we believe are
|
# map from room_id to a set of PerDestinationQueues which we believe are
|
||||||
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
|
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
|
||||||
# here for a given room means that we are rate-limiting RR flushes to that room,
|
# here for a given room means that we are rate-limiting RR flushes to that room,
|
||||||
|
@ -546,48 +528,6 @@ class FederationSender(AbstractFederationSender):
|
||||||
for queue in queues:
|
for queue in queues:
|
||||||
queue.flush_read_receipts_for_room(room_id)
|
queue.flush_read_receipts_for_room(room_id)
|
||||||
|
|
||||||
@preserve_fn # the caller should not yield on this
|
|
||||||
async def send_presence(self, states: List[UserPresenceState]) -> None:
|
|
||||||
"""Send the new presence states to the appropriate destinations.
|
|
||||||
|
|
||||||
This actually queues up the presence states ready for sending and
|
|
||||||
triggers a background task to process them and send out the transactions.
|
|
||||||
"""
|
|
||||||
if not self.hs.config.use_presence:
|
|
||||||
# No-op if presence is disabled.
|
|
||||||
return
|
|
||||||
|
|
||||||
# First we queue up the new presence by user ID, so multiple presence
|
|
||||||
# updates in quick succession are correctly handled.
|
|
||||||
# We only want to send presence for our own users, so lets always just
|
|
||||||
# filter here just in case.
|
|
||||||
self.pending_presence.update(
|
|
||||||
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
|
|
||||||
)
|
|
||||||
|
|
||||||
# We then handle the new pending presence in batches, first figuring
|
|
||||||
# out the destinations we need to send each state to and then poking it
|
|
||||||
# to attempt a new transaction. We linearize this so that we don't
|
|
||||||
# accidentally mess up the ordering and send multiple presence updates
|
|
||||||
# in the wrong order
|
|
||||||
if self._processing_pending_presence:
|
|
||||||
return
|
|
||||||
|
|
||||||
self._processing_pending_presence = True
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
states_map = self.pending_presence
|
|
||||||
self.pending_presence = {}
|
|
||||||
|
|
||||||
if not states_map:
|
|
||||||
break
|
|
||||||
|
|
||||||
await self._process_presence_inner(list(states_map.values()))
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Error sending presence states to servers")
|
|
||||||
finally:
|
|
||||||
self._processing_pending_presence = False
|
|
||||||
|
|
||||||
def send_presence_to_destinations(
|
def send_presence_to_destinations(
|
||||||
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -608,40 +548,6 @@ class FederationSender(AbstractFederationSender):
|
||||||
continue
|
continue
|
||||||
self._get_per_destination_queue(destination).send_presence(states)
|
self._get_per_destination_queue(destination).send_presence(states)
|
||||||
|
|
||||||
@measure_func("txnqueue._process_presence")
|
|
||||||
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
|
|
||||||
"""Given a list of states populate self.pending_presence_by_dest and
|
|
||||||
poke to send a new transaction to each destination
|
|
||||||
"""
|
|
||||||
# We pull the presence router here instead of __init__
|
|
||||||
# to prevent a dependency cycle:
|
|
||||||
#
|
|
||||||
# AuthHandler -> Notifier -> FederationSender
|
|
||||||
# -> PresenceRouter -> ModuleApi -> AuthHandler
|
|
||||||
if self._presence_router is None:
|
|
||||||
self._presence_router = self.hs.get_presence_router()
|
|
||||||
|
|
||||||
assert self._presence_router is not None
|
|
||||||
|
|
||||||
hosts_and_states = await get_interested_remotes(
|
|
||||||
self.store,
|
|
||||||
self._presence_router,
|
|
||||||
states,
|
|
||||||
self.state,
|
|
||||||
)
|
|
||||||
|
|
||||||
for destinations, states in hosts_and_states:
|
|
||||||
for destination in destinations:
|
|
||||||
if destination == self.server_name:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not self._federation_shard_config.should_handle(
|
|
||||||
self._instance_name, destination
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
|
|
||||||
self._get_per_destination_queue(destination).send_presence(states)
|
|
||||||
|
|
||||||
def build_and_send_edu(
|
def build_and_send_edu(
|
||||||
self,
|
self,
|
||||||
destination: str,
|
destination: str,
|
||||||
|
|
|
@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC):
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
self.presence_router = hs.get_presence_router()
|
||||||
|
self.state = hs.get_state_handler()
|
||||||
|
|
||||||
|
self._federation = None
|
||||||
|
if hs.should_send_federation() or not hs.config.worker_app:
|
||||||
|
self._federation = hs.get_federation_sender()
|
||||||
|
|
||||||
|
self._send_federation = hs.should_send_federation()
|
||||||
|
|
||||||
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
|
||||||
|
|
||||||
|
@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC):
|
||||||
"""Process presence stream rows received over replication."""
|
"""Process presence stream rows received over replication."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def maybe_send_presence_to_interested_destinations(
|
||||||
|
self, states: List[UserPresenceState]
|
||||||
|
):
|
||||||
|
"""If this instance is a federation sender, send the states to all
|
||||||
|
destinations that are interested.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not self._send_federation:
|
||||||
|
return
|
||||||
|
|
||||||
|
# If this worker sends federation we must have a FederationSender.
|
||||||
|
assert self._federation
|
||||||
|
|
||||||
|
hosts_and_states = await get_interested_remotes(
|
||||||
|
self.store,
|
||||||
|
self.presence_router,
|
||||||
|
states,
|
||||||
|
self.state,
|
||||||
|
)
|
||||||
|
|
||||||
|
for destinations, states in hosts_and_states:
|
||||||
|
self._federation.send_presence_to_destinations(states, destinations)
|
||||||
|
|
||||||
|
|
||||||
class _NullContextManager(ContextManager[None]):
|
class _NullContextManager(ContextManager[None]):
|
||||||
"""A context manager which does nothing."""
|
"""A context manager which does nothing."""
|
||||||
|
@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
|
||||||
self.presence_router = hs.get_presence_router()
|
|
||||||
self._presence_enabled = hs.config.use_presence
|
self._presence_enabled = hs.config.use_presence
|
||||||
|
|
||||||
# The number of ongoing syncs on this process, by user id.
|
# The number of ongoing syncs on this process, by user id.
|
||||||
|
@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
||||||
users=users_to_states.keys(),
|
users=users_to_states.keys(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# If this is a federation sender, notify about presence updates.
|
||||||
|
await self.maybe_send_presence_to_interested_destinations(states)
|
||||||
|
|
||||||
async def process_replication_rows(self, token, rows):
|
async def process_replication_rows(self, token, rows):
|
||||||
states = [
|
states = [
|
||||||
UserPresenceState(
|
UserPresenceState(
|
||||||
|
@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.wheel_timer = WheelTimer()
|
self.wheel_timer = WheelTimer()
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.federation = hs.get_federation_sender()
|
|
||||||
self.state = hs.get_state_handler()
|
|
||||||
self.presence_router = hs.get_presence_router()
|
|
||||||
self._presence_enabled = hs.config.use_presence
|
self._presence_enabled = hs.config.use_presence
|
||||||
|
|
||||||
federation_registry = hs.get_federation_registry()
|
federation_registry = hs.get_federation_registry()
|
||||||
|
@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
self.unpersisted_users_changes |= {s.user_id for s in new_states}
|
self.unpersisted_users_changes |= {s.user_id for s in new_states}
|
||||||
self.unpersisted_users_changes -= set(to_notify.keys())
|
self.unpersisted_users_changes -= set(to_notify.keys())
|
||||||
|
|
||||||
|
# Check if we need to resend any presence states to remote hosts. We
|
||||||
|
# only do this for states that haven't been updated in a while to
|
||||||
|
# ensure that the remote host doesn't time the presence state out.
|
||||||
|
#
|
||||||
|
# Note that since these are states that have *not* been updated,
|
||||||
|
# they won't get sent down the normal presence replication stream,
|
||||||
|
# and so we have to explicitly send them via the federation stream.
|
||||||
to_federation_ping = {
|
to_federation_ping = {
|
||||||
user_id: state
|
user_id: state
|
||||||
for user_id, state in to_federation_ping.items()
|
for user_id, state in to_federation_ping.items()
|
||||||
|
@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
if to_federation_ping:
|
if to_federation_ping:
|
||||||
federation_presence_out_counter.inc(len(to_federation_ping))
|
federation_presence_out_counter.inc(len(to_federation_ping))
|
||||||
|
|
||||||
self._push_to_remotes(to_federation_ping.values())
|
hosts_and_states = await get_interested_remotes(
|
||||||
|
self.store,
|
||||||
|
self.presence_router,
|
||||||
|
list(to_federation_ping.values()),
|
||||||
|
self.state,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Since this is master we know that we have a federation sender or
|
||||||
|
# queue, and so this will be defined.
|
||||||
|
assert self._federation
|
||||||
|
|
||||||
|
for destinations, states in hosts_and_states:
|
||||||
|
self._federation.send_presence_to_destinations(states, destinations)
|
||||||
|
|
||||||
async def _handle_timeouts(self):
|
async def _handle_timeouts(self):
|
||||||
"""Checks the presence of users that have timed out and updates as
|
"""Checks the presence of users that have timed out and updates as
|
||||||
|
@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
users=[UserID.from_string(u) for u in users_to_states],
|
users=[UserID.from_string(u) for u in users_to_states],
|
||||||
)
|
)
|
||||||
|
|
||||||
self._push_to_remotes(states)
|
# We only want to poke the local federation sender, if any, as other
|
||||||
|
# workers will receive the presence updates via the presence replication
|
||||||
def _push_to_remotes(self, states):
|
# stream (which is updated by `store.update_presence`).
|
||||||
"""Sends state updates to remote servers.
|
await self.maybe_send_presence_to_interested_destinations(states)
|
||||||
|
|
||||||
Args:
|
|
||||||
states (list(UserPresenceState))
|
|
||||||
"""
|
|
||||||
self.federation.send_presence(states)
|
|
||||||
|
|
||||||
async def incoming_presence(self, origin, content):
|
async def incoming_presence(self, origin, content):
|
||||||
"""Called when we receive a `m.presence` EDU from a remote server."""
|
"""Called when we receive a `m.presence` EDU from a remote server."""
|
||||||
|
@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler):
|
||||||
user_presence_states
|
user_presence_states
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Since this is master we know that we have a federation sender or
|
||||||
|
# queue, and so this will be defined.
|
||||||
|
assert self._federation
|
||||||
|
|
||||||
# Send out user presence updates for each destination
|
# Send out user presence updates for each destination
|
||||||
for destination, user_state_set in presence_destinations.items():
|
for destination, user_state_set in presence_destinations.items():
|
||||||
self.federation.send_presence_to_destinations(
|
self._federation.send_presence_to_destinations(
|
||||||
destinations=[destination], states=user_state_set
|
destinations=[destination], states=user_state_set
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ class ModuleApi:
|
||||||
self._auth_handler = auth_handler
|
self._auth_handler = auth_handler
|
||||||
self._server_name = hs.hostname
|
self._server_name = hs.hostname
|
||||||
self._presence_stream = hs.get_event_sources().sources["presence"]
|
self._presence_stream = hs.get_event_sources().sources["presence"]
|
||||||
|
self._state = hs.get_state_handler()
|
||||||
|
|
||||||
# We expose these as properties below in order to attach a helpful docstring.
|
# We expose these as properties below in order to attach a helpful docstring.
|
||||||
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient
|
||||||
|
@ -429,11 +430,13 @@ class ModuleApi:
|
||||||
UserID.from_string(user), from_key=None, include_offline=False
|
UserID.from_string(user), from_key=None, include_offline=False
|
||||||
)
|
)
|
||||||
|
|
||||||
# Send to remote destinations
|
# Send to remote destinations.
|
||||||
await make_deferred_yieldable(
|
|
||||||
# We pull the federation sender here as we can only do so on workers
|
# We pull out the presence handler here to break a cyclic
|
||||||
# that support sending presence
|
# dependency between the presence router and module API.
|
||||||
self._hs.get_federation_sender().send_presence(presence_events)
|
presence_handler = self._hs.get_presence_handler()
|
||||||
|
await presence_handler.maybe_send_presence_to_interested_destinations(
|
||||||
|
presence_events
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue