From 1b7686329e96f1bde2a14cc35c98c2761e224c6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Aug 2014 17:43:15 +0100 Subject: [PATCH] Don't query the rooms members table so much by using the new notifier api that allows you to specify room_ids to notify. --- synapse/handlers/events.py | 2 +- synapse/handlers/presence.py | 106 +++++++++++++++-------------------- 2 files changed, 46 insertions(+), 62 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index f814e7165..980a169b2 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -100,7 +100,7 @@ class EventStreamHandler(BaseHandler): logger.debug("Scheduling _later: for %s", auth_user) self._stop_timer_per_user[auth_user] = ( - self.clock.call_later(600, _later) + self.clock.call_later(30, _later) ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 414a5b7bd..677c1b2d8 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -260,24 +260,29 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): - localusers = set() - remotedomains = set() - - rm_handler = self.homeserver.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into(room_id, - localusers=localusers, remotedomains=remotedomains, - ignore_user=user) + statuscache = self._get_or_make_usercache(user) if user.is_mine: - yield self._send_presence_to_distribution(srcuser=user, - localusers=localusers, remotedomains=remotedomains, - statuscache=self._get_or_offline_usercache(user), + remote_domains = set( + (yield self.store.get_joined_hosts_for_room(room_id)) ) - for srcuser in localusers: - yield self._send_presence(srcuser=srcuser, destuser=user, - statuscache=self._get_or_offline_usercache(srcuser), - ) + if not remote_domains: + defer.returnValue(None) + + deferreds = [] + for domain in remote_domains: + logger.debug(" | push to remote domain %s", domain) + deferreds.append(self._push_presence_remote(user, domain, + state=statuscache.get_state()) + ) + + + self.push_update_to_clients_2( + observed_user=user, + room_ids=[room_id], + statuscache=self._get_or_offline_usercache(user), + ) @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): @@ -546,53 +551,28 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) + remote_domains = set() for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains, - ignore_user=user, + remote_domains.update( + (yield self.store.get_joined_hosts_for_room(room_id)) ) if not localusers and not remotedomains: defer.returnValue(None) - yield self._send_presence_to_distribution(user, - localusers=localusers, remotedomains=remotedomains, - statuscache=statuscache - ) - - def _send_presence(self, srcuser, destuser, statuscache): - if destuser.is_mine: - self.push_update_to_clients( - observer_user=destuser, - observed_user=srcuser, - statuscache=statuscache) - return defer.succeed(None) - else: - return self._push_presence_remote(srcuser, destuser.domain, - state=statuscache.get_state() - ) - - @defer.inlineCallbacks - @trace_function - def _send_presence_to_distribution(self, srcuser, localusers=set(), - remotedomains=set(), statuscache=None): - - for u in localusers: - logger.debug(" | push to local user %s", u) - self.push_update_to_clients( - observer_user=u, - observed_user=srcuser, - statuscache=statuscache, - ) - deferreds = [] for domain in remotedomains: logger.debug(" | push to remote domain %s", domain) - deferreds.append(self._push_presence_remote(srcuser, domain, + deferreds.append(self._push_presence_remote(user, domain, state=statuscache.get_state()) ) - yield defer.DeferredList(deferreds) + self.push_update_to_clients_2( + observed_user=user, + users_to_push=localusers, + room_ids=room_ids, + statuscache=statuscache, + ) @defer.inlineCallbacks def _push_presence_remote(self, user, destination, state=None): @@ -633,12 +613,7 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(user) - for room_id in room_ids: - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=observers, ignore_user=user - ) - - if not observers: + if not observers and not room_ids: break state = dict(push) @@ -654,12 +629,12 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) - for observer_user in observers: - self.push_update_to_clients( - observer_user=observer_user, - observed_user=user, - statuscache=statuscache, - ) + self.push_update_to_clients_2( + observed_user=user, + users_to_push=observers, + room_ids=room_ids, + statuscache=statuscache, + ) if state["state"] == PresenceState.OFFLINE: del self._user_cachemap[user] @@ -701,6 +676,15 @@ class PresenceHandler(BaseHandler): [observer_user], ) + def push_update_to_clients_2(self, observed_user, users_to_push=[], + room_ids=[], statuscache=None): + statuscache.make_event(user=observed_user, clock=self.clock) + + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) + class UserPresenceCache(object): """Store an observed user's state and status message.