forked from MirrorHub/synapse
Use wait_for_events to implement 'get_events'
This commit is contained in:
parent
d244fa9741
commit
5e3b254dc8
1 changed files with 33 additions and 82 deletions
|
@ -305,14 +305,16 @@ class Notifier(object):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_events(self, user, rooms, filter, timeout, callback):
|
||||
def wait_for_events(self, user, rooms, timeout, callback,
|
||||
from_token=StreamToken("s0", "0", "0")):
|
||||
"""Wait until the callback returns a non empty response or the
|
||||
timeout fires.
|
||||
"""
|
||||
|
||||
deferred = defer.Deferred()
|
||||
|
||||
from_token = StreamToken("s0", "0", "0")
|
||||
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
|
||||
user.to_string()
|
||||
)
|
||||
|
||||
listener = [_NotificationListener(
|
||||
user=user,
|
||||
|
@ -321,6 +323,7 @@ class Notifier(object):
|
|||
limit=1,
|
||||
timeout=timeout,
|
||||
deferred=deferred,
|
||||
appservice=appservice,
|
||||
)]
|
||||
|
||||
if timeout:
|
||||
|
@ -363,65 +366,43 @@ class Notifier(object):
|
|||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events_for(self, user, rooms, pagination_config, timeout):
|
||||
""" For the given user and rooms, return any new events for them. If
|
||||
there are no new events wait for up to `timeout` milliseconds for any
|
||||
new events to happen before returning.
|
||||
"""
|
||||
deferred = defer.Deferred()
|
||||
|
||||
self._get_events(
|
||||
deferred, user, rooms, pagination_config.from_token,
|
||||
pagination_config.limit, timeout
|
||||
).addErrback(deferred.errback)
|
||||
|
||||
return deferred
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
|
||||
from_token = pagination_config.from_token
|
||||
if not from_token:
|
||||
from_token = yield self.event_sources.get_current_token()
|
||||
|
||||
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
|
||||
user.to_string()
|
||||
limit = pagination_config.limit
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_for_updates():
|
||||
events = []
|
||||
end_token = from_token
|
||||
for name, source in self.event_sources.sources.items():
|
||||
keyname = "%s_key" % name
|
||||
stuff, new_key = yield source.get_new_events_for_user(
|
||||
user, getattr(from_token, keyname), limit,
|
||||
)
|
||||
events.extend(stuff)
|
||||
end_token = from_token.copy_and_replace(keyname, new_key)
|
||||
|
||||
listener = _NotificationListener(
|
||||
user,
|
||||
rooms,
|
||||
from_token,
|
||||
limit,
|
||||
timeout,
|
||||
deferred,
|
||||
appservice=appservice
|
||||
)
|
||||
|
||||
def _timeout_listener():
|
||||
# TODO (erikj): We should probably set to_token to the current
|
||||
# max rather than reusing from_token.
|
||||
# Remove the timer from the listener so we don't try to cancel it.
|
||||
listener.timer = None
|
||||
listener.notify(
|
||||
self,
|
||||
[],
|
||||
listener.from_token,
|
||||
listener.from_token,
|
||||
)
|
||||
|
||||
if timeout:
|
||||
self._register_with_keys(listener)
|
||||
|
||||
yield self._check_for_updates(listener)
|
||||
|
||||
if not timeout:
|
||||
_timeout_listener()
|
||||
if events:
|
||||
defer.returnValue((events, (from_token, end_token)))
|
||||
else:
|
||||
# Only add the timer if the listener hasn't been notified
|
||||
if not listener.notified():
|
||||
listener.timer = self.clock.call_later(
|
||||
timeout/1000.0, _timeout_listener
|
||||
defer.returnValue(None)
|
||||
|
||||
result = yield self.wait_for_events(
|
||||
user, rooms, timeout, check_for_updates, from_token=from_token
|
||||
)
|
||||
return
|
||||
|
||||
if result is None:
|
||||
result = ([], (from_token, from_token))
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@log_function
|
||||
def _register_with_keys(self, listener):
|
||||
|
@ -436,36 +417,6 @@ class Notifier(object):
|
|||
listener.appservice, set()
|
||||
).add(listener)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def _check_for_updates(self, listener):
|
||||
# TODO (erikj): We need to think about limits across multiple sources
|
||||
events = []
|
||||
|
||||
from_token = listener.from_token
|
||||
limit = listener.limit
|
||||
|
||||
# TODO (erikj): DeferredList?
|
||||
for name, source in self.event_sources.sources.items():
|
||||
keyname = "%s_key" % name
|
||||
|
||||
stuff, new_key = yield source.get_new_events_for_user(
|
||||
listener.user,
|
||||
getattr(from_token, keyname),
|
||||
limit,
|
||||
)
|
||||
|
||||
events.extend(stuff)
|
||||
|
||||
from_token = from_token.copy_and_replace(keyname, new_key)
|
||||
|
||||
end_token = from_token
|
||||
|
||||
if events:
|
||||
listener.notify(self, events, listener.from_token, end_token)
|
||||
|
||||
defer.returnValue(listener)
|
||||
|
||||
def _user_joined_room(self, user, room_id):
|
||||
new_listeners = self.user_to_listeners.get(user, set())
|
||||
|
||||
|
|
Loading…
Reference in a new issue