diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 93dcd4032..4993c92b7 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from ._base import BaseHandler @@ -66,9 +67,10 @@ class EventStreamHandler(BaseHandler): rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) - events, tokens = yield self.notifier.get_events_for( - auth_user, room_ids, pagin_config, timeout - ) + with PreserveLoggingContext(): + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) chunks = [self.hs.serialize_event(e) for e in events] diff --git a/synapse/notifier.py b/synapse/notifier.py index f38c410e3..022d60a3b 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,6 +16,7 @@ from twisted.internet import defer, reactor from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext import logging @@ -79,6 +80,8 @@ class Notifier(object): self.event_sources = hs.get_event_sources() + self.clock = hs.get_clock() + hs.get_distributor().observe( "user_joined_room", self._user_joined_room ) @@ -127,9 +130,10 @@ class Notifier(object): def eb(failure): logger.exception("Failed to notify listener", failure) - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] - ) + with PreserveLoggingContext(): + yield defer.DeferredList( + [notify(l).addErrback(eb) for l in listeners] + ) @defer.inlineCallbacks @log_function @@ -175,9 +179,10 @@ class Notifier(object): failure.getTracebackObject()) ) - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] - ) + with PreserveLoggingContext(): + yield defer.DeferredList( + [notify(l).addErrback(eb) for l in listeners] + ) def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If @@ -206,29 +211,28 @@ class Notifier(object): timeout, deferred, ) + def _timeout_listener(): + # TODO (erikj): We should probably set to_token to the current + # max rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) if timeout: - reactor.callLater(timeout/1000.0, self._timeout_listener, listener) + self.clock.call_later(timeout/1000.0, _timeout_listener) self._register_with_keys(listener) yield self._check_for_updates(listener) if not timeout: - self._timeout_listener(listener) + _timeout_listener() return - def _timeout_listener(self, listener): - # TODO (erikj): We should probably set to_token to the current max - # rather than reusing from_token. - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - @log_function def _register_with_keys(self, listener): for room in listener.rooms: diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c9a73b041..9ad613b8f 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.logcontext import LoggingContext from twisted.internet import reactor @@ -35,7 +36,11 @@ class Clock(object): return self.time() * 1000 def call_later(self, delay, callback): - return reactor.callLater(delay, callback) + current_context = LoggingContext.current_context() + def wrapped_callback(): + current_context.thread_local.current_context = current_context + callback() + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 13176b05c..2f430a0f1 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -18,6 +18,9 @@ class LoggingContext(object): __slots__ = [] + def __str__(self): + return "sentinel" + def copy_to(self, record): pass @@ -102,6 +105,7 @@ class PreserveLoggingContext(object): def __enter__(self): """Captures the current logging context""" self.current_context = LoggingContext.current_context() + LoggingContext.thread_local.current_context = LoggingContext.sentinel def __exit__(self, type, value, traceback): """Restores the current logging context"""