Merge remote-tracking branch 'origin/develop' into rav/use_run_in_background

This commit is contained in:
Richard van der Hoff 2018-04-27 14:31:23 +01:00
commit fc149b4eeb
28 changed files with 412 additions and 257 deletions

View file

@ -1,9 +1,19 @@
Changes in synapse v0.28.0 (2018-04-26)
===========================================
Bug Fixes:
* Fix quarantine media admin API and search reindex (PR #3130)
* Fix media admin APIs (PR #3134)
Changes in synapse v0.28.0-rc1 (2018-04-24) Changes in synapse v0.28.0-rc1 (2018-04-24)
=========================================== ===========================================
Minor performance improvement to federation sending and bug fixes. Minor performance improvement to federation sending and bug fixes.
(Note: This release does not include state resolutions discussed in matrix live) (Note: This release does not include the delta state resolution implementation discussed in matrix live)
Features: Features:
@ -16,8 +26,7 @@ Changes:
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel! * move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh! * Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
* Document the behaviour of ResponseCache (PR #3059) * Document the behaviour of ResponseCache (PR #3059)
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107 * Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
#3109, #3110) Thanks to @NotAFile!
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel! * update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
* use python3-compatible prints (PR #3074) Thanks to @NotAFile! * use python3-compatible prints (PR #3074) Thanks to @NotAFile!
* Send federation events concurrently (PR #3078) * Send federation events concurrently (PR #3078)

View file

@ -1,5 +1,7 @@
#! /bin/bash #! /bin/bash
set -eux
cd "`dirname $0`/.." cd "`dirname $0`/.."
TOX_DIR=$WORKSPACE/.tox TOX_DIR=$WORKSPACE/.tox
@ -14,7 +16,20 @@ fi
tox -e py27 --notest -v tox -e py27 --notest -v
TOX_BIN=$TOX_DIR/py27/bin TOX_BIN=$TOX_DIR/py27/bin
$TOX_BIN/pip install setuptools
# cryptography 2.2 requires setuptools >= 18.5.
#
# older versions of virtualenv (?) give us a virtualenv with the same version
# of setuptools as is installed on the system python (and tox runs virtualenv
# under python3, so we get the version of setuptools that is installed on that).
#
# anyway, make sure that we have a recent enough setuptools.
$TOX_BIN/pip install 'setuptools>=18.5'
# we also need a semi-recent version of pip, because old ones fail to install
# the "enum34" dependency of cryptography.
$TOX_BIN/pip install 'pip>=10'
{ python synapse/python_dependencies.py { python synapse/python_dependencies.py
echo lxml psycopg2 echo lxml psycopg2
} | xargs $TOX_BIN/pip install } | xargs $TOX_BIN/pip install

View file

@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server. """ This is a reference implementation of a Matrix home server.
""" """
__version__ = "0.28.0-rc1" __version__ = "0.28.0"

View file

@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import reactor from twisted.internet import reactor, defer
from twisted.web.resource import NoResource from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.appservice") logger = logging.getLogger("synapse.app.appservice")
@ -112,9 +112,14 @@ class ASReplicationHandler(ReplicationClientHandler):
if stream_name == "events": if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering() max_stream_id = self.store.get_room_max_stream_ordering()
preserve_fn( run_in_background(self._notify_app_services, max_stream_id)
self.appservice_handler.notify_interested_services
)(max_stream_id) @defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def start(config_options): def start(config_options):

View file

@ -237,19 +237,22 @@ class FederationSenderHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def update_token(self, token): def update_token(self, token):
self.federation_position = token try:
self.federation_position = token
# We linearize here to ensure we don't have races updating the token # We linearize here to ensure we don't have races updating the token
with (yield self._fed_position_linearizer.queue(None)): with (yield self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position: if self._last_ack < self.federation_position:
yield self.store.update_federation_out_pos( yield self.store.update_federation_out_pos(
"federation", self.federation_position "federation", self.federation_position
) )
# We ACK this token over replication so that the master can drop # We ACK this token over replication so that the master can drop
# its in memory queues # its in memory queues
self.replication_client.send_federation_ack(self.federation_position) self.replication_client.send_federation_ack(self.federation_position)
self._last_ack = self.federation_position self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -144,20 +144,23 @@ class PusherReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows): def poke_pushers(self, stream_name, token, rows):
if stream_name == "pushers": try:
for row in rows: if stream_name == "pushers":
if row.deleted: for row in rows:
yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) if row.deleted:
else: yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
yield self.start_pusher(row.user_id, row.app_id, row.pushkey) else:
elif stream_name == "events": yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
yield self.pusher_pool.on_new_notifications( elif stream_name == "events":
token, token, yield self.pusher_pool.on_new_notifications(
) token, token,
elif stream_name == "receipts": )
yield self.pusher_pool.on_new_receipts( elif stream_name == "receipts":
token, token, set(row.room_id for row in rows) yield self.pusher_pool.on_new_receipts(
) token, token, set(row.room_id for row in rows)
)
except Exception:
logger.exception("Error poking pushers")
def stop_pusher(self, user_id, app_id, pushkey): def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey) key = "%s:%s" % (app_id, pushkey)

View file

@ -339,55 +339,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows): def process_and_notify(self, stream_name, token, rows):
if stream_name == "events": try:
# We shouldn't get multiple rows per token for events stream, so if stream_name == "events":
# we don't need to optimise this for multiple rows. # We shouldn't get multiple rows per token for events stream, so
for row in rows: # we don't need to optimise this for multiple rows.
event = yield self.store.get_event(row.event_id) for row in rows:
extra_users = () event = yield self.store.get_event(row.event_id)
if event.type == EventTypes.Member: extra_users = ()
extra_users = (event.state_key,) if event.type == EventTypes.Member:
max_token = self.store.get_room_max_stream_ordering() extra_users = (event.state_key,)
self.notifier.on_new_room_event( max_token = self.store.get_room_max_stream_ordering()
event, token, max_token, extra_users self.notifier.on_new_room_event(
) event, token, max_token, extra_users
elif stream_name == "push_rules": )
self.notifier.on_new_event( elif stream_name == "push_rules":
"push_rules_key", token, users=[row.user_id for row in rows],
)
elif stream_name in ("account_data", "tag_account_data",):
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows],
)
elif stream_name == "receipts":
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "typing":
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "to_device":
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event( self.notifier.on_new_event(
"to_device_key", token, users=entities, "push_rules_key", token, users=[row.user_id for row in rows],
) )
elif stream_name == "device_lists": elif stream_name in ("account_data", "tag_account_data",):
all_room_ids = set() self.notifier.on_new_event(
for row in rows: "account_data_key", token, users=[row.user_id for row in rows],
room_ids = yield self.store.get_rooms_for_user(row.user_id) )
all_room_ids.update(room_ids) elif stream_name == "receipts":
self.notifier.on_new_event( self.notifier.on_new_event(
"device_list_key", token, rooms=all_room_ids, "receipt_key", token, rooms=[row.room_id for row in rows],
) )
elif stream_name == "presence": elif stream_name == "typing":
yield self.presence_handler.process_replication_rows(token, rows) self.typing_handler.process_replication_rows(token, rows)
elif stream_name == "receipts": self.notifier.on_new_event(
self.notifier.on_new_event( "typing_key", token, rooms=[row.room_id for row in rows],
"groups_key", token, users=[row.user_id for row in rows], )
) elif stream_name == "to_device":
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event(
"to_device_key", token, users=entities,
)
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event(
"device_list_key", token, rooms=all_room_ids,
)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows],
)
except Exception:
logger.exception("Error processing replication")
def start(config_options): def start(config_options):

View file

@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import reactor from twisted.internet import reactor, defer
from twisted.web.resource import NoResource from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.user_dir") logger = logging.getLogger("synapse.app.user_dir")
@ -164,7 +164,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
stream_name, token, rows stream_name, token, rows
) )
if stream_name == "current_state_deltas": if stream_name == "current_state_deltas":
preserve_fn(self.user_directory.notify_new_event)() run_in_background(self._notify_directory)
@defer.inlineCallbacks
def _notify_directory(self):
try:
yield self.user_directory.notify_new_event()
except Exception:
logger.exception("Error notifiying user directory of state update")
def start(config_options): def start(config_options):

View file

@ -176,17 +176,20 @@ class _TransactionController(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_recoverer(self, service): def _start_recoverer(self, service):
yield self.store.set_appservice_state( try:
service, yield self.store.set_appservice_state(
ApplicationServiceState.DOWN service,
) ApplicationServiceState.DOWN
logger.info( )
"Application service falling behind. Starting recoverer. AS ID %s", logger.info(
service.id "Application service falling behind. Starting recoverer. AS ID %s",
) service.id
recoverer = self.recoverer_fn(service, self.on_recovered) )
self.add_recoverers([recoverer]) recoverer = self.recoverer_fn(service, self.on_recovered)
recoverer.recover() self.add_recoverers([recoverer])
recoverer.recover()
except Exception:
logger.exception("Error starting AS recoverer")
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_service_up(self, service): def _is_service_up(self, service):

View file

@ -147,53 +147,56 @@ class Keyring(object):
verify_requests (List[VerifyKeyRequest]): verify_requests (List[VerifyKeyRequest]):
""" """
# create a deferred for each server we're going to look up the keys try:
# for; we'll resolve them once we have completed our lookups. # create a deferred for each server we're going to look up the keys
# These will be passed into wait_for_previous_lookups to block # for; we'll resolve them once we have completed our lookups.
# any other lookups until we have finished. # These will be passed into wait_for_previous_lookups to block
# The deferreds are called with no logcontext. # any other lookups until we have finished.
server_to_deferred = { # The deferreds are called with no logcontext.
rq.server_name: defer.Deferred() server_to_deferred = {
for rq in verify_requests rq.server_name: defer.Deferred()
} for rq in verify_requests
}
# We want to wait for any previous lookups to complete before # We want to wait for any previous lookups to complete before
# proceeding. # proceeding.
yield self.wait_for_previous_lookups( yield self.wait_for_previous_lookups(
[rq.server_name for rq in verify_requests], [rq.server_name for rq in verify_requests],
server_to_deferred, server_to_deferred,
)
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
# When we've finished fetching all the keys for a given server_name,
# resolve the deferred passed to `wait_for_previous_lookups` so that
# any lookups waiting will proceed.
#
# map from server name to a set of request ids
server_to_request_ids = {}
for verify_request in verify_requests:
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
def remove_deferreds(res, verify_request):
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids[server_name].discard(request_id)
if not server_to_request_ids[server_name]:
d = server_to_deferred.pop(server_name, None)
if d:
d.callback(None)
return res
for verify_request in verify_requests:
verify_request.deferred.addBoth(
remove_deferreds, verify_request,
) )
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
# When we've finished fetching all the keys for a given server_name,
# resolve the deferred passed to `wait_for_previous_lookups` so that
# any lookups waiting will proceed.
#
# map from server name to a set of request ids
server_to_request_ids = {}
for verify_request in verify_requests:
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
def remove_deferreds(res, verify_request):
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids[server_name].discard(request_id)
if not server_to_request_ids[server_name]:
d = server_to_deferred.pop(server_name, None)
if d:
d.callback(None)
return res
for verify_request in verify_requests:
verify_request.deferred.addBoth(
remove_deferreds, verify_request,
)
except Exception:
logger.exception("Error starting key lookups")
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_previous_lookups(self, server_names, server_to_deferred): def wait_for_previous_lookups(self, server_names, server_to_deferred):
"""Waits for any previous key lookups for the given servers to finish. """Waits for any previous key lookups for the given servers to finish.

View file

@ -323,6 +323,8 @@ class TransactionQueue(object):
break break
yield self._process_presence_inner(states_map.values()) yield self._process_presence_inner(states_map.values())
except Exception:
logger.exception("Error sending presence states to servers")
finally: finally:
self._processing_pending_presence = False self._processing_pending_presence = False

View file

@ -25,7 +25,7 @@ from synapse.http.servlet import (
) )
from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools import functools
@ -152,11 +152,18 @@ class Authenticator(object):
# alive # alive
retry_timings = yield self.store.get_destination_retry_timings(origin) retry_timings = yield self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]: if retry_timings and retry_timings["retry_last_ts"]:
logger.info("Marking origin %r as up", origin) run_in_background(self._reset_retry_timings, origin)
preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
defer.returnValue(origin) defer.returnValue(origin)
@defer.inlineCallbacks
def _reset_retry_timings(self, origin):
try:
logger.info("Marking origin %r as up", origin)
yield self.store.set_destination_retry_timings(origin, 0, 0)
except Exception:
logger.exception("Error resetting retry timings on %s", origin)
class BaseFederationServlet(object): class BaseFederationServlet(object):
REQUIRE_AUTH = True REQUIRE_AUTH = True

View file

@ -165,28 +165,32 @@ class GroupAttestionRenewer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _renew_attestation(group_id, user_id): def _renew_attestation(group_id, user_id):
if not self.is_mine_id(group_id): try:
destination = get_domain_from_id(group_id) if not self.is_mine_id(group_id):
elif not self.is_mine_id(user_id): destination = get_domain_from_id(group_id)
destination = get_domain_from_id(user_id) elif not self.is_mine_id(user_id):
else: destination = get_domain_from_id(user_id)
logger.warn( else:
"Incorrectly trying to do attestations for user: %r in %r", logger.warn(
user_id, group_id, "Incorrectly trying to do attestations for user: %r in %r",
user_id, group_id,
)
yield self.store.remove_attestation_renewal(group_id, user_id)
return
attestation = self.attestations.create_attestation(group_id, user_id)
yield self.transport_client.renew_group_attestation(
destination, group_id, user_id,
content={"attestation": attestation},
) )
yield self.store.remove_attestation_renewal(group_id, user_id)
return
attestation = self.attestations.create_attestation(group_id, user_id) yield self.store.update_attestation_renewal(
group_id, user_id, attestation
yield self.transport_client.renew_group_attestation( )
destination, group_id, user_id, except Exception:
content={"attestation": attestation}, logger.exception("Error renewing attestation of %r in %r",
) user_id, group_id)
yield self.store.update_attestation_renewal(
group_id, user_id, attestation
)
for row in rows: for row in rows:
group_id = row["group_id"] group_id = row["group_id"]

View file

@ -141,7 +141,7 @@ class E2eKeysHandler(object):
yield make_deferred_yieldable(defer.gatherResults([ yield make_deferred_yieldable(defer.gatherResults([
run_in_background(do_remote_query, destination) run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache for destination in remote_queries_not_in_cache
])) ], consumeErrors=True))
defer.returnValue({ defer.returnValue({
"device_keys": results, "failures": failures, "device_keys": results, "failures": failures,
@ -244,7 +244,7 @@ class E2eKeysHandler(object):
yield make_deferred_yieldable(defer.gatherResults([ yield make_deferred_yieldable(defer.gatherResults([
run_in_background(claim_client_keys, destination) run_in_background(claim_client_keys, destination)
for destination in remote_queries for destination in remote_queries
])) ], consumeErrors=True))
logger.info( logger.info(
"Claimed one-time-keys: %s", "Claimed one-time-keys: %s",

View file

@ -19,9 +19,11 @@
import httplib import httplib
import itertools import itertools
import logging import logging
import sys
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
import six
from twisted.internet import defer from twisted.internet import defer
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
@ -1514,12 +1516,14 @@ class FederationHandler(BaseHandler):
backfilled=backfilled, backfilled=backfilled,
) )
except: # noqa: E722, as we reraise the exception this is fine. except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions tp, value, tb = sys.exc_info()
# staging area
logcontext.preserve_fn( logcontext.run_in_background(
self.store.remove_push_actions_from_staging self.store.remove_push_actions_from_staging,
)(event.event_id) event.event_id,
raise )
six.reraise(tp, value, tb)
if not backfilled: if not backfilled:
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result

View file

@ -13,6 +13,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
import simplejson
import sys
from canonicaljson import encode_canonical_json
import six
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
@ -34,11 +40,6 @@ from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
import simplejson
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -729,8 +730,14 @@ class EventCreationHandler(object):
except: # noqa: E722, as we reraise the exception this is fine. except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions # Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them. # staging area, if we calculated them.
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id) tp, value, tb = sys.exc_info()
raise
run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
six.reraise(tp, value, tb)
@defer.inlineCallbacks @defer.inlineCallbacks
def persist_and_notify_client_event( def persist_and_notify_client_event(
@ -858,15 +865,25 @@ class EventCreationHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _notify(): def _notify():
yield run_on_reactor() yield run_on_reactor()
self.notifier.on_new_room_event( try:
event, event_stream_id, max_stream_id, self.notifier.on_new_room_event(
extra_users=extra_users event, event_stream_id, max_stream_id,
) extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
run_in_background(_notify) run_in_background(_notify)
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This # We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while. # matters as sometimes presence code can take a while.
preserve_fn(presence.bump_presence_active_time)(requester.user) run_in_background(self._bump_active_time, requester.user)
@defer.inlineCallbacks
def _bump_active_time(self, user):
try:
presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
except Exception:
logger.exception("Error bumping presence active time")

View file

@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
@ -254,6 +254,14 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes") logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
def _update_states_and_catch_exception(self, new_states):
try:
res = yield self._update_states(new_states)
defer.returnValue(res)
except Exception:
logger.exception("Error updating presence")
@defer.inlineCallbacks @defer.inlineCallbacks
def _update_states(self, new_states): def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes """Updates presence of users. Sets the appropriate timeouts. Pokes
@ -364,7 +372,7 @@ class PresenceHandler(object):
now=now, now=now,
) )
preserve_fn(self._update_states)(changes) run_in_background(self._update_states_and_catch_exception, changes)
except Exception: except Exception:
logger.exception("Exception in _handle_timeouts loop") logger.exception("Exception in _handle_timeouts loop")
@ -422,20 +430,23 @@ class PresenceHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _end(): def _end():
if affect_presence: try:
self.user_to_num_current_syncs[user_id] -= 1 self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id) prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace( yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(), last_user_sync_ts=self.clock.time_msec(),
)]) )])
except Exception:
logger.exception("Error updating presence after sync")
@contextmanager @contextmanager
def _user_syncing(): def _user_syncing():
try: try:
yield yield
finally: finally:
preserve_fn(_end)() if affect_presence:
run_in_background(_end)
defer.returnValue(_user_syncing()) defer.returnValue(_user_syncing())

View file

@ -135,37 +135,40 @@ class ReceiptsHandler(BaseHandler):
"""Given a list of receipts, works out which remote servers should be """Given a list of receipts, works out which remote servers should be
poked and pokes them. poked and pokes them.
""" """
# TODO: Some of this stuff should be coallesced. try:
for receipt in receipts: # TODO: Some of this stuff should be coallesced.
room_id = receipt["room_id"] for receipt in receipts:
receipt_type = receipt["receipt_type"] room_id = receipt["room_id"]
user_id = receipt["user_id"] receipt_type = receipt["receipt_type"]
event_ids = receipt["event_ids"] user_id = receipt["user_id"]
data = receipt["data"] event_ids = receipt["event_ids"]
data = receipt["data"]
users = yield self.state.get_current_user_in_room(room_id) users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users) remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy() remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name) remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains) logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains: for domain in remotedomains:
self.federation.send_edu( self.federation.send_edu(
destination=domain, destination=domain,
edu_type="m.receipt", edu_type="m.receipt",
content={ content={
room_id: { room_id: {
receipt_type: { receipt_type: {
user_id: { user_id: {
"event_ids": event_ids, "event_ids": event_ids,
"data": data, "data": data,
}
} }
} },
}, },
}, key=(room_id, receipt_type, user_id),
key=(room_id, receipt_type, user_id), )
) except Exception:
logger.exception("Error pushing receipts to remote servers")
@defer.inlineCallbacks @defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key): def get_receipts_for_room(self, room_id, to_key):

View file

@ -206,28 +206,31 @@ class TypingHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _push_remote(self, member, typing): def _push_remote(self, member, typing):
users = yield self.state.get_current_user_in_room(member.room_id) try:
self._member_last_federation_poke[member] = self.clock.time_msec() users = yield self.state.get_current_user_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec() now = self.clock.time_msec()
self.wheel_timer.insert( self.wheel_timer.insert(
now=now, now=now,
obj=member, obj=member,
then=now + FEDERATION_PING_INTERVAL, then=now + FEDERATION_PING_INTERVAL,
) )
for domain in set(get_domain_from_id(u) for u in users): for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name: if domain != self.server_name:
self.federation.send_edu( self.federation.send_edu(
destination=domain, destination=domain,
edu_type="m.typing", edu_type="m.typing",
content={ content={
"room_id": member.room_id, "room_id": member.room_id,
"user_id": member.user_id, "user_id": member.user_id,
"typing": typing, "typing": typing,
}, },
key=member, key=member,
) )
except Exception:
logger.exception("Error pushing typing notif to remotes")
@defer.inlineCallbacks @defer.inlineCallbacks
def _recv_edu(self, origin, content): def _recv_edu(self, origin, content):

View file

@ -21,7 +21,7 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.util import DeferredTimedOutError from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -251,9 +251,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]): def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event""" """Notify any user streams that are interested in this room event"""
# poke any interested application service. # poke any interested application service.
preserve_fn(self.appservice_handler.notify_interested_services)( run_in_background(self._notify_app_services, room_stream_id)
room_stream_id
)
if self.federation_sender: if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id) self.federation_sender.notify_new_events(room_stream_id)
@ -267,6 +265,13 @@ class Notifier(object):
rooms=[event.room_id], rooms=[event.room_id],
) )
@defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]): def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise. """ Used to inform listeners that something has happend event wise.

View file

@ -77,10 +77,13 @@ class EmailPusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_started(self): def on_started(self):
if self.mailer is not None: if self.mailer is not None:
self.throttle_params = yield self.store.get_throttle_params_by_room( try:
self.pusher_id self.throttle_params = yield self.store.get_throttle_params_by_room(
) self.pusher_id
yield self._process() )
yield self._process()
except Exception:
logger.exception("Error starting email pusher")
def on_stop(self): def on_stop(self):
if self.timed_call: if self.timed_call:

View file

@ -94,7 +94,10 @@ class HttpPusher(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_started(self): def on_started(self):
yield self._process() try:
yield self._process()
except Exception:
logger.exception("Error starting http pusher")
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):

View file

@ -143,7 +143,9 @@ class PusherPool:
) )
) )
yield make_deferred_yieldable(defer.gatherResults(deferreds)) yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception: except Exception:
logger.exception("Exception in pusher on_new_notifications") logger.exception("Exception in pusher on_new_notifications")
@ -171,7 +173,9 @@ class PusherPool:
) )
) )
yield make_deferred_yieldable(defer.gatherResults(deferreds)) yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception: except Exception:
logger.exception("Exception in pusher on_new_receipts") logger.exception("Exception in pusher on_new_receipts")

View file

@ -1,5 +1,6 @@
# Copyright 2015, 2016 OpenMarket Ltd # Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd # Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -18,6 +19,18 @@ from distutils.version import LooseVersion
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# this dict maps from python package name to a list of modules we expect it to
# provide.
#
# the key is a "requirement specifier", as used as a parameter to `pip
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
#
# the value is a sequence of strings; each entry should be the name of the
# python module, optionally followed by a version assertion which can be either
# ">=<ver>" or "==<ver>".
#
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = { REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"], "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"], "frozendict>=0.4": ["frozendict"],
@ -26,7 +39,11 @@ REQUIREMENTS = {
"signedjson>=1.0.0": ["signedjson>=1.0.0"], "signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"], "service_identity>=1.0.0": ["service_identity>=1.0.0"],
"Twisted>=16.0.0": ["twisted>=16.0.0"],
# we break under Twisted 18.4
# (https://github.com/matrix-org/synapse/issues/3135)
"Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"], "pyyaml": ["yaml"],
"pyasn1": ["pyasn1"], "pyasn1": ["pyasn1"],
@ -39,6 +56,7 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"], "pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"], "msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"], "phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
} }
CONDITIONAL_REQUIREMENTS = { CONDITIONAL_REQUIREMENTS = {
"web_client": { "web_client": {

View file

@ -18,7 +18,7 @@ from twisted.internet import defer, threads
from .media_storage import FileResponder from .media_storage import FileResponder
from synapse.config._base import Config from synapse.config._base import Config
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
import logging import logging
import os import os
@ -87,7 +87,12 @@ class StorageProviderWrapper(StorageProvider):
return self.backend.store_file(path, file_info) return self.backend.store_file(path, file_info)
else: else:
# TODO: Handle errors. # TODO: Handle errors.
preserve_fn(self.backend.store_file)(path, file_info) def store():
try:
return self.backend.store_file(path, file_info)
except Exception:
logger.exception("Error storing file")
run_in_background(store)
return defer.succeed(None) return defer.succeed(None)
def fetch(self, path, file_info): def fetch(self, path, file_info):

View file

@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn "add_push_actions_to_staging", _add_push_actions_to_staging_txn
) )
@defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id): def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push """Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB actions don't build up in the DB
@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
event_id (str) event_id (str)
""" """
return self._simple_delete( try:
table="event_push_actions_staging", res = yield self._simple_delete(
keyvalues={ table="event_push_actions_staging",
"event_id": event_id, keyvalues={
}, "event_id": event_id,
desc="remove_push_actions_from_staging", },
) desc="remove_push_actions_from_staging",
)
defer.returnValue(res)
except Exception:
# this method is called from an exception handler, so propagating
# another exception here really isn't helpful - there's nothing
# the caller can do about it. Just log the exception and move on.
logger.exception(
"Error removing push actions after event persistence failure",
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _find_stream_orderings_for_times(self): def _find_stream_orderings_for_times(self):

View file

@ -203,7 +203,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_id, from_key, to_key, limit, order=order, room_id, from_key, to_key, limit, order=order,
) )
for room_id in rm_ids for room_id in rm_ids
])) ], consumeErrors=True))
results.update(dict(zip(rm_ids, res))) results.update(dict(zip(rm_ids, res)))
defer.returnValue(results) defer.returnValue(results)

View file

@ -305,7 +305,12 @@ def run_in_background(f, *args, **kwargs):
deferred returned by the funtion completes. deferred returned by the funtion completes.
Useful for wrapping functions that return a deferred which you don't yield Useful for wrapping functions that return a deferred which you don't yield
on. on (for instance because you want to pass it to deferred.gatherResults()).
Note that if you completely discard the result, you should make sure that
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
""" """
current = LoggingContext.current_context() current = LoggingContext.current_context()
res = f(*args, **kwargs) res = f(*args, **kwargs)