Merge pull request #150 from matrix-org/notifier_unify

Make v1 and v2 client APIs interact with the notifier in the same way.
This commit is contained in:
Mark Haines 2015-05-14 14:16:59 +01:00
commit 4770cec7bc

View file

@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.types import StreamToken
import synapse.metrics
@ -49,13 +50,9 @@ class _NotificationListener(object):
so that it can remove itself from the indexes in the Notifier class.
"""
def __init__(self, user, rooms, from_token, limit, timeout, deferred,
appservice=None):
def __init__(self, user, rooms, deferred, appservice=None):
self.user = user
self.appservice = appservice
self.from_token = from_token
self.limit = limit
self.timeout = timeout
self.deferred = deferred
self.rooms = rooms
self.timer = None
@ -63,17 +60,14 @@ class _NotificationListener(object):
def notified(self):
return self.deferred.called
def notify(self, notifier, events, start_token, end_token):
def notify(self, notifier):
""" Inform whoever is listening about the new events. This will
also remove this listener from all the indexes in the Notifier
it knows about.
"""
result = (events, (start_token, end_token))
try:
self.deferred.callback(result)
notified_events_counter.inc_by(len(events))
self.deferred.callback(None)
except defer.AlreadyCalledError:
pass
@ -160,6 +154,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the
`extra_users` param.
"""
yield run_on_reactor()
# poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services(
event
@ -167,8 +162,6 @@ class Notifier(object):
room_id = event.room_id
room_source = self.event_sources.sources["room"]
room_listeners = self.room_to_listeners.get(room_id, set())
_discard_if_notified(room_listeners)
@ -199,33 +192,11 @@ class Notifier(object):
logger.debug("on_new_room_event listeners %s", listeners)
# TODO (erikj): Can we make this more efficient by hitting the
# db once?
@defer.inlineCallbacks
def notify(listener):
events, end_key = yield room_source.get_new_events_for_user(
listener.user,
listener.from_token.room_key,
listener.limit,
)
if events:
end_token = listener.from_token.copy_and_replace(
"room_key", end_key
)
listener.notify(
self, events, listener.from_token, end_token
)
def eb(failure):
logger.exception("Failed to notify listener", failure)
yield defer.DeferredList(
[notify(l).addErrback(eb) for l in listeners],
consumeErrors=True,
)
for listener in listeners:
try:
listener.notify(self)
except:
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
@log_function
@ -235,11 +206,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
# TODO(paul): This is horrible, having to manually list every event
# source here individually
presence_source = self.event_sources.sources["presence"]
typing_source = self.event_sources.sources["typing"]
yield run_on_reactor()
listeners = set()
for user in users:
@ -256,68 +223,29 @@ class Notifier(object):
listeners |= room_listeners
@defer.inlineCallbacks
def notify(listener):
presence_events, presence_end_key = (
yield presence_source.get_new_events_for_user(
listener.user,
listener.from_token.presence_key,
listener.limit,
)
)
typing_events, typing_end_key = (
yield typing_source.get_new_events_for_user(
listener.user,
listener.from_token.typing_key,
listener.limit,
)
)
if presence_events or typing_events:
end_token = listener.from_token.copy_and_replace(
"presence_key", presence_end_key
).copy_and_replace(
"typing_key", typing_end_key
)
listener.notify(
self,
presence_events + typing_events,
listener.from_token,
end_token
)
def eb(failure):
logger.error(
"Failed to notify listener",
exc_info=(
failure.type,
failure.value,
failure.getTracebackObject())
)
yield defer.DeferredList(
[notify(l).addErrback(eb) for l in listeners],
consumeErrors=True,
)
for listener in listeners:
try:
listener.notify(self)
except:
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
def wait_for_events(self, user, rooms, filter, timeout, callback):
def wait_for_events(self, user, rooms, timeout, callback,
from_token=StreamToken("s0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
deferred = defer.Deferred()
from_token = StreamToken("s0", "0", "0")
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
user.to_string()
)
listener = [_NotificationListener(
user=user,
rooms=rooms,
from_token=from_token,
limit=1,
timeout=timeout,
deferred=deferred,
appservice=appservice,
)]
if timeout:
@ -332,7 +260,7 @@ class Notifier(object):
def _timeout_listener():
timed_out[0] = True
timer[0] = None
listener[0].notify(self, [], from_token, from_token)
listener[0].notify(self)
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
@ -344,10 +272,8 @@ class Notifier(object):
listener[0] = _NotificationListener(
user=user,
rooms=rooms,
from_token=from_token,
limit=1,
timeout=timeout,
deferred=deferred,
appservice=appservice,
)
self._register_with_keys(listener[0])
result = yield callback()
@ -360,65 +286,43 @@ class Notifier(object):
defer.returnValue(result)
@defer.inlineCallbacks
def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
"""
deferred = defer.Deferred()
self._get_events(
deferred, user, rooms, pagination_config.from_token,
pagination_config.limit, timeout
).addErrback(deferred.errback)
return deferred
@defer.inlineCallbacks
def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
from_token = pagination_config.from_token
if not from_token:
from_token = yield self.event_sources.get_current_token()
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
user.to_string()
)
limit = pagination_config.limit
listener = _NotificationListener(
user,
rooms,
from_token,
limit,
timeout,
deferred,
appservice=appservice
)
def _timeout_listener():
# TODO (erikj): We should probably set to_token to the current
# max rather than reusing from_token.
# Remove the timer from the listener so we don't try to cancel it.
listener.timer = None
listener.notify(
self,
[],
listener.from_token,
listener.from_token,
)
if timeout:
self._register_with_keys(listener)
yield self._check_for_updates(listener)
if not timeout:
_timeout_listener()
else:
# Only add the timer if the listener hasn't been notified
if not listener.notified():
listener.timer = self.clock.call_later(
timeout/1000.0, _timeout_listener
@defer.inlineCallbacks
def check_for_updates():
events = []
end_token = from_token
for name, source in self.event_sources.sources.items():
keyname = "%s_key" % name
stuff, new_key = yield source.get_new_events_for_user(
user, getattr(from_token, keyname), limit,
)
return
events.extend(stuff)
end_token = end_token.copy_and_replace(keyname, new_key)
if events:
defer.returnValue((events, (from_token, end_token)))
else:
defer.returnValue(None)
result = yield self.wait_for_events(
user, rooms, timeout, check_for_updates, from_token=from_token
)
if result is None:
result = ([], (from_token, from_token))
defer.returnValue(result)
@log_function
def _register_with_keys(self, listener):
@ -433,36 +337,6 @@ class Notifier(object):
listener.appservice, set()
).add(listener)
@defer.inlineCallbacks
@log_function
def _check_for_updates(self, listener):
# TODO (erikj): We need to think about limits across multiple sources
events = []
from_token = listener.from_token
limit = listener.limit
# TODO (erikj): DeferredList?
for name, source in self.event_sources.sources.items():
keyname = "%s_key" % name
stuff, new_key = yield source.get_new_events_for_user(
listener.user,
getattr(from_token, keyname),
limit,
)
events.extend(stuff)
from_token = from_token.copy_and_replace(keyname, new_key)
end_token = from_token
if events:
listener.notify(self, events, listener.from_token, end_token)
defer.returnValue(listener)
def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set())