0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-15 04:43:51 +01:00

Improve exception handling for background processes

There were a bunch of places where we fire off a process to happen in the
background, but don't have any exception handling on it - instead relying on
the unhandled error being logged when the relevent deferred gets
garbage-collected.

This is unsatisfactory for a number of reasons:
 - logging on garbage collection is best-effort and may happen some time after
   the error, if at all
 - it can be hard to figure out where the error actually happened.
 - it is logged as a scary CRITICAL error which (a) I always forget to grep for
   and (b) it's not really CRITICAL if a background process we don't care about
   fails.

So this is an attempt to add exception handling to everything we fire off into
the background.
This commit is contained in:
Richard van der Hoff 2018-04-27 11:07:40 +01:00
parent 0ced8b5b47
commit 9255a6cb17
20 changed files with 331 additions and 233 deletions

View file

@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import reactor from twisted.internet import reactor, defer
from twisted.web.resource import NoResource from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.appservice") logger = logging.getLogger("synapse.app.appservice")
@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
if stream_name == "events": if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering() max_stream_id = self.store.get_room_max_stream_ordering()
preserve_fn( run_in_background(self._notify_app_services, max_stream_id)
self.appservice_handler.notify_interested_services
)(max_stream_id) @defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def start(config_options): def start(config_options):

View file

@ -237,6 +237,7 @@ class FederationSenderHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def update_token(self, token): def update_token(self, token):
try:
self.federation_position = token self.federation_position = token
# We linearize here to ensure we don't have races updating the token # We linearize here to ensure we don't have races updating the token
@ -250,6 +251,8 @@ class FederationSenderHandler(object):
# its in memory queues # its in memory queues
self.replication_client.send_federation_ack(self.federation_position) self.replication_client.send_federation_ack(self.federation_position)
self._last_ack = self.federation_position self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -144,6 +144,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows): def poke_pushers(self, stream_name, token, rows):
try:
if stream_name == "pushers": if stream_name == "pushers":
for row in rows: for row in rows:
if row.deleted: if row.deleted:
@ -158,6 +159,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
yield self.pusher_pool.on_new_receipts( yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows) token, token, set(row.room_id for row in rows)
) )
except Exception:
logger.exception("Error poking pushers")
def stop_pusher(self, user_id, app_id, pushkey): def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey) key = "%s:%s" % (app_id, pushkey)

View file

@ -340,6 +340,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows): def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events": if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so # We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows. # we don't need to optimise this for multiple rows.
@ -389,6 +390,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.notifier.on_new_event( self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows], "groups_key", token, users=[row.user_id for row in rows],
) )
except Exception:
logger.exception("Error processing replication")
def start(config_options): def start(config_options):

View file

@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import reactor from twisted.internet import reactor, defer
from twisted.web.resource import NoResource from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.user_dir") logger = logging.getLogger("synapse.app.user_dir")
@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
stream_name, token, rows stream_name, token, rows
) )
if stream_name == "current_state_deltas": if stream_name == "current_state_deltas":
preserve_fn(self.user_directory.notify_new_event)() run_in_background(self._notify_directory)
@defer.inlineCallbacks
def _notify_directory(self):
try:
yield self.user_directory.notify_new_event()
except Exception:
logger.exception("Error notifiying user directory of state update")
def start(config_options): def start(config_options):

View file

@ -176,6 +176,7 @@ class _TransactionController(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_recoverer(self, service): def _start_recoverer(self, service):
try:
yield self.store.set_appservice_state( yield self.store.set_appservice_state(
service, service,
ApplicationServiceState.DOWN ApplicationServiceState.DOWN
@ -187,6 +188,8 @@ class _TransactionController(object):
recoverer = self.recoverer_fn(service, self.on_recovered) recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer]) self.add_recoverers([recoverer])
recoverer.recover() recoverer.recover()
except Exception:
logger.exception("Error starting AS recoverer")
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_service_up(self, service): def _is_service_up(self, service):

View file

@ -146,6 +146,7 @@ class Keyring(object):
verify_requests (List[VerifyKeyRequest]): verify_requests (List[VerifyKeyRequest]):
""" """
try:
# create a deferred for each server we're going to look up the keys # create a deferred for each server we're going to look up the keys
# for; we'll resolve them once we have completed our lookups. # for; we'll resolve them once we have completed our lookups.
# These will be passed into wait_for_previous_lookups to block # These will be passed into wait_for_previous_lookups to block
@ -192,6 +193,8 @@ class Keyring(object):
verify_request.deferred.addBoth( verify_request.deferred.addBoth(
remove_deferreds, verify_request, remove_deferreds, verify_request,
) )
except Exception:
logger.exception("Error starting key lookups")
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_previous_lookups(self, server_names, server_to_deferred): def wait_for_previous_lookups(self, server_names, server_to_deferred):

View file

@ -323,6 +323,8 @@ class TransactionQueue(object):
break break
yield self._process_presence_inner(states_map.values()) yield self._process_presence_inner(states_map.values())
except Exception:
logger.exception("Error sending presence states to servers")
finally: finally:
self._processing_pending_presence = False self._processing_pending_presence = False

View file

@ -25,7 +25,7 @@ from synapse.http.servlet import (
) )
from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools import functools
@ -152,11 +152,18 @@ class Authenticator(object):
# alive # alive
retry_timings = yield self.store.get_destination_retry_timings(origin) retry_timings = yield self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]: if retry_timings and retry_timings["retry_last_ts"]:
logger.info("Marking origin %r as up", origin) run_in_background(self._reset_retry_timings, origin)
preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
defer.returnValue(origin) defer.returnValue(origin)
@defer.inlineCallbacks
def _reset_retry_timings(self, origin):
try:
logger.info("Marking origin %r as up", origin)
yield self.store.set_destination_retry_timings(origin, 0, 0)
except Exception:
logger.exception("Error resetting retry timings on %s", origin)
class BaseFederationServlet(object): class BaseFederationServlet(object):
REQUIRE_AUTH = True REQUIRE_AUTH = True

View file

@ -165,6 +165,7 @@ class GroupAttestionRenewer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _renew_attestation(group_id, user_id): def _renew_attestation(group_id, user_id):
try:
if not self.is_mine_id(group_id): if not self.is_mine_id(group_id):
destination = get_domain_from_id(group_id) destination = get_domain_from_id(group_id)
elif not self.is_mine_id(user_id): elif not self.is_mine_id(user_id):
@ -187,6 +188,9 @@ class GroupAttestionRenewer(object):
yield self.store.update_attestation_renewal( yield self.store.update_attestation_renewal(
group_id, user_id, attestation group_id, user_id, attestation
) )
except Exception:
logger.exception("Error renewing attestation of %r in %r",
user_id, group_id)
for row in rows: for row in rows:
group_id = row["group_id"] group_id = row["group_id"]

View file

@ -857,15 +857,25 @@ class EventCreationHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _notify(): def _notify():
yield run_on_reactor() yield run_on_reactor()
try:
self.notifier.on_new_room_event( self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, event, event_stream_id, max_stream_id,
extra_users=extra_users extra_users=extra_users
) )
except Exception:
logger.exception("Error notifying about new room event")
preserve_fn(_notify)() preserve_fn(_notify)()
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This # We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while. # matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(requester.user) run_in_background(self._bump_active_time, requester.user)
@defer.inlineCallbacks
def _bump_active_time(self, user):
try:
presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
except Exception:
logger.exception("Error bumping presence active time")

View file

@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
@ -254,6 +254,14 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes") logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
def _update_states_and_catch_exception(self, new_states):
try:
res = yield self._update_states(new_states)
defer.returnValue(res)
except Exception:
logger.exception("Error updating presence")
@defer.inlineCallbacks @defer.inlineCallbacks
def _update_states(self, new_states): def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes """Updates presence of users. Sets the appropriate timeouts. Pokes
@ -364,7 +372,7 @@ class PresenceHandler(object):
now=now, now=now,
) )
preserve_fn(self._update_states)(changes) run_in_background(self._update_states_and_catch_exception, changes)
except Exception: except Exception:
logger.exception("Exception in _handle_timeouts loop") logger.exception("Exception in _handle_timeouts loop")
@ -422,20 +430,23 @@ class PresenceHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _end(): def _end():
if affect_presence: try:
self.user_to_num_current_syncs[user_id] -= 1 self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id) prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace( yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(), last_user_sync_ts=self.clock.time_msec(),
)]) )])
except Exception:
logger.exception("Error updating presence after sync")
@contextmanager @contextmanager
def _user_syncing(): def _user_syncing():
try: try:
yield yield
finally: finally:
preserve_fn(_end)() if affect_presence:
run_in_background(_end)
defer.returnValue(_user_syncing()) defer.returnValue(_user_syncing())

View file

@ -135,6 +135,7 @@ class ReceiptsHandler(BaseHandler):
"""Given a list of receipts, works out which remote servers should be """Given a list of receipts, works out which remote servers should be
poked and pokes them. poked and pokes them.
""" """
try:
# TODO: Some of this stuff should be coallesced. # TODO: Some of this stuff should be coallesced.
for receipt in receipts: for receipt in receipts:
room_id = receipt["room_id"] room_id = receipt["room_id"]
@ -166,6 +167,8 @@ class ReceiptsHandler(BaseHandler):
}, },
key=(room_id, receipt_type, user_id), key=(room_id, receipt_type, user_id),
) )
except Exception:
logger.exception("Error pushing receipts to remote servers")
@defer.inlineCallbacks @defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key): def get_receipts_for_room(self, room_id, to_key):

View file

@ -205,6 +205,7 @@ class TypingHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _push_remote(self, member, typing): def _push_remote(self, member, typing):
try:
users = yield self.state.get_current_user_in_room(member.room_id) users = yield self.state.get_current_user_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec() self._member_last_federation_poke[member] = self.clock.time_msec()
@ -227,6 +228,8 @@ class TypingHandler(object):
}, },
key=member, key=member,
) )
except Exception:
logger.exception("Error pushing typing notif to remotes")
@defer.inlineCallbacks @defer.inlineCallbacks
def _recv_edu(self, origin, content): def _recv_edu(self, origin, content):

View file

@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.util import DeferredTimedOutError from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -251,9 +251,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]): def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event""" """Notify any user streams that are interested in this room event"""
# poke any interested application service. # poke any interested application service.
preserve_fn(self.appservice_handler.notify_interested_services)( run_in_background(self._notify_app_services, room_stream_id)
room_stream_id
)
if self.federation_sender: if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id) self.federation_sender.notify_new_events(room_stream_id)
@ -267,6 +265,13 @@ class Notifier(object):
rooms=[event.room_id], rooms=[event.room_id],
) )
@defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]): def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise. """ Used to inform listeners that something has happend event wise.

View file

@ -77,10 +77,13 @@ class EmailPusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_started(self): def on_started(self):
if self.mailer is not None: if self.mailer is not None:
try:
self.throttle_params = yield self.store.get_throttle_params_by_room( self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id self.pusher_id
) )
yield self._process() yield self._process()
except Exception:
logger.exception("Error starting email pusher")
def on_stop(self): def on_stop(self):
if self.timed_call: if self.timed_call:

View file

@ -94,7 +94,10 @@ class HttpPusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_started(self): def on_started(self):
try:
yield self._process() yield self._process()
except Exception:
logger.exception("Error starting http pusher")
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):

View file

@ -18,7 +18,7 @@ from twisted.internet import defer, threads
from .media_storage import FileResponder from .media_storage import FileResponder
from synapse.config._base import Config from synapse.config._base import Config
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
import logging import logging
import os import os
@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider):
return self.backend.store_file(path, file_info) return self.backend.store_file(path, file_info)
else: else:
# TODO: Handle errors. # TODO: Handle errors.
preserve_fn(self.backend.store_file)(path, file_info) def store():
try:
return self.backend.store_file(path, file_info)
except Exception:
logger.exception("Error storing file")
run_in_background(store)
return defer.succeed(None) return defer.succeed(None)
def fetch(self, path, file_info): def fetch(self, path, file_info):

View file

@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn "add_push_actions_to_staging", _add_push_actions_to_staging_txn
) )
@defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id): def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push """Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB actions don't build up in the DB
@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
event_id (str) event_id (str)
""" """
return self._simple_delete( try:
res = yield self._simple_delete(
table="event_push_actions_staging", table="event_push_actions_staging",
keyvalues={ keyvalues={
"event_id": event_id, "event_id": event_id,
}, },
desc="remove_push_actions_from_staging", desc="remove_push_actions_from_staging",
) )
defer.returnValue(res)
except Exception:
# this method is called from an exception handler, so propagating
# another exception here really isn't helpful - there's nothing
# the caller can do about it. Just log the exception and move on.
logger.exception(
"Error removing push actions after event persistence failure",
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _find_stream_orderings_for_times(self): def _find_stream_orderings_for_times(self):

View file

@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs):
deferred returned by the funtion completes. deferred returned by the funtion completes.
Useful for wrapping functions that return a deferred which you don't yield Useful for wrapping functions that return a deferred which you don't yield
on. on (for instance because you want to pass it to deferred.gatherResults()).
Note that if you completely discard the result, you should make sure that
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
""" """
current = LoggingContext.current_context() current = LoggingContext.current_context()
res = f(*args, **kwargs) res = f(*args, **kwargs)