forked from MirrorHub/synapse
Merge pull request #3138 from matrix-org/rav/catch_unhandled_exceptions
Improve exception handling for background processes
This commit is contained in:
commit
9c3da24561
20 changed files with 331 additions and 233 deletions
|
@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
|
|||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.appservice")
|
||||
|
@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
|
|||
|
||||
if stream_name == "events":
|
||||
max_stream_id = self.store.get_room_max_stream_ordering()
|
||||
preserve_fn(
|
||||
self.appservice_handler.notify_interested_services
|
||||
)(max_stream_id)
|
||||
run_in_background(self._notify_app_services, max_stream_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify_app_services(self, room_stream_id):
|
||||
try:
|
||||
yield self.appservice_handler.notify_interested_services(room_stream_id)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
|
|
|
@ -237,6 +237,7 @@ class FederationSenderHandler(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def update_token(self, token):
|
||||
try:
|
||||
self.federation_position = token
|
||||
|
||||
# We linearize here to ensure we don't have races updating the token
|
||||
|
@ -250,6 +251,8 @@ class FederationSenderHandler(object):
|
|||
# its in memory queues
|
||||
self.replication_client.send_federation_ack(self.federation_position)
|
||||
self._last_ack = self.federation_position
|
||||
except Exception:
|
||||
logger.exception("Error updating federation stream position")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -144,6 +144,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def poke_pushers(self, stream_name, token, rows):
|
||||
try:
|
||||
if stream_name == "pushers":
|
||||
for row in rows:
|
||||
if row.deleted:
|
||||
|
@ -158,6 +159,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
|||
yield self.pusher_pool.on_new_receipts(
|
||||
token, token, set(row.room_id for row in rows)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error poking pushers")
|
||||
|
||||
def stop_pusher(self, user_id, app_id, pushkey):
|
||||
key = "%s:%s" % (app_id, pushkey)
|
||||
|
|
|
@ -340,6 +340,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def process_and_notify(self, stream_name, token, rows):
|
||||
try:
|
||||
if stream_name == "events":
|
||||
# We shouldn't get multiple rows per token for events stream, so
|
||||
# we don't need to optimise this for multiple rows.
|
||||
|
@ -389,6 +390,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||
self.notifier.on_new_event(
|
||||
"groups_key", token, users=[row.user_id for row in rows],
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error processing replication")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
|
|
|
@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
|
|||
from synapse.storage.user_directory import UserDirectoryStore
|
||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet import reactor, defer
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
logger = logging.getLogger("synapse.app.user_dir")
|
||||
|
@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
|||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == "current_state_deltas":
|
||||
preserve_fn(self.user_directory.notify_new_event)()
|
||||
run_in_background(self._notify_directory)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify_directory(self):
|
||||
try:
|
||||
yield self.user_directory.notify_new_event()
|
||||
except Exception:
|
||||
logger.exception("Error notifiying user directory of state update")
|
||||
|
||||
|
||||
def start(config_options):
|
||||
|
|
|
@ -176,6 +176,7 @@ class _TransactionController(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _start_recoverer(self, service):
|
||||
try:
|
||||
yield self.store.set_appservice_state(
|
||||
service,
|
||||
ApplicationServiceState.DOWN
|
||||
|
@ -187,6 +188,8 @@ class _TransactionController(object):
|
|||
recoverer = self.recoverer_fn(service, self.on_recovered)
|
||||
self.add_recoverers([recoverer])
|
||||
recoverer.recover()
|
||||
except Exception:
|
||||
logger.exception("Error starting AS recoverer")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_service_up(self, service):
|
||||
|
|
|
@ -146,6 +146,7 @@ class Keyring(object):
|
|||
verify_requests (List[VerifyKeyRequest]):
|
||||
"""
|
||||
|
||||
try:
|
||||
# create a deferred for each server we're going to look up the keys
|
||||
# for; we'll resolve them once we have completed our lookups.
|
||||
# These will be passed into wait_for_previous_lookups to block
|
||||
|
@ -192,6 +193,8 @@ class Keyring(object):
|
|||
verify_request.deferred.addBoth(
|
||||
remove_deferreds, verify_request,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error starting key lookups")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_previous_lookups(self, server_names, server_to_deferred):
|
||||
|
|
|
@ -323,6 +323,8 @@ class TransactionQueue(object):
|
|||
break
|
||||
|
||||
yield self._process_presence_inner(states_map.values())
|
||||
except Exception:
|
||||
logger.exception("Error sending presence states to servers")
|
||||
finally:
|
||||
self._processing_pending_presence = False
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ from synapse.http.servlet import (
|
|||
)
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
|
||||
|
||||
import functools
|
||||
|
@ -152,11 +152,18 @@ class Authenticator(object):
|
|||
# alive
|
||||
retry_timings = yield self.store.get_destination_retry_timings(origin)
|
||||
if retry_timings and retry_timings["retry_last_ts"]:
|
||||
logger.info("Marking origin %r as up", origin)
|
||||
preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
|
||||
run_in_background(self._reset_retry_timings, origin)
|
||||
|
||||
defer.returnValue(origin)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _reset_retry_timings(self, origin):
|
||||
try:
|
||||
logger.info("Marking origin %r as up", origin)
|
||||
yield self.store.set_destination_retry_timings(origin, 0, 0)
|
||||
except Exception:
|
||||
logger.exception("Error resetting retry timings on %s", origin)
|
||||
|
||||
|
||||
class BaseFederationServlet(object):
|
||||
REQUIRE_AUTH = True
|
||||
|
|
|
@ -165,6 +165,7 @@ class GroupAttestionRenewer(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _renew_attestation(group_id, user_id):
|
||||
try:
|
||||
if not self.is_mine_id(group_id):
|
||||
destination = get_domain_from_id(group_id)
|
||||
elif not self.is_mine_id(user_id):
|
||||
|
@ -187,6 +188,9 @@ class GroupAttestionRenewer(object):
|
|||
yield self.store.update_attestation_renewal(
|
||||
group_id, user_id, attestation
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error renewing attestation of %r in %r",
|
||||
user_id, group_id)
|
||||
|
||||
for row in rows:
|
||||
group_id = row["group_id"]
|
||||
|
|
|
@ -857,15 +857,25 @@ class EventCreationHandler(object):
|
|||
@defer.inlineCallbacks
|
||||
def _notify():
|
||||
yield run_on_reactor()
|
||||
try:
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error notifying about new room event")
|
||||
|
||||
preserve_fn(_notify)()
|
||||
|
||||
if event.type == EventTypes.Message:
|
||||
presence = self.hs.get_presence_handler()
|
||||
# We don't want to block sending messages on any presence code. This
|
||||
# matters as sometimes presence code can take a while.
|
||||
preserve_fn(presence.bump_presence_active_time)(requester.user)
|
||||
run_in_background(self._bump_active_time, requester.user)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _bump_active_time(self, user):
|
||||
try:
|
||||
presence = self.hs.get_presence_handler()
|
||||
yield presence.bump_presence_active_time(user)
|
||||
except Exception:
|
||||
logger.exception("Error bumping presence active time")
|
||||
|
|
|
@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
|
|||
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.wheel_timer import WheelTimer
|
||||
|
@ -254,6 +254,14 @@ class PresenceHandler(object):
|
|||
|
||||
logger.info("Finished _persist_unpersisted_changes")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states_and_catch_exception(self, new_states):
|
||||
try:
|
||||
res = yield self._update_states(new_states)
|
||||
defer.returnValue(res)
|
||||
except Exception:
|
||||
logger.exception("Error updating presence")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states(self, new_states):
|
||||
"""Updates presence of users. Sets the appropriate timeouts. Pokes
|
||||
|
@ -364,7 +372,7 @@ class PresenceHandler(object):
|
|||
now=now,
|
||||
)
|
||||
|
||||
preserve_fn(self._update_states)(changes)
|
||||
run_in_background(self._update_states_and_catch_exception, changes)
|
||||
except Exception:
|
||||
logger.exception("Exception in _handle_timeouts loop")
|
||||
|
||||
|
@ -422,20 +430,23 @@ class PresenceHandler(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _end():
|
||||
if affect_presence:
|
||||
try:
|
||||
self.user_to_num_current_syncs[user_id] -= 1
|
||||
|
||||
prev_state = yield self.current_state_for_user(user_id)
|
||||
yield self._update_states([prev_state.copy_and_replace(
|
||||
last_user_sync_ts=self.clock.time_msec(),
|
||||
)])
|
||||
except Exception:
|
||||
logger.exception("Error updating presence after sync")
|
||||
|
||||
@contextmanager
|
||||
def _user_syncing():
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
preserve_fn(_end)()
|
||||
if affect_presence:
|
||||
run_in_background(_end)
|
||||
|
||||
defer.returnValue(_user_syncing())
|
||||
|
||||
|
|
|
@ -135,6 +135,7 @@ class ReceiptsHandler(BaseHandler):
|
|||
"""Given a list of receipts, works out which remote servers should be
|
||||
poked and pokes them.
|
||||
"""
|
||||
try:
|
||||
# TODO: Some of this stuff should be coallesced.
|
||||
for receipt in receipts:
|
||||
room_id = receipt["room_id"]
|
||||
|
@ -166,6 +167,8 @@ class ReceiptsHandler(BaseHandler):
|
|||
},
|
||||
key=(room_id, receipt_type, user_id),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error pushing receipts to remote servers")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_receipts_for_room(self, room_id, to_key):
|
||||
|
|
|
@ -205,6 +205,7 @@ class TypingHandler(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _push_remote(self, member, typing):
|
||||
try:
|
||||
users = yield self.state.get_current_user_in_room(member.room_id)
|
||||
self._member_last_federation_poke[member] = self.clock.time_msec()
|
||||
|
||||
|
@ -227,6 +228,8 @@ class TypingHandler(object):
|
|||
},
|
||||
key=member,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error pushing typing notif to remotes")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _recv_edu(self, origin, content):
|
||||
|
|
|
@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state
|
|||
from synapse.util import DeferredTimedOutError
|
||||
from synapse.util.logutils import log_function
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
|
||||
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.types import StreamToken
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
@ -251,9 +251,7 @@ class Notifier(object):
|
|||
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
|
||||
"""Notify any user streams that are interested in this room event"""
|
||||
# poke any interested application service.
|
||||
preserve_fn(self.appservice_handler.notify_interested_services)(
|
||||
room_stream_id
|
||||
)
|
||||
run_in_background(self._notify_app_services, room_stream_id)
|
||||
|
||||
if self.federation_sender:
|
||||
self.federation_sender.notify_new_events(room_stream_id)
|
||||
|
@ -267,6 +265,13 @@ class Notifier(object):
|
|||
rooms=[event.room_id],
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _notify_app_services(self, room_stream_id):
|
||||
try:
|
||||
yield self.appservice_handler.notify_interested_services(room_stream_id)
|
||||
except Exception:
|
||||
logger.exception("Error notifying application services of event")
|
||||
|
||||
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
|
||||
""" Used to inform listeners that something has happend event wise.
|
||||
|
||||
|
|
|
@ -77,10 +77,13 @@ class EmailPusher(object):
|
|||
@defer.inlineCallbacks
|
||||
def on_started(self):
|
||||
if self.mailer is not None:
|
||||
try:
|
||||
self.throttle_params = yield self.store.get_throttle_params_by_room(
|
||||
self.pusher_id
|
||||
)
|
||||
yield self._process()
|
||||
except Exception:
|
||||
logger.exception("Error starting email pusher")
|
||||
|
||||
def on_stop(self):
|
||||
if self.timed_call:
|
||||
|
|
|
@ -94,7 +94,10 @@ class HttpPusher(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_started(self):
|
||||
try:
|
||||
yield self._process()
|
||||
except Exception:
|
||||
logger.exception("Error starting http pusher")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||
|
|
|
@ -18,7 +18,7 @@ from twisted.internet import defer, threads
|
|||
from .media_storage import FileResponder
|
||||
|
||||
from synapse.config._base import Config
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logcontext import run_in_background
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider):
|
|||
return self.backend.store_file(path, file_info)
|
||||
else:
|
||||
# TODO: Handle errors.
|
||||
preserve_fn(self.backend.store_file)(path, file_info)
|
||||
def store():
|
||||
try:
|
||||
return self.backend.store_file(path, file_info)
|
||||
except Exception:
|
||||
logger.exception("Error storing file")
|
||||
run_in_background(store)
|
||||
return defer.succeed(None)
|
||||
|
||||
def fetch(self, path, file_info):
|
||||
|
|
|
@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def remove_push_actions_from_staging(self, event_id):
|
||||
"""Called if we failed to persist the event to ensure that stale push
|
||||
actions don't build up in the DB
|
||||
|
@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
|
|||
event_id (str)
|
||||
"""
|
||||
|
||||
return self._simple_delete(
|
||||
try:
|
||||
res = yield self._simple_delete(
|
||||
table="event_push_actions_staging",
|
||||
keyvalues={
|
||||
"event_id": event_id,
|
||||
},
|
||||
desc="remove_push_actions_from_staging",
|
||||
)
|
||||
defer.returnValue(res)
|
||||
except Exception:
|
||||
# this method is called from an exception handler, so propagating
|
||||
# another exception here really isn't helpful - there's nothing
|
||||
# the caller can do about it. Just log the exception and move on.
|
||||
logger.exception(
|
||||
"Error removing push actions after event persistence failure",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _find_stream_orderings_for_times(self):
|
||||
|
|
|
@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs):
|
|||
deferred returned by the funtion completes.
|
||||
|
||||
Useful for wrapping functions that return a deferred which you don't yield
|
||||
on.
|
||||
on (for instance because you want to pass it to deferred.gatherResults()).
|
||||
|
||||
Note that if you completely discard the result, you should make sure that
|
||||
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
||||
CRITICAL error about an unhandled error will be logged without much
|
||||
indication about where it came from.
|
||||
"""
|
||||
current = LoggingContext.current_context()
|
||||
res = f(*args, **kwargs)
|
||||
|
|
Loading…
Reference in a new issue