forked from MirrorHub/synapse
Don't query the rooms members table so much by using the new notifier api that allows you to specify room_ids to notify.
This commit is contained in:
parent
54d0a75573
commit
1b7686329e
2 changed files with 46 additions and 62 deletions
|
@ -100,7 +100,7 @@ class EventStreamHandler(BaseHandler):
|
|||
|
||||
logger.debug("Scheduling _later: for %s", auth_user)
|
||||
self._stop_timer_per_user[auth_user] = (
|
||||
self.clock.call_later(600, _later)
|
||||
self.clock.call_later(30, _later)
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -260,24 +260,29 @@ class PresenceHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def user_joined_room(self, user, room_id):
|
||||
localusers = set()
|
||||
remotedomains = set()
|
||||
|
||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||
yield rm_handler.fetch_room_distributions_into(room_id,
|
||||
localusers=localusers, remotedomains=remotedomains,
|
||||
ignore_user=user)
|
||||
statuscache = self._get_or_make_usercache(user)
|
||||
|
||||
if user.is_mine:
|
||||
yield self._send_presence_to_distribution(srcuser=user,
|
||||
localusers=localusers, remotedomains=remotedomains,
|
||||
statuscache=self._get_or_offline_usercache(user),
|
||||
remote_domains = set(
|
||||
(yield self.store.get_joined_hosts_for_room(room_id))
|
||||
)
|
||||
|
||||
for srcuser in localusers:
|
||||
yield self._send_presence(srcuser=srcuser, destuser=user,
|
||||
statuscache=self._get_or_offline_usercache(srcuser),
|
||||
)
|
||||
if not remote_domains:
|
||||
defer.returnValue(None)
|
||||
|
||||
deferreds = []
|
||||
for domain in remote_domains:
|
||||
logger.debug(" | push to remote domain %s", domain)
|
||||
deferreds.append(self._push_presence_remote(user, domain,
|
||||
state=statuscache.get_state())
|
||||
)
|
||||
|
||||
|
||||
self.push_update_to_clients_2(
|
||||
observed_user=user,
|
||||
room_ids=[room_id],
|
||||
statuscache=self._get_or_offline_usercache(user),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send_invite(self, observer_user, observed_user):
|
||||
|
@ -546,53 +551,28 @@ class PresenceHandler(BaseHandler):
|
|||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||
room_ids = yield rm_handler.get_rooms_for_user(user)
|
||||
|
||||
remote_domains = set()
|
||||
for room_id in room_ids:
|
||||
yield rm_handler.fetch_room_distributions_into(
|
||||
room_id, localusers=localusers, remotedomains=remotedomains,
|
||||
ignore_user=user,
|
||||
remote_domains.update(
|
||||
(yield self.store.get_joined_hosts_for_room(room_id))
|
||||
)
|
||||
|
||||
if not localusers and not remotedomains:
|
||||
defer.returnValue(None)
|
||||
|
||||
yield self._send_presence_to_distribution(user,
|
||||
localusers=localusers, remotedomains=remotedomains,
|
||||
statuscache=statuscache
|
||||
)
|
||||
|
||||
def _send_presence(self, srcuser, destuser, statuscache):
|
||||
if destuser.is_mine:
|
||||
self.push_update_to_clients(
|
||||
observer_user=destuser,
|
||||
observed_user=srcuser,
|
||||
statuscache=statuscache)
|
||||
return defer.succeed(None)
|
||||
else:
|
||||
return self._push_presence_remote(srcuser, destuser.domain,
|
||||
state=statuscache.get_state()
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@trace_function
|
||||
def _send_presence_to_distribution(self, srcuser, localusers=set(),
|
||||
remotedomains=set(), statuscache=None):
|
||||
|
||||
for u in localusers:
|
||||
logger.debug(" | push to local user %s", u)
|
||||
self.push_update_to_clients(
|
||||
observer_user=u,
|
||||
observed_user=srcuser,
|
||||
statuscache=statuscache,
|
||||
)
|
||||
|
||||
deferreds = []
|
||||
for domain in remotedomains:
|
||||
logger.debug(" | push to remote domain %s", domain)
|
||||
deferreds.append(self._push_presence_remote(srcuser, domain,
|
||||
deferreds.append(self._push_presence_remote(user, domain,
|
||||
state=statuscache.get_state())
|
||||
)
|
||||
|
||||
yield defer.DeferredList(deferreds)
|
||||
self.push_update_to_clients_2(
|
||||
observed_user=user,
|
||||
users_to_push=localusers,
|
||||
room_ids=room_ids,
|
||||
statuscache=statuscache,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _push_presence_remote(self, user, destination, state=None):
|
||||
|
@ -633,12 +613,7 @@ class PresenceHandler(BaseHandler):
|
|||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||
room_ids = yield rm_handler.get_rooms_for_user(user)
|
||||
|
||||
for room_id in room_ids:
|
||||
yield rm_handler.fetch_room_distributions_into(
|
||||
room_id, localusers=observers, ignore_user=user
|
||||
)
|
||||
|
||||
if not observers:
|
||||
if not observers and not room_ids:
|
||||
break
|
||||
|
||||
state = dict(push)
|
||||
|
@ -654,12 +629,12 @@ class PresenceHandler(BaseHandler):
|
|||
self._user_cachemap_latest_serial += 1
|
||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
||||
|
||||
for observer_user in observers:
|
||||
self.push_update_to_clients(
|
||||
observer_user=observer_user,
|
||||
observed_user=user,
|
||||
statuscache=statuscache,
|
||||
)
|
||||
self.push_update_to_clients_2(
|
||||
observed_user=user,
|
||||
users_to_push=observers,
|
||||
room_ids=room_ids,
|
||||
statuscache=statuscache,
|
||||
)
|
||||
|
||||
if state["state"] == PresenceState.OFFLINE:
|
||||
del self._user_cachemap[user]
|
||||
|
@ -701,6 +676,15 @@ class PresenceHandler(BaseHandler):
|
|||
[observer_user],
|
||||
)
|
||||
|
||||
def push_update_to_clients_2(self, observed_user, users_to_push=[],
|
||||
room_ids=[], statuscache=None):
|
||||
statuscache.make_event(user=observed_user, clock=self.clock)
|
||||
|
||||
self.notifier.on_new_user_event(
|
||||
users_to_push,
|
||||
room_ids,
|
||||
)
|
||||
|
||||
|
||||
class UserPresenceCache(object):
|
||||
"""Store an observed user's state and status message.
|
||||
|
|
Loading…
Reference in a new issue