forked from MirrorHub/synapse
Merge branch 'notifier_unify' into notifier_performance
This commit is contained in:
commit
899d4675dd
4 changed files with 10 additions and 21 deletions
|
@ -117,7 +117,7 @@ Installing prerequisites on Mac OS X::
|
||||||
|
|
||||||
To install the synapse homeserver run::
|
To install the synapse homeserver run::
|
||||||
|
|
||||||
$ virtualenv ~/.synapse
|
$ virtualenv -p python2.7 ~/.synapse
|
||||||
$ source ~/.synapse/bin/activate
|
$ source ~/.synapse/bin/activate
|
||||||
$ pip install --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
|
$ pip install --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
|
||||||
|
|
||||||
|
|
|
@ -315,6 +315,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
likely_domains = [
|
likely_domains = [
|
||||||
domain for domain, depth in curr_domains
|
domain for domain, depth in curr_domains
|
||||||
|
if domain is not self.server_name
|
||||||
]
|
]
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -364,6 +365,7 @@ class FederationHandler(BaseHandler):
|
||||||
# from the time.
|
# from the time.
|
||||||
|
|
||||||
tried_domains = set(likely_domains)
|
tried_domains = set(likely_domains)
|
||||||
|
tried_domains.add(self.server_name)
|
||||||
|
|
||||||
event_ids = list(extremities.keys())
|
event_ids = list(extremities.keys())
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,6 @@ import synapse.metrics
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from collections import OrderedDict
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -144,7 +143,7 @@ class PresenceHandler(BaseHandler):
|
||||||
self._remote_offline_serials = []
|
self._remote_offline_serials = []
|
||||||
|
|
||||||
# map any user to a UserPresenceCache
|
# map any user to a UserPresenceCache
|
||||||
self._user_cachemap = OrderedDict() # keep them sorted by serial
|
self._user_cachemap = {}
|
||||||
self._user_cachemap_latest_serial = 0
|
self._user_cachemap_latest_serial = 0
|
||||||
|
|
||||||
metrics.register_callback(
|
metrics.register_callback(
|
||||||
|
@ -166,14 +165,6 @@ class PresenceHandler(BaseHandler):
|
||||||
else:
|
else:
|
||||||
return UserPresenceCache()
|
return UserPresenceCache()
|
||||||
|
|
||||||
def _bump_serial(self, user=None):
|
|
||||||
self._user_cachemap_latest_serial += 1
|
|
||||||
|
|
||||||
if user:
|
|
||||||
# Move to end
|
|
||||||
cache = self._user_cachemap.pop(user)
|
|
||||||
self._user_cachemap[user] = cache
|
|
||||||
|
|
||||||
def registered_user(self, user):
|
def registered_user(self, user):
|
||||||
return self.store.create_presence(user.localpart)
|
return self.store.create_presence(user.localpart)
|
||||||
|
|
||||||
|
@ -309,7 +300,7 @@ class PresenceHandler(BaseHandler):
|
||||||
def changed_presencelike_data(self, user, state):
|
def changed_presencelike_data(self, user, state):
|
||||||
statuscache = self._get_or_make_usercache(user)
|
statuscache = self._get_or_make_usercache(user)
|
||||||
|
|
||||||
self._bump_serial(user=user)
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
||||||
|
|
||||||
return self.push_presence(user, statuscache=statuscache)
|
return self.push_presence(user, statuscache=statuscache)
|
||||||
|
@ -331,7 +322,7 @@ class PresenceHandler(BaseHandler):
|
||||||
|
|
||||||
# No actual update but we need to bump the serial anyway for the
|
# No actual update but we need to bump the serial anyway for the
|
||||||
# event source
|
# event source
|
||||||
self._bump_serial()
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update({}, serial=self._user_cachemap_latest_serial)
|
statuscache.update({}, serial=self._user_cachemap_latest_serial)
|
||||||
|
|
||||||
self.push_update_to_local_and_remote(
|
self.push_update_to_local_and_remote(
|
||||||
|
@ -715,7 +706,7 @@ class PresenceHandler(BaseHandler):
|
||||||
|
|
||||||
statuscache = self._get_or_make_usercache(user)
|
statuscache = self._get_or_make_usercache(user)
|
||||||
|
|
||||||
self._bump_serial(user=user)
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
||||||
|
|
||||||
if not observers and not room_ids:
|
if not observers and not room_ids:
|
||||||
|
@ -877,15 +868,10 @@ class PresenceEventSource(object):
|
||||||
|
|
||||||
updates = []
|
updates = []
|
||||||
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
||||||
for observed_user in reversed(cachemap.keys()):
|
for observed_user in cachemap.keys():
|
||||||
cached = cachemap[observed_user]
|
cached = cachemap[observed_user]
|
||||||
|
|
||||||
# Since this is ordered in descending order of serial, we can just
|
if cached.serial <= from_key or cached.serial > max_serial:
|
||||||
# stop once we've seen enough
|
|
||||||
if cached.serial <= from_key:
|
|
||||||
break
|
|
||||||
|
|
||||||
if cached.serial > max_serial:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not (yield self.is_visible(observer_user, observed_user)):
|
if not (yield self.is_visible(observer_user, observed_user)):
|
||||||
|
|
|
@ -48,6 +48,7 @@ class EventsStore(SQLBaseStore):
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def stream_ordering_manager():
|
def stream_ordering_manager():
|
||||||
yield stream_ordering
|
yield stream_ordering
|
||||||
|
stream_ordering_manager = stream_ordering_manager()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with stream_ordering_manager as stream_ordering:
|
with stream_ordering_manager as stream_ordering:
|
||||||
|
|
Loading…
Reference in a new issue