Merge branch 'develop' of github.com:matrix-org/synapse into erikj/repl_tcp_server

Erik Johnston 2017-04-04 09:46:16 +01:00
commit 62b89daac6
28 changed files with 490 additions and 181 deletions

View file

@@ -1,3 +1,47 @@
Changes in synapse v0.20.0-rc1 (2017-03-30)
===========================================
Features:
* Add delete_devices API (PR #1993)
* Add phone number registration/login support (PR #1994, #2055)
Changes:
* Use JSONSchema for validation of filters. Thanks @pik! (PR #1783)
* Reread log config on SIGHUP (PR #1982)
* Speed up public room list (PR #1989)
* Add helpful texts to logger config options (PR #1990)
* Minor ``/sync`` performance improvements. (PR #2002, #2013, #2022)
* Add some debug to help diagnose weird federation issue (PR #2035)
* Correctly limit retries for all federation requests (PR #2050, #2061)
* Don't lock table when persisting new one time keys (PR #2053)
* Reduce some CPU work on DB threads (PR #2054)
* Cache hosts in room (PR #2060)
* Batch sending of device list pokes (PR #2063)
* Speed up persist event path in certain edge cases (PR #2070)
Bug fixes:
* Fix bug where current_state_events renamed to current_state_ids (PR #1849)
* Fix routing loop when fetching remote media (PR #1992)
* Fix current_state_events table to not lie (PR #1996)
* Fix CAS login to handle PartialDownloadError (PR #1997)
* Fix assertion to stop transaction queue getting wedged (PR #2010)
* Fix presence to fallback to last_active_ts if it beats the last sync time.
Thanks @Half-Shot! (PR #2014)
* Fix bug when federation received a PDU while a room join is in progress (PR
#2016)
* Fix resetting state on rejected events (PR #2025)
* Fix installation issues in readme. Thanks @ricco386 (PR #2037)
* Fix caching of remote servers' signature keys (PR #2042)
* Fix some leaking log context (PR #2048, #2049, #2057, #2058)
* Fix rejection of invites not reaching sync (PR #2056)
Changes in synapse v0.19.3 (2017-03-20)
=======================================

View file

@@ -108,10 +108,10 @@ Installing prerequisites on ArchLinux::

    sudo pacman -S base-devel python2 python-pip \
                   python-setuptools python-virtualenv sqlite3

-Installing prerequisites on CentOS 7::
+Installing prerequisites on CentOS 7 or Fedora 25::

    sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
-                    lcms2-devel libwebp-devel tcl-devel tk-devel \
+                    lcms2-devel libwebp-devel tcl-devel tk-devel redhat-rpm-config \
                     python-virtualenv libffi-devel openssl-devel
    sudo yum groupinstall "Development Tools"

View file

@@ -204,9 +204,14 @@ That doesn't follow the rules, but we can fix it by wrapping it with
This technique works equally for external functions which return deferreds,
or deferreds we have made ourselves.

-XXX: think this is what ``preserve_context_over_deferred`` is supposed to do,
-though it is broken, in that it only restores the logcontext for the duration
-of the callbacks, which doesn't comply with the logcontext rules.
+You can also use ``logcontext.make_deferred_yieldable``, which just does the
+boilerplate for you, so the above could be written:
+
+.. code:: python
+
+    def sleep(seconds):
+        return logcontext.make_deferred_yieldable(get_sleep_deferred(seconds))

Fire-and-forget
---------------
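For reference, a minimal self-contained sketch of how a caller might consume the wrapped deferred under the logcontext rules. Only ``make_deferred_yieldable`` and the ``sleep``/``get_sleep_deferred`` names come from the documentation change above; ``do_work``, the logger and the reactor wiring are illustrative assumptions.

    import logging

    from twisted.internet import defer, reactor
    from synapse.util import logcontext

    logger = logging.getLogger(__name__)

    def get_sleep_deferred(seconds):
        # a deferred that fires after a delay, as assumed by the docs example
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d

    def sleep(seconds):
        return logcontext.make_deferred_yieldable(get_sleep_deferred(seconds))

    @defer.inlineCallbacks
    def do_work():
        # because sleep() wraps its deferred with make_deferred_yieldable, the
        # calling logcontext is restored after the yield, so this log line is
        # attributed to the right request
        yield sleep(1)
        logger.info("woke up")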

View file

@@ -112,9 +112,9 @@ script one last time, e.g. if the SQLite database is at ``homeserver.db``
run::

    synapse_port_db --sqlite-database homeserver.db \
-        --postgres-config database_config.yaml
+        --postgres-config homeserver-postgres.yaml

Once that has completed, change the synapse config to point at the PostgreSQL
-database configuration file using the ``database_config`` parameter (see
-`Synapse Config`_) and restart synapse. Synapse should now be running against
+database configuration file ``homeserver-postgres.yaml`` (i.e. rename it to
+``homeserver.yaml``) and restart synapse. Synapse should now be running against
PostgreSQL.

View file

@@ -50,14 +50,37 @@ You may be able to setup coturn via your package manager, or set it up manually

    pwgen -s 64 1

-5. Ensure youe firewall allows traffic into the TURN server on
-   the ports you've configured it to listen on (remember to allow
-   both TCP and UDP if you've enabled both).
+5. Consider your security settings. TURN lets users request a relay
+   which will connect to arbitrary IP addresses and ports. At the least
+   we recommend:
+
+      # VoIP traffic is all UDP. There is no reason to let users connect to arbitrary TCP endpoints via the relay.
+      no-tcp-relay
+
+      # don't let the relay ever try to connect to private IP address ranges within your network (if any)
+      # given the turn server is likely behind your firewall, remember to include any privileged public IPs too.
+      denied-peer-ip=10.0.0.0-10.255.255.255
+      denied-peer-ip=192.168.0.0-192.168.255.255
+      denied-peer-ip=172.16.0.0-172.31.255.255
+
+      # special case the turn server itself so that client->TURN->TURN->client flows work
+      allowed-peer-ip=10.0.0.1
+
+      # consider whether you want to limit the quota of relayed streams per user (or total) to avoid risk of DoS.
+      user-quota=12 # 4 streams per video call, so 12 streams = 3 simultaneous relayed calls per user.
+      total-quota=1200
+
+   Ideally coturn should refuse to relay traffic which isn't SRTP;
+   see https://github.com/matrix-org/synapse/issues/2009
+
+6. Ensure your firewall allows traffic into the TURN server on
+   the ports you've configured it to listen on (remember to allow
+   both TCP and UDP TURN traffic)

-6. If you've configured coturn to support TLS/DTLS, generate or
+7. If you've configured coturn to support TLS/DTLS, generate or
    import your private key and certificate.

-7. Start the turn server::
+8. Start the turn server::

    bin/turnserver -o
@@ -83,12 +106,19 @@ Your home server configuration file needs the following extra keys:
    to refresh credentials. The TURN REST API specification recommends
    one day (86400000).

+4. "turn_allow_guests": Whether to allow guest users to use the TURN
+   server. This is enabled by default, as otherwise VoIP will not
+   work reliably for guests. However, it does introduce a security risk
+   as it lets guests connect to arbitrary endpoints without having gone
+   through a CAPTCHA or similar to register a real account.
+
As an example, here is the relevant section of the config file for
matrix.org::

    turn_uris: [ "turn:turn.matrix.org:3478?transport=udp", "turn:turn.matrix.org:3478?transport=tcp" ]
    turn_shared_secret: n0t4ctuAllymatr1Xd0TorgSshar3d5ecret4obvIousreAsons
    turn_user_lifetime: 86400000
+    turn_allow_guests: True

Now, restart synapse::

View file

@@ -9,16 +9,39 @@
ROOMID="$1"

sqlite3 homeserver.db <<EOF
-DELETE FROM context_depth WHERE context = '$ROOMID';
-DELETE FROM current_state WHERE context = '$ROOMID';
-DELETE FROM feedback WHERE room_id = '$ROOMID';
-DELETE FROM messages WHERE room_id = '$ROOMID';
-DELETE FROM pdu_backward_extremities WHERE context = '$ROOMID';
-DELETE FROM pdu_edges WHERE context = '$ROOMID';
-DELETE FROM pdu_forward_extremities WHERE context = '$ROOMID';
-DELETE FROM pdus WHERE context = '$ROOMID';
-DELETE FROM room_data WHERE room_id = '$ROOMID';
+DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM event_edges WHERE room_id = '$ROOMID';
+DELETE FROM room_depth WHERE room_id = '$ROOMID';
+DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
+DELETE FROM events WHERE room_id = '$ROOMID';
+DELETE FROM event_json WHERE room_id = '$ROOMID';
+DELETE FROM state_events WHERE room_id = '$ROOMID';
+DELETE FROM current_state_events WHERE room_id = '$ROOMID';
DELETE FROM room_memberships WHERE room_id = '$ROOMID';
+DELETE FROM feedback WHERE room_id = '$ROOMID';
+DELETE FROM topics WHERE room_id = '$ROOMID';
+DELETE FROM room_names WHERE room_id = '$ROOMID';
DELETE FROM rooms WHERE room_id = '$ROOMID';
-DELETE FROM state_pdus WHERE context = '$ROOMID';
+DELETE FROM room_hosts WHERE room_id = '$ROOMID';
+DELETE FROM room_aliases WHERE room_id = '$ROOMID';
+DELETE FROM state_groups WHERE room_id = '$ROOMID';
+DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
+DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
+DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
+DELETE FROM event_search_content WHERE c1room_id = '$ROOMID';
+DELETE FROM guest_access WHERE room_id = '$ROOMID';
+DELETE FROM history_visibility WHERE room_id = '$ROOMID';
+DELETE FROM room_tags WHERE room_id = '$ROOMID';
+DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
+DELETE FROM room_account_data WHERE room_id = '$ROOMID';
+DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
+DELETE FROM local_invites WHERE room_id = '$ROOMID';
+DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
+DELETE FROM event_reports WHERE room_id = '$ROOMID';
+DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
+DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
+DELETE FROM event_auth WHERE room_id = '$ROOMID';
+DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
+VACUUM;
EOF

View file

@@ -447,9 +447,7 @@ class Porter(object):
        postgres_tables = yield self.postgres_store._simple_select_onecol(
            table="information_schema.tables",
-            keyvalues={
-                "table_schema": "public",
-            },
+            keyvalues={},
            retcol="distinct table_name",
        )

View file

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""

-__version__ = "0.19.3"
+__version__ = "0.20.0-rc1"

View file

@@ -202,7 +202,8 @@ def main():
        worker_app = worker_config["worker_app"]
        worker_pidfile = worker_config["worker_pid_file"]
        worker_daemonize = worker_config["worker_daemonize"]
-        assert worker_daemonize  # TODO print something more user friendly
+        assert worker_daemonize, "In config %r: expected '%s' to be True" % (
+            worker_configfile, "worker_daemonize")
        worker_cache_factor = worker_config.get("synctl_cache_factor")
        workers.append(Worker(
            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,

View file

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.constants import EventTypes
+from synapse.util.caches.descriptors import cachedInlineCallbacks

from twisted.internet import defer
@@ -124,29 +125,23 @@ class ApplicationService(object):
                    raise ValueError(
                        "Expected bool for 'exclusive' in ns '%s'" % ns
                    )
-                if not isinstance(regex_obj.get("regex"), basestring):
+                regex = regex_obj.get("regex")
+                if isinstance(regex, basestring):
+                    regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
+                else:
                    raise ValueError(
                        "Expected string for 'regex' in ns '%s'" % ns
                    )
        return namespaces

-    def _matches_regex(self, test_string, namespace_key, return_obj=False):
-        if not isinstance(test_string, basestring):
-            logger.error(
-                "Expected a string to test regex against, but got %s",
-                test_string
-            )
-            return False
-
+    def _matches_regex(self, test_string, namespace_key):
        for regex_obj in self.namespaces[namespace_key]:
-            if re.match(regex_obj["regex"], test_string):
-                if return_obj:
-                    return regex_obj
-                return True
-        return False
+            if regex_obj["regex"].match(test_string):
+                return regex_obj
+        return None

    def _is_exclusive(self, ns_key, test_string):
-        regex_obj = self._matches_regex(test_string, ns_key, return_obj=True)
+        regex_obj = self._matches_regex(test_string, ns_key)
        if regex_obj:
            return regex_obj["exclusive"]
        return False
@@ -166,7 +161,14 @@ class ApplicationService(object):
        if not store:
            defer.returnValue(False)

-        member_list = yield store.get_users_in_room(event.room_id)
+        does_match = yield self._matches_user_in_member_list(event.room_id, store)
+        defer.returnValue(does_match)
+
+    @cachedInlineCallbacks(num_args=1, cache_context=True)
+    def _matches_user_in_member_list(self, room_id, store, cache_context):
+        member_list = yield store.get_users_in_room(
+            room_id, on_invalidate=cache_context.invalidate
+        )

        # check joined member events
        for user_id in member_list:
@@ -219,10 +221,10 @@ class ApplicationService(object):
        )

    def is_interested_in_alias(self, alias):
-        return self._matches_regex(alias, ApplicationService.NS_ALIASES)
+        return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))

    def is_interested_in_room(self, room_id):
-        return self._matches_regex(room_id, ApplicationService.NS_ROOMS)
+        return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))

    def is_exclusive_user(self, user_id):
        return (

View file

@@ -23,6 +23,7 @@ class VoipConfig(Config):
        self.turn_username = config.get("turn_username")
        self.turn_password = config.get("turn_password")
        self.turn_user_lifetime = self.parse_duration(config["turn_user_lifetime"])
+        self.turn_allow_guests = config.get("turn_allow_guests", True)

    def default_config(self, **kwargs):
        return """\
@@ -41,4 +42,11 @@ class VoipConfig(Config):

        # How long generated TURN credentials last
        turn_user_lifetime: "1h"

+        # Whether guests should be allowed to use the TURN server.
+        # This defaults to True, otherwise VoIP will be unreliable for guests.
+        # However, it does introduce a slight security risk as it allows users to
+        # connect to arbitrary endpoints without having first signed up for a
+        # valid account (e.g. by passing a CAPTCHA).
+        turn_allow_guests: True
        """

View file

@@ -146,11 +146,15 @@ class FederationServer(FederationBase):
            # check that it's actually being sent from a valid destination to
            # workaround bug #1753 in 0.18.5 and 0.18.6
            if transaction.origin != get_domain_from_id(pdu.event_id):
+                # We continue to accept join events from any server; this is
+                # necessary for the federation join dance to work correctly.
+                # (When we join over federation, the "helper" server is
+                # responsible for sending out the join event, rather than the
+                # origin. See bug #1893).
                if not (
                    pdu.type == 'm.room.member' and
                    pdu.content and
-                    pdu.content.get("membership", None) == 'join' and
-                    self.hs.is_mine_id(pdu.state_key)
+                    pdu.content.get("membership", None) == 'join'
                ):
                    logger.info(
                        "Discarding PDU %s from invalid origin %s",

View file

@@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import (
-    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+    preserve_fn, preserve_context_over_deferred
)
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
@@ -394,11 +394,10 @@ class FederationHandler(BaseHandler):
                target_user = UserID.from_string(target_user_id)
                extra_users.append(target_user)

-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=extra_users
+        )

        if event.type == EventTypes.Member:
            if event.membership == Membership.JOIN:
@@ -916,11 +915,10 @@ class FederationHandler(BaseHandler):
                origin, auth_chain, state, event
            )

-            with PreserveLoggingContext():
-                self.notifier.on_new_room_event(
-                    event, event_stream_id, max_stream_id,
-                    extra_users=[joinee]
-                )
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=[joinee]
+            )

            logger.debug("Finished joining %s to %s", joinee, room_id)
        finally:
@@ -1004,9 +1002,19 @@ class FederationHandler(BaseHandler):
        )
        event.internal_metadata.outlier = False

-        # Send this event on behalf of the origin server since they may not
-        # have an up to data view of the state of the room at this event so
-        # will not know which servers to send the event to.
+        # Send this event on behalf of the origin server.
+        #
+        # The reasons we have the destination server rather than the origin
+        # server send it are slightly mysterious: the origin server should have
+        # all the neccessary state once it gets the response to the send_join,
+        # so it could send the event itself if it wanted to. It may be that
+        # doing it this way reduces failure modes, or avoids certain attacks
+        # where a new server selectively tells a subset of the federation that
+        # it has joined.
+        #
+        # The fact is that, as of the current writing, Synapse doesn't send out
+        # the join event over federation after joining, and changing it now
+        # would introduce the danger of backwards-compatibility problems.
        event.internal_metadata.send_on_behalf_of = origin

        context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -1025,10 +1033,9 @@
                target_user = UserID.from_string(target_user_id)
                extra_users.append(target_user)

-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id, extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id, extra_users=extra_users
+        )

        if event.type == EventTypes.Member:
            if event.content["membership"] == Membership.JOIN:
@@ -1074,11 +1081,10 @@
        )

        target_user = UserID.from_string(event.state_key)
-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=[target_user],
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=[target_user],
+        )

        defer.returnValue(event)
@@ -1236,10 +1242,9 @@
            target_user = UserID.from_string(target_user_id)
            extra_users.append(target_user)

-        with PreserveLoggingContext():
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id, extra_users=extra_users
-            )
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id, extra_users=extra_users
+        )

        defer.returnValue(None)

View file

@@ -612,7 +612,7 @@ class MessageHandler(BaseHandler):
        @defer.inlineCallbacks
        def _notify():
            yield run_on_reactor()
-            yield self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                event, event_stream_id, max_stream_id,
                extra_users=extra_users
            )

View file

@@ -210,7 +210,6 @@ class Notifier(object):
        """
        self.replication_callbacks.append(cb)

-    @preserve_fn
    def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                          extra_users=[]):
        """ Used by handlers to inform the notifier something has happened
@@ -224,15 +223,13 @@
            until all previous events have been persisted before notifying
            the client streams.
        """
-        with PreserveLoggingContext():
-            self.pending_new_room_events.append((
-                room_stream_id, event, extra_users
-            ))
-            self._notify_pending_new_room_events(max_room_stream_id)
+        self.pending_new_room_events.append((
+            room_stream_id, event, extra_users
+        ))
+        self._notify_pending_new_room_events(max_room_stream_id)

-            self.notify_replication()
+        self.notify_replication()

-    @preserve_fn
    def _notify_pending_new_room_events(self, max_room_stream_id):
        """Notify for the room events that were queued waiting for a previous
        event to be persisted.
@@ -250,14 +247,16 @@
            else:
                self._on_new_room_event(event, room_stream_id, extra_users)

-    @preserve_fn
    def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
        """Notify any user streams that are interested in this room event"""
        # poke any interested application service.
-        self.appservice_handler.notify_interested_services(room_stream_id)
+        preserve_fn(self.appservice_handler.notify_interested_services)(
+            room_stream_id)

        if self.federation_sender:
-            self.federation_sender.notify_new_events(room_stream_id)
+            preserve_fn(self.federation_sender.notify_new_events)(
+                room_stream_id
+            )

        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
            self._user_joined_room(event.state_key, event.room_id)
@@ -268,7 +267,6 @@
            rooms=[event.room_id],
        )

-    @preserve_fn
    def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
        """ Used to inform listeners that something has happend event wise.
@@ -295,7 +293,6 @@
        self.notify_replication()

-    @preserve_fn
    def on_new_replication_data(self):
        """Used to inform replication listeners that something has happend
        without waking up any of the normal user event streams"""

View file

@@ -17,6 +17,7 @@ import logging
import re

from synapse.types import UserID
+from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
from synapse.util.caches.lrucache import LruCache

logger = logging.getLogger(__name__)
@@ -125,6 +126,11 @@ class PushRuleEvaluatorForEvent(object):
        return self._value_cache.get(dotted_key, None)


+# Caches (glob, word_boundary) -> regex for push. See _glob_matches
+regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
+register_cache("regex_push_cache", regex_cache)
+
+
def _glob_matches(glob, value, word_boundary=False):
    """Tests if value matches glob.
@@ -137,46 +143,63 @@ def _glob_matches(glob, value, word_boundary=False):
    Returns:
        bool
    """
    try:
-        if IS_GLOB.search(glob):
-            r = re.escape(glob)
-
-            r = r.replace(r'\*', '.*?')
-            r = r.replace(r'\?', '.')
-
-            # handle [abc], [a-z] and [!a-z] style ranges.
-            r = GLOB_REGEX.sub(
-                lambda x: (
-                    '[%s%s]' % (
-                        x.group(1) and '^' or '',
-                        x.group(2).replace(r'\\\-', '-')
-                    )
-                ),
-                r,
-            )
-            if word_boundary:
-                r = r"\b%s\b" % (r,)
-                r = _compile_regex(r)
-                return r.search(value)
-            else:
-                r = r + "$"
-                r = _compile_regex(r)
-                return r.match(value)
-        elif word_boundary:
-            r = re.escape(glob)
-            r = r"\b%s\b" % (r,)
-            r = _compile_regex(r)
-            return r.search(value)
-        else:
-            return value.lower() == glob.lower()
+        r = regex_cache.get((glob, word_boundary), None)
+        if not r:
+            r = _glob_to_re(glob, word_boundary)
+            regex_cache[(glob, word_boundary)] = r
+        return r.search(value)
    except re.error:
        logger.warn("Failed to parse glob to regex: %r", glob)
        return False


+def _glob_to_re(glob, word_boundary):
+    """Generates regex for a given glob.
+
+    Args:
+        glob (string)
+        word_boundary (bool): Whether to match against word boundaries or entire
+            string. Defaults to False.
+
+    Returns:
+        regex object
+    """
+    if IS_GLOB.search(glob):
+        r = re.escape(glob)
+
+        r = r.replace(r'\*', '.*?')
+        r = r.replace(r'\?', '.')
+
+        # handle [abc], [a-z] and [!a-z] style ranges.
+        r = GLOB_REGEX.sub(
+            lambda x: (
+                '[%s%s]' % (
+                    x.group(1) and '^' or '',
+                    x.group(2).replace(r'\\\-', '-')
+                )
+            ),
+            r,
+        )
+        if word_boundary:
+            r = r"\b%s\b" % (r,)
+            return re.compile(r, flags=re.IGNORECASE)
+        else:
+            r = "^" + r + "$"
+            return re.compile(r, flags=re.IGNORECASE)
+    elif word_boundary:
+        r = re.escape(glob)
+        r = r"\b%s\b" % (r,)
+        return re.compile(r, flags=re.IGNORECASE)
+    else:
+        r = "^" + re.escape(glob) + "$"
+        return re.compile(r, flags=re.IGNORECASE)


def _flatten_dict(d, prefix=[], result={}):
    for key, value in d.items():
        if isinstance(value, basestring):
@@ -185,16 +208,3 @@ def _flatten_dict(d, prefix=[], result={}):
            _flatten_dict(value, prefix=(prefix + [key]), result=result)

    return result
-
-
-regex_cache = LruCache(5000)
-
-
-def _compile_regex(regex_str):
-    r = regex_cache.get(regex_str, None)
-    if r:
-        return r
-
-    r = re.compile(regex_str, flags=re.IGNORECASE)
-    regex_cache[regex_str] = r
-    return r
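For illustration, a hedged sketch of what the cached glob matching above does in practice. ``_glob_matches`` is a private helper of ``synapse.push.push_rule_evaluator``; the example globs and strings are assumptions, not taken from the codebase.

    from synapse.push.push_rule_evaluator import _glob_matches

    # a glob containing '*' is compiled to an anchored, case-insensitive regex
    # ('^cake.*?omelette$') and cached under the (glob, word_boundary) key
    assert _glob_matches("cake*omelette", "cakefooomelette")

    # word_boundary=True searches within the string instead of anchoring it,
    # which is how content.body push rules are evaluated
    assert _glob_matches("beer", "would you like some beer?", word_boundary=True)

    # repeating a lookup hits regex_cache rather than recompiling the pattern
    assert _glob_matches("cake*omelette", "cake and omelette")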

View file

@@ -17,15 +17,12 @@ from twisted.internet import defer
from synapse.push.presentable_names import (
    calculate_room_name, name_from_member_event
)
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred


@defer.inlineCallbacks
def get_badge_count(store, user_id):
-    invites, joins = yield preserve_context_over_deferred(defer.gatherResults([
-        preserve_fn(store.get_invited_rooms_for_user)(user_id),
-        preserve_fn(store.get_rooms_for_user)(user_id),
-    ], consumeErrors=True))
+    invites = yield store.get_invited_rooms_for_user(user_id)
+    joins = yield store.get_rooms_for_user(user_id)

    my_receipts_by_room = yield store.get_receipts_for_user(
        user_id, "m.read",

View file

@@ -28,7 +28,10 @@ class VoipRestServlet(ClientV1RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request):
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(
+            request,
+            self.hs.config.turn_allow_guests
+        )

        turnUris = self.hs.config.turn_uris
        turnSecret = self.hs.config.turn_shared_secret

View file

@@ -36,7 +36,7 @@ class ThirdPartyProtocolsServlet(RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)

        protocols = yield self.appservice_handler.get_3pe_protocols()
        defer.returnValue((200, protocols))
@@ -54,7 +54,7 @@ class ThirdPartyProtocolServlet(RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request, protocol):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)

        protocols = yield self.appservice_handler.get_3pe_protocols(
            only_protocol=protocol,
@@ -77,7 +77,7 @@ class ThirdPartyUserServlet(RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request, protocol):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)

        fields = request.args
        fields.pop("access_token", None)
@@ -101,7 +101,7 @@ class ThirdPartyLocationServlet(RestServlet):
    @defer.inlineCallbacks
    def on_GET(self, request, protocol):
-        yield self.auth.get_user_by_req(request)
+        yield self.auth.get_user_by_req(request, allow_guest=True)

        fields = request.args
        fields.pop("access_token", None)

View file

@@ -216,9 +216,7 @@ class StreamToken(
        return self

    def copy_and_replace(self, key, new_value):
-        d = self._asdict()
-        d[key] = new_value
-        return StreamToken(**d)
+        return self._replace(**{key: new_value})


StreamToken.START = StreamToken(
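The rewritten ``copy_and_replace`` leans on the standard ``namedtuple._replace`` behaviour. A small standalone sketch of what that does; the field names below are made up for illustration, not the real ``StreamToken`` fields.

    from collections import namedtuple

    FakeToken = namedtuple("FakeToken", ["room_key", "presence_key"])

    tok = FakeToken(room_key="s1", presence_key=7)
    # _replace builds a new instance with only the named field changed,
    # which is all copy_and_replace needs to do
    tok2 = tok._replace(**{"room_key": "s2"})
    assert tok2 == FakeToken("s2", 7)
    assert tok.room_key == "s1"  # the original token is untouched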

View file

@@ -89,6 +89,11 @@ class ObservableDeferred(object):
        deferred.addCallbacks(callback, errback)

    def observe(self):
+        """Observe the underlying deferred.
+
+        Can return either a deferred if the underlying deferred is still pending
+        (or has failed), or the actual value. Callers may need to use maybeDeferred.
+        """
        if not self._result:
            d = defer.Deferred()
@@ -101,7 +106,7 @@ class ObservableDeferred(object):
            return d
        else:
            success, res = self._result
-            return defer.succeed(res) if success else defer.fail(res)
+            return res if success else defer.fail(res)

    def observers(self):
        return self._observers
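As the new docstring notes, ``observe()`` may now hand back either the resolved value or a pending ``Deferred``. A minimal sketch of the ``maybeDeferred`` pattern a caller can use when it always wants a ``Deferred``; the ``wait_for`` helper is an illustrative assumption, not a synapse API.

    from twisted.internet import defer
    from synapse.util.async import ObservableDeferred

    @defer.inlineCallbacks
    def wait_for(observable):
        # maybeDeferred wraps a plain return value in an already-fired
        # Deferred, so this works whether or not the underlying deferred
        # has completed yet
        result = yield defer.maybeDeferred(observable.observe)
        defer.returnValue(result)

    # the underlying deferred has already fired, so observe() returns the
    # value directly; wait_for() still yields it as before
    d = wait_for(ObservableDeferred(defer.succeed("hello")))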

View file

@@ -15,12 +15,9 @@
import logging

from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
-from synapse.util.logcontext import (
-    PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
-)

from . import DEBUG_CACHES, register_cache
@@ -227,8 +224,20 @@ class _CacheDescriptorBase(object):
        )

        self.num_args = num_args
+
+        # list of the names of the args used as the cache key
        self.arg_names = all_args[1:num_args + 1]

+        # self.arg_defaults is a map of arg name to its default value for each
+        # argument that has a default value
+        if arg_spec.defaults:
+            self.arg_defaults = dict(zip(
+                all_args[-len(arg_spec.defaults):],
+                arg_spec.defaults
+            ))
+        else:
+            self.arg_defaults = {}
+
        if "cache_context" in self.arg_names:
            raise Exception(
                "cache_context arg cannot be included among the cache keys"
@@ -292,18 +301,31 @@ class CacheDescriptor(_CacheDescriptorBase):
            iterable=self.iterable,
        )

+        def get_cache_key(args, kwargs):
+            """Given some args/kwargs return a generator that resolves into
+            the cache_key.
+
+            We loop through each arg name, looking up if its in the `kwargs`,
+            otherwise using the next argument in `args`. If there are no more
+            args then we try looking the arg name up in the defaults
+            """
+            pos = 0
+            for nm in self.arg_names:
+                if nm in kwargs:
+                    yield kwargs[nm]
+                elif pos < len(args):
+                    yield args[pos]
+                    pos += 1
+                else:
+                    yield self.arg_defaults[nm]
+
        @functools.wraps(self.orig)
        def wrapped(*args, **kwargs):
            # If we're passed a cache_context then we'll want to call its invalidate()
            # whenever we are invalidated
            invalidate_callback = kwargs.pop("on_invalidate", None)

-            # Add temp cache_context so inspect.getcallargs doesn't explode
-            if self.add_cache_context:
-                kwargs["cache_context"] = None
-
-            arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
-            cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
+            cache_key = tuple(get_cache_key(args, kwargs))

            # Add our own `cache_context` to argument list if the wrapped function
            # has asked for one
@@ -328,11 +350,9 @@ class CacheDescriptor(_CacheDescriptorBase):
                        defer.returnValue(cached_result)
                    observer.addCallback(check_result)

-                return preserve_context_over_deferred(observer)
            except KeyError:
                ret = defer.maybeDeferred(
-                    preserve_context_over_fn,
-                    self.function_to_call,
+                    logcontext.preserve_fn(self.function_to_call),
                    obj, *args, **kwargs
                )
@@ -342,10 +362,14 @@ class CacheDescriptor(_CacheDescriptorBase):
                ret.addErrback(onErr)

-                ret = ObservableDeferred(ret, consumeErrors=True)
-                cache.set(cache_key, ret, callback=invalidate_callback)
+                result_d = ObservableDeferred(ret, consumeErrors=True)
+                cache.set(cache_key, result_d, callback=invalidate_callback)
+                observer = result_d.observe()

-                return preserve_context_over_deferred(ret.observe())
+            if isinstance(observer, defer.Deferred):
+                return logcontext.make_deferred_yieldable(observer)
+            else:
+                return observer

        wrapped.invalidate = cache.invalidate
        wrapped.invalidate_all = cache.invalidate_all
@@ -362,7 +386,11 @@ class CacheListDescriptor(_CacheDescriptorBase):
    """Wraps an existing cache to support bulk fetching of keys.

    Given a list of keys it looks in the cache to find any hits, then passes
-    the list of missing keys to the wrapped fucntion.
+    the list of missing keys to the wrapped function.
+
+    Once wrapped, the function returns either a Deferred which resolves to
+    the list of results, or (if all results were cached), just the list of
+    results.
    """

    def __init__(self, orig, cached_method_name, list_name, num_args=None,
@@ -433,8 +461,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                args_to_call[self.list_name] = missing

                ret_d = defer.maybeDeferred(
-                    preserve_context_over_fn,
-                    self.function_to_call,
+                    logcontext.preserve_fn(self.function_to_call),
                    **args_to_call
                )
@@ -443,8 +470,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                # We need to create deferreds for each arg in the list so that
                # we can insert the new deferred into the cache.
                for arg in missing:
-                    with PreserveLoggingContext():
-                        observer = ret_d.observe()
+                    observer = ret_d.observe()
                    observer.addCallback(lambda r, arg: r.get(arg, None), arg)

                    observer = ObservableDeferred(observer)
@@ -471,7 +497,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                results.update(res)
                return results

-        return preserve_context_over_deferred(defer.gatherResults(
+        return logcontext.make_deferred_yieldable(defer.gatherResults(
            cached_defers.values(),
            consumeErrors=True,
        ).addCallback(update_results_dict).addErrback(

View file

@@ -310,6 +310,10 @@ def preserve_context_over_fn(fn, *args, **kwargs):
def preserve_context_over_deferred(deferred, context=None):
    """Given a deferred wrap it such that any callbacks added later to it will
    be invoked with the current context.
+
+    Deprecated: this almost certainly doesn't do want you want, ie make
+    the deferred follow the synapse logcontext rules: try
+    ``make_deferred_yieldable`` instead.
    """
    if context is None:
        context = LoggingContext.current_context()
@@ -330,12 +334,8 @@ def preserve_fn(f):
        LoggingContext.set_current_context(LoggingContext.sentinel)
        return result

-    # XXX: why is this here rather than inside g? surely we want to preserve
-    # the context from the time the function was called, not when it was
-    # wrapped?
-    current = LoggingContext.current_context()
-
    def g(*args, **kwargs):
+        current = LoggingContext.current_context()
+
        res = f(*args, **kwargs)
        if isinstance(res, defer.Deferred) and not res.called:
            # The function will have reset the context before returning, so
@@ -359,6 +359,25 @@ def preserve_fn(f):
    return g
+@defer.inlineCallbacks
+def make_deferred_yieldable(deferred):
+    """Given a deferred, make it follow the Synapse logcontext rules:
+
+    If the deferred has completed (or is not actually a Deferred), essentially
+    does nothing (just returns another completed deferred with the
+    result/failure).
+
+    If the deferred has not yet completed, resets the logcontext before
+    returning a deferred. Then, when the deferred completes, restores the
+    current logcontext before running callbacks/errbacks.
+
+    (This is more-or-less the opposite operation to preserve_fn.)
+    """
+    with PreserveLoggingContext():
+        r = yield deferred
+    defer.returnValue(r)


# modules to ignore in `logcontext_tracer`
_to_ignore = [
    "synapse.util.logcontext",

View file

@@ -56,7 +56,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
        events ([synapse.events.EventBase]): list of events to filter
    """
    forgotten = yield preserve_context_over_deferred(defer.gatherResults([
-        preserve_fn(store.who_forgot_in_room)(
+        defer.maybeDeferred(
+            preserve_fn(store.who_forgot_in_room),
            room_id,
        )
        for room_id in frozenset(e.room_id for e in events)

View file

@@ -19,10 +19,12 @@ from twisted.internet import defer
from mock import Mock
from tests import unittest

+import re
+

def _regex(regex, exclusive=True):
    return {
-        "regex": regex,
+        "regex": re.compile(regex),
        "exclusive": exclusive
    }

View file

@@ -199,7 +199,7 @@ class CacheDecoratorTestCase(unittest.TestCase):
        a.func.prefill(("foo",), ObservableDeferred(d))

-        self.assertEquals(a.func("foo").result, d.result)
+        self.assertEquals(a.func("foo"), d.result)
        self.assertEquals(callcount[0], 0)

    @defer.inlineCallbacks

View file

@@ -12,11 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
import mock
+from synapse.api.errors import SynapseError
+from synapse.util import async
+from synapse.util import logcontext
from twisted.internet import defer

from synapse.util.caches import descriptors
from tests import unittest

+logger = logging.getLogger(__name__)
+

class DescriptorTestCase(unittest.TestCase):
    @defer.inlineCallbacks
@@ -84,3 +91,125 @@ class DescriptorTestCase(unittest.TestCase):
        r = yield obj.fn(2, 5)
        self.assertEqual(r, 'chips')
        obj.mock.assert_not_called()

    def test_cache_logcontexts(self):
        """Check that logcontexts are set and restored correctly when
        using the cache."""

        complete_lookup = defer.Deferred()

        class Cls(object):
            @descriptors.cached()
            def fn(self, arg1):
                @defer.inlineCallbacks
                def inner_fn():
                    with logcontext.PreserveLoggingContext():
                        yield complete_lookup
                    defer.returnValue(1)

                return inner_fn()

        @defer.inlineCallbacks
        def do_lookup():
            with logcontext.LoggingContext() as c1:
                c1.name = "c1"
                r = yield obj.fn(1)
                self.assertEqual(logcontext.LoggingContext.current_context(),
                                 c1)
            defer.returnValue(r)

        def check_result(r):
            self.assertEqual(r, 1)

        obj = Cls()

        # set off a deferred which will do a cache lookup
        d1 = do_lookup()
        self.assertEqual(logcontext.LoggingContext.current_context(),
                         logcontext.LoggingContext.sentinel)
        d1.addCallback(check_result)

        # and another
        d2 = do_lookup()
        self.assertEqual(logcontext.LoggingContext.current_context(),
                         logcontext.LoggingContext.sentinel)
        d2.addCallback(check_result)

        # let the lookup complete
        complete_lookup.callback(None)

        return defer.gatherResults([d1, d2])

    def test_cache_logcontexts_with_exception(self):
        """Check that the cache sets and restores logcontexts correctly when
        the lookup function throws an exception"""

        class Cls(object):
            @descriptors.cached()
            def fn(self, arg1):
                @defer.inlineCallbacks
                def inner_fn():
                    yield async.run_on_reactor()
                    raise SynapseError(400, "blah")

                return inner_fn()

        @defer.inlineCallbacks
        def do_lookup():
            with logcontext.LoggingContext() as c1:
                c1.name = "c1"
                try:
                    yield obj.fn(1)
                    self.fail("No exception thrown")
                except SynapseError:
                    pass

                self.assertEqual(logcontext.LoggingContext.current_context(),
                                 c1)

        obj = Cls()

        # set off a deferred which will do a cache lookup
        d1 = do_lookup()
        self.assertEqual(logcontext.LoggingContext.current_context(),
                         logcontext.LoggingContext.sentinel)

        return d1

    @defer.inlineCallbacks
    def test_cache_default_args(self):
        class Cls(object):
            def __init__(self):
                self.mock = mock.Mock()

            @descriptors.cached()
            def fn(self, arg1, arg2=2, arg3=3):
                return self.mock(arg1, arg2, arg3)

        obj = Cls()

        obj.mock.return_value = 'fish'
        r = yield obj.fn(1, 2, 3)
        self.assertEqual(r, 'fish')
        obj.mock.assert_called_once_with(1, 2, 3)
        obj.mock.reset_mock()

        # a call with same params shouldn't call the mock again
        r = yield obj.fn(1, 2)
        self.assertEqual(r, 'fish')
        obj.mock.assert_not_called()
        obj.mock.reset_mock()

        # a call with different params should call the mock again
        obj.mock.return_value = 'chips'
        r = yield obj.fn(2, 3)
        self.assertEqual(r, 'chips')
        obj.mock.assert_called_once_with(2, 3, 3)
        obj.mock.reset_mock()

        # the two values should now be cached
        r = yield obj.fn(1, 2)
        self.assertEqual(r, 'fish')
        r = yield obj.fn(2, 3)
        self.assertEqual(r, 'chips')
        obj.mock.assert_not_called()

View file

@@ -53,7 +53,9 @@ class SnapshotCacheTestCase(unittest.TestCase):
        # before the cache expires returns a resolved deferred.
        get_result_at_11 = self.cache.get(11, "key")
        self.assertIsNotNone(get_result_at_11)
-        self.assertTrue(get_result_at_11.called)
+        if isinstance(get_result_at_11, Deferred):
+            # The cache may return the actual result rather than a deferred
+            self.assertTrue(get_result_at_11.called)

        # Check that getting the key after the deferred has resolved
        # after the cache expires returns None