Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2018-10-02 14:39:30 +01:00
commit 7fa156af80
79 changed files with 849 additions and 479 deletions

View file

@ -1,5 +1,27 @@
version: 2
jobs:
dockerhubuploadrelease:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG} .
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_TAG}-py3 --build-arg PYTHON_VERSION=3.6 .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}
- run: docker push matrixdotorg/synapse:${CIRCLE_TAG}-py3
dockerhubuploadlatest:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1} .
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:${CIRCLE_SHA1}-py3 --build-arg PYTHON_VERSION=3.6 .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1} matrixdotorg/synapse:latest
- run: docker tag matrixdotorg/synapse:${CIRCLE_SHA1}-py3 matrixdotorg/synapse:latest-py3
- run: docker push matrixdotorg/synapse:${CIRCLE_SHA1}
- run: docker push matrixdotorg/synapse:${CIRCLE_SHA1}-py3
- run: docker push matrixdotorg/synapse:latest
- run: docker push matrixdotorg/synapse:latest-py3
sytestpy2:
machine: true
steps:
@ -99,23 +121,45 @@ workflows:
version: 2
build:
jobs:
- sytestpy2
- sytestpy2postgres
- sytestpy3
- sytestpy3postgres
- sytestpy2:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy2postgres:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy3:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy3postgres:
filters:
branches:
only: /develop|master|release-.*/
- sytestpy2merged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy2postgresmerged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy3merged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- sytestpy3postgresmerged:
filters:
branches:
ignore: /develop|master/
ignore: /develop|master|release-.*/
- dockerhubuploadrelease:
filters:
tags:
only: /^v[0-9].[0-9]+.[0-9]+(.[0-9]+)?/
branches:
ignore: /.*/
- dockerhubuploadlatest:
filters:
branches:
only: master

View file

@ -9,12 +9,15 @@ source $BASH_ENV
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
then
echo "Can't figure out what the PR number is!"
exit 1
fi
echo "Can't figure out what the PR number is! Assuming merge target is develop."
# Get the reference, using the GitHub API
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
# It probably hasn't had a PR opened yet. Since all PRs land on develop, we
# can probably assume it's based on it and will be merged into it.
GITBASE="develop"
else
# Get the reference, using the GitHub API
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
fi
# Show what we are before
git show -s

View file

@ -20,6 +20,9 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-old
- python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
services:

View file

@ -81,7 +81,7 @@ Thanks for using Matrix!
Synapse Installation
====================
Synapse is the reference python/twisted Matrix homeserver implementation.
Synapse is the reference Python/Twisted Matrix homeserver implementation.
System requirements:
@ -91,12 +91,13 @@ System requirements:
Installing from source
----------------------
(Prebuilt packages are available for some platforms - see `Platform-Specific
Instructions`_.)
Synapse is written in python but some of the libraries it uses are written in
C. So before we can install synapse itself we need a working C compiler and the
header files for python C extensions.
Synapse is written in Python but some of the libraries it uses are written in
C. So before we can install Synapse itself we need a working C compiler and the
header files for Python C extensions.
Installing prerequisites on Ubuntu or Debian::
@ -143,18 +144,24 @@ Installing prerequisites on OpenBSD::
doas pkg_add python libffi py-pip py-setuptools sqlite3 py-virtualenv \
libxslt
To install the synapse homeserver run::
To install the Synapse homeserver run::
virtualenv -p python2.7 ~/.synapse
source ~/.synapse/bin/activate
pip install --upgrade pip
pip install --upgrade setuptools
pip install https://github.com/matrix-org/synapse/tarball/master
pip install matrix-synapse
This installs synapse, along with the libraries it uses, into a virtual
This installs Synapse, along with the libraries it uses, into a virtual
environment under ``~/.synapse``. Feel free to pick a different directory
if you prefer.
This Synapse installation can then be later upgraded by using pip again with the
update flag::
source ~/.synapse/bin/activate
pip install -U matrix-synapse
In case of problems, please see the _`Troubleshooting` section below.
There is an offical synapse image available at
@ -167,7 +174,7 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
Configuring synapse
Configuring Synapse
-------------------
Before you can start Synapse, you will need to generate a configuration
@ -249,26 +256,6 @@ Setting up a TURN server
For reliable VoIP calls to be routed via this homeserver, you MUST configure
a TURN server. See `<docs/turn-howto.rst>`_ for details.
IPv6
----
As of Synapse 0.19 we finally support IPv6, many thanks to @kyrias and @glyph
for providing PR #1696.
However, for federation to work on hosts with IPv6 DNS servers you **must**
be running Twisted 17.1.0 or later - see https://github.com/matrix-org/synapse/issues/1002
for details. We can't make Synapse depend on Twisted 17.1 by default
yet as it will break most older distributions (see https://github.com/matrix-org/synapse/pull/1909)
so if you are using operating system dependencies you'll have to install your
own Twisted 17.1 package via pip or backports etc.
If you're running in a virtualenv then pip should have installed the newest
Twisted automatically, but if your virtualenv is old you will need to manually
upgrade to a newer Twisted dependency via:
pip install Twisted>=17.1.0
Running Synapse
===============
@ -444,8 +431,7 @@ settings require a slightly more difficult installation process.
using the ``.`` command, rather than ``bash``'s ``source``.
5) Optionally, use ``pip`` to install ``lxml``, which Synapse needs to parse
webpages for their titles.
6) Use ``pip`` to install this repository: ``pip install
https://github.com/matrix-org/synapse/tarball/master``
6) Use ``pip`` to install this repository: ``pip install matrix-synapse``
7) Optionally, change ``_synapse``'s shell to ``/bin/false`` to reduce the
chance of a compromised Synapse server being used to take over your box.
@ -473,7 +459,7 @@ Troubleshooting
Troubleshooting Installation
----------------------------
Synapse requires pip 1.7 or later, so if your OS provides too old a version you
Synapse requires pip 8 or later, so if your OS provides too old a version you
may need to manually upgrade it::
sudo pip install --upgrade pip
@ -508,28 +494,6 @@ failing, e.g.::
pip install twisted
On OS X, if you encounter clang: error: unknown argument: '-mno-fused-madd' you
will need to export CFLAGS=-Qunused-arguments.
Troubleshooting Running
-----------------------
If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
encryption and digital signatures.
Unfortunately PyNACL currently has a few issues
(https://github.com/pyca/pynacl/issues/53) and
(https://github.com/pyca/pynacl/issues/79) that mean it may not install
correctly, causing all tests to fail with errors about missing "sodium.h". To
fix try re-installing from PyPI or directly from
(https://github.com/pyca/pynacl)::
# Install from PyPI
pip install --user --upgrade --force pynacl
# Install from github
pip install --user https://github.com/pyca/pynacl/tarball/master
Running out of File Handles
~~~~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -18,7 +18,7 @@ instructions that may be required are listed later in this document.
.. code:: bash
pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
pip install --upgrade --process-dependency-links matrix-synapse
# restart synapse
synctl restart
@ -48,11 +48,11 @@ returned by the Client-Server API:
# configured on port 443.
curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
Upgrading to $NEXT_VERSION
Upgrading to v0.27.3
====================
This release expands the anonymous usage stats sent if the opt-in
``report_stats`` configuration is set to ``true``. We now capture RSS memory
``report_stats`` configuration is set to ``true``. We now capture RSS memory
and cpu use at a very coarse level. This requires administrators to install
the optional ``psutil`` python module.

1
changelog.d/3794.misc Normal file
View file

@ -0,0 +1 @@
Speed up calculation of typing updates for replication

1
changelog.d/3836.bugfix Normal file
View file

@ -0,0 +1 @@
support registering regular users non-interactively with register_new_matrix_user script

1
changelog.d/3933.misc Normal file
View file

@ -0,0 +1 @@
Add a cache to get_destination_retry_timings

1
changelog.d/3938.bugfix Normal file
View file

@ -0,0 +1 @@
Sending server notices regarding user consent now works on Python 3.

1
changelog.d/3946.misc Normal file
View file

@ -0,0 +1 @@
Automate pushes to docker hub

1
changelog.d/3952.misc Normal file
View file

@ -0,0 +1 @@
Run the test suite on the oldest supported versions of our dependencies in CI.

1
changelog.d/3957.misc Normal file
View file

@ -0,0 +1 @@
CircleCI now only runs merged jobs on PRs, and commit jobs on develop, master, and release branches.

1
changelog.d/3958.misc Normal file
View file

@ -0,0 +1 @@
Fix docstrings and add tests for state store methods

1
changelog.d/3959.feature Normal file
View file

@ -0,0 +1 @@
Include eventid in log lines when processing incoming federation transactions

1
changelog.d/3960.bugfix Normal file
View file

@ -0,0 +1 @@
Fix error message for events with m.room.create missing from auth_events

1
changelog.d/3961.bugfix Normal file
View file

@ -0,0 +1 @@
Fix errors due to concurrent monthly_active_user upserts

1
changelog.d/3963.misc Normal file
View file

@ -0,0 +1 @@
fix docstring for FederationClient.get_state_for_room

1
changelog.d/3964.feature Normal file
View file

@ -0,0 +1 @@
Remove spurious check which made 'localhost' servers not work

1
changelog.d/3965.misc Normal file
View file

@ -0,0 +1 @@
Run notify_app_services as a bg process

1
changelog.d/3966.misc Normal file
View file

@ -0,0 +1 @@
Improve the logging when handling a federation transaction

1
changelog.d/3967.misc Normal file
View file

@ -0,0 +1 @@
Clarifications in FederationHandler

1
changelog.d/3968.bugfix Normal file
View file

@ -0,0 +1 @@
Fix exceptions when processing incoming events over federation

1
changelog.d/3970.bugfix Normal file
View file

@ -0,0 +1 @@
Replaced all occurences of e.message with str(e). Contributed by Schnuffle

1
changelog.d/3972.misc Normal file
View file

@ -0,0 +1 @@
Further reduce the docker image size

1
changelog.d/3976.misc Normal file
View file

@ -0,0 +1 @@
Build py3 docker images for docker hub too

1
changelog.d/3980.bugfix Normal file
View file

@ -0,0 +1 @@
Fix some instances of ExpiringCache not expiring cache items

1
changelog.d/3985.misc Normal file
View file

@ -0,0 +1 @@
Updated the installation instructions to point to the matrix-synapse package on PyPI.

1
changelog.d/3986.bugfix Normal file
View file

@ -0,0 +1 @@
Fix lazy loaded sync in the presence of rejected state events

1
changelog.d/3987.misc Normal file
View file

@ -0,0 +1 @@
Disable USE_FROZEN_DICTS for unittests by default.

1
changelog.d/3989.misc Normal file
View file

@ -0,0 +1 @@
Improve stacktraces in certain exceptions in the logs

1
changelog.d/3990.bugfix Normal file
View file

@ -0,0 +1 @@
Fix error when logging incomplete HTTP requests

View file

@ -1,9 +1,13 @@
ARG PYTHON_VERSION=2
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8
COPY . /synapse
###
### Stage 0: builder
###
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8 as builder
RUN apk add --no-cache --virtual .build_deps \
# install the OS build deps
RUN apk add \
build-base \
libffi-dev \
libjpeg-turbo-dev \
@ -11,30 +15,47 @@ RUN apk add --no-cache --virtual .build_deps \
libxslt-dev \
linux-headers \
postgresql-dev \
zlib-dev \
&& cd /synapse \
&& apk add --no-cache --virtual .runtime_deps \
libffi \
libjpeg-turbo \
libressl \
libxslt \
libpq \
zlib \
su-exec \
&& pip install --upgrade \
zlib-dev
# build things which have slow build steps, before we copy synapse, so that
# the layer can be cached.
#
# (we really just care about caching a wheel here, as the "pip install" below
# will install them again.)
RUN pip install --prefix="/install" --no-warn-script-location \
cryptography \
msgpack-python \
pillow \
pynacl
# now install synapse and all of the python deps to /install.
COPY . /synapse
RUN pip install --prefix="/install" --no-warn-script-location \
lxml \
pip \
psycopg2 \
setuptools \
&& mkdir -p /synapse/cache \
&& pip install -f /synapse/cache --upgrade --process-dependency-links . \
&& mv /synapse/docker/start.py /synapse/docker/conf / \
&& rm -rf \
setup.cfg \
setup.py \
synapse \
&& apk del .build_deps
/synapse
###
### Stage 1: runtime
###
FROM docker.io/python:${PYTHON_VERSION}-alpine3.8
RUN apk add --no-cache --virtual .runtime_deps \
libffi \
libjpeg-turbo \
libressl \
libxslt \
libpq \
zlib \
su-exec
COPY --from=builder /install /usr/local
COPY ./docker/start.py /start.py
COPY ./docker/conf /conf
VOLUME ["/data"]
EXPOSE 8008/tcp 8448/tcp

View file

@ -21,4 +21,4 @@ try:
verifier.verify(macaroon, key)
print "Signature is correct"
except Exception as e:
print e.message
print str(e)

View file

@ -0,0 +1,9 @@
#!/bin/bash
set -e
# Fetch the current GitHub issue number, add one to it -- presto! The likely
# next PR number.
CURRENT_NUMBER=`curl -s "https://api.github.com/repos/matrix-org/synapse/issues?state=all&per_page=1" | jq -r ".[0].number"`
CURRENT_NUMBER=$((CURRENT_NUMBER+1))
echo $CURRENT_NUMBER

View file

@ -133,7 +133,7 @@ def register_new_user(user, password, server_location, shared_secret, admin):
print "Passwords do not match"
sys.exit(1)
if not admin:
if admin is None:
admin = raw_input("Make admin [no]: ")
if admin in ("y", "yes", "true"):
admin = True
@ -160,10 +160,16 @@ if __name__ == "__main__":
default=None,
help="New password for user. Will prompt if omitted.",
)
parser.add_argument(
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
"-a", "--admin",
action="store_true",
help="Register new user as an admin. Will prompt if omitted.",
help="Register new user as an admin. Will prompt if --no-admin is not set either.",
)
admin_group.add_argument(
"--no-admin",
action="store_true",
help="Register new user as a regular user. Will prompt if --admin is not set either.",
)
group = parser.add_mutually_exclusive_group(required=True)
@ -197,4 +203,8 @@ if __name__ == "__main__":
else:
secret = args.shared_secret
register_new_user(args.user, args.password, args.server_url, secret, args.admin)
admin = None
if args.admin or args.no_admin:
admin = args.admin
register_new_user(args.user, args.password, args.server_url, secret, admin)

View file

@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker())
except jsonschema.ValidationError as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
class FilterCollection(object):

View file

@ -64,7 +64,7 @@ class ConsentURIBuilder(object):
"""
mac = hmac.new(
key=self._hmac_secret,
msg=user_id,
msg=user_id.encode('ascii'),
digestmod=sha256,
).hexdigest()
consent_uri = "%s_matrix/consent?%s" % (

View file

@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e:
message = "\n".join([
"Missing Requirement: %s" % (e.message,),
"Missing Requirement: %s" % (str(e),),
"To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,),
"",

View file

@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.appservice"

View file

@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.client_reader"

View file

@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"

View file

@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader"

View file

@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender"

View file

@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy"

View file

@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
@ -328,7 +328,7 @@ def setup(config_options):
config_options,
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:
@ -386,7 +386,6 @@ def setup(config_options):
hs.get_pusherpool().start()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
hs.get_federation_client().start_get_pdu_cache()
reactor.callWhenRunning(start)

View file

@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.media_repository"

View file

@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"

View file

@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"

View file

@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.user_dir"

View file

@ -25,7 +25,7 @@ if __name__ == "__main__":
try:
config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))

View file

@ -98,9 +98,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
creation_event = auth_events.get((EventTypes.Create, ""), None)
if not creation_event:
raise SynapseError(
raise AuthError(
403,
"Room %r does not exist" % (event.room_id,)
"No create event in auth events",
)
creating_domain = get_domain_from_id(event.room_id)

View file

@ -13,15 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from distutils.util import strtobool
import six
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
# bugs where we accidentally share e.g. signature dicts. However, converting
# a dict to frozen_dicts is expensive.
USE_FROZEN_DICTS = True
# bugs where we accidentally share e.g. signature dicts. However, converting a
# dict to frozen_dicts is expensive.
#
# NOTE: This is overridden by the configuration by the Synapse worker apps, but
# for the sake of tests, it is set here while it cannot be configured on the
# homeserver object itself.
USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
class _EventInternalMetadata(object):

View file

@ -209,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
This will persist the PDU locally upon receipt.
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@ -289,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the `current` state PDUs for a given room from
a remote home server.
"""Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@ -298,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
Deferred: Results in a list of PDUs.
Deferred[Tuple[List[EventBase], List[EventBase]]]:
A list of events in the state, and a list of events in the auth chain
for the given event.
"""
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.

View file

@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
with nested_logging_context(event_id):
try:
yield self._handle_received_pdu(
origin, pdu
)
pdu_results[event_id] = {}
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),

View file

@ -137,26 +137,6 @@ class TransactionQueue(object):
self._processing_pending_presence = False
def can_send_to(self, destination):
"""Can we send messages to the given server?
We can't send messages to ourselves. If we are running on localhost
then we can only federation with other servers running on localhost.
Otherwise we only federate with servers on a public domain.
Args:
destination(str): The server we are possibly trying to send to.
Returns:
bool: True if we can send to the server.
"""
if destination == self.server_name:
return False
if self.server_name.startswith("localhost"):
return destination.startswith("localhost")
else:
return not destination.startswith("localhost")
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@ -279,10 +259,7 @@ class TransactionQueue(object):
self._order += 1
destinations = set(destinations)
destinations = set(
dest for dest in destinations if self.can_send_to(dest)
)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
@ -358,7 +335,7 @@ class TransactionQueue(object):
for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
@ -377,7 +354,8 @@ class TransactionQueue(object):
content=content,
)
if not self.can_send_to(destination):
if destination == self.server_name:
logger.info("Not sending EDU to ourselves")
return
sent_edus_counter.inc()
@ -392,10 +370,8 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def send_device_messages(self, destination):
if destination == self.server_name or destination == "localhost":
return
if not self.can_send_to(destination):
if destination == self.server_name:
logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)

View file

@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
"status": e.code, "message": e.message,
"status": e.code, "message": str(e),
}
if isinstance(e, NotRetryingDestination):

View file

@ -18,7 +18,6 @@
import itertools
import logging
import sys
import six
from six import iteritems, itervalues
@ -106,7 +105,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@ -323,14 +322,22 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
auth_chains = set()
event_map = {
event_id: pdu,
}
try:
# Get the state of the events we know about
ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
ours = yield self.store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
# type: list[dict[tuple[str, str], str]]
state_maps = list(ours.values())
# we don't need this any more, let's delete it.
del ours
# Ask the remote server for the states we don't
# know about
@ -339,27 +346,65 @@ class FederationHandler(BaseHandler):
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
with logcontext.nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
remote_state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
origin, room_id, p,
)
)
)
auth_chains.update(got_auth_chain)
state_group = {(x.type, x.state_key): x.event_id for x in state}
state_groups.append(state_group)
# we want the state *after* p; get_state_for_room returns the
# state *before* p.
remote_event = yield self.federation_client.get_pdu(
[origin], p, outlier=True,
)
if remote_event is None:
raise Exception(
"Unable to get missing prev_event %s" % (p, )
)
if remote_event.is_state():
remote_state.append(remote_event)
# XXX hrm I'm not convinced that duplicate events will compare
# for equality, so I'm not sure this does what the author
# hoped.
auth_chains.update(got_auth_chain)
remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
state_maps.append(remote_state_map)
for x in remote_state:
event_map[x.event_id] = x
# Resolve any conflicting state
@defer.inlineCallbacks
def fetch(ev_ids):
return self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False
fetched = yield self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
# add any events we fetch here to the `event_map` so that we
# can use them to build the state event list below.
event_map.update(fetched)
defer.returnValue(fetched)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
room_version, state_groups, {event_id: pdu}, fetch
room_version, state_maps, event_map, fetch,
)
state = (yield self.store.get_events(state_map.values())).values()
# we need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
state = [
event_map[e] for e in six.itervalues(state_map)
]
auth_chain = list(auth_chains)
except Exception:
logger.warn(
@ -483,20 +528,21 @@ class FederationHandler(BaseHandler):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
with logcontext.nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(
origin,
ev,
sent_to_us_directly=False,
)
else:
raise
except FederationError as e:
if e.code == 403:
logger.warn(
"[%s %s] Received prev_event %s failed history check.",
room_id, event_id, ev.event_id,
)
else:
raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@ -572,6 +618,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
logger.info(
"[%s %s] persisting newly-received auth/state events %s",
room_id, event_id, [e["event"].event_id for e in event_infos]
)
yield self._handle_new_events(origin, event_infos)
try:
@ -1135,7 +1185,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
with logcontext.nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@ -1550,6 +1601,9 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
# hack around with a try/finally instead.
success = False
try:
if not event.internal_metadata.is_outlier() and not backfilled:
yield self.action_generator.handle_push_actions_for_event(
@ -1560,15 +1614,13 @@ class FederationHandler(BaseHandler):
[(event, context)],
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
tp, value, tb = sys.exc_info()
logcontext.run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
six.reraise(tp, value, tb)
success = True
finally:
if not success:
logcontext.run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
defer.returnValue(context)
@ -1581,15 +1633,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self._prep_event,
@defer.inlineCallbacks
def prep(ev_info):
event = ev_info["event"]
with logcontext.nested_logging_context(suffix=event.event_id):
res = yield self._prep_event(
origin,
ev_info["event"],
event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
defer.returnValue(res)
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))

View file

@ -14,9 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import six
from six import iteritems, itervalues, string_types
from canonicaljson import encode_canonical_json, json
@ -624,6 +622,9 @@ class EventCreationHandler(object):
event, context
)
# reraise does not allow inlineCallbacks to preserve the stacktrace, so we
# hack around with a try/finally instead.
success = False
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
@ -636,6 +637,7 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
success = True
return
yield self.persist_and_notify_client_event(
@ -645,17 +647,16 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
tp, value, tb = sys.exc_info()
run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
six.reraise(tp, value, tb)
success = True
finally:
if not success:
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
@defer.inlineCallbacks
def persist_and_notify_client_event(

View file

@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
room_id, str(e.message)
room_id, str(e)
)

View file

@ -571,13 +571,13 @@ class SyncHandler(object):
# be a valid name or canonical_alias - i.e. we're checking that they
# haven't been "deleted" by blatting {} over the top.
if name_id:
name = yield self.store.get_event(name_id, allow_none=False)
name = yield self.store.get_event(name_id, allow_none=True)
if name and name.content:
defer.returnValue(summary)
if canonical_alias_id:
canonical_alias = yield self.store.get_event(
canonical_alias_id, allow_none=False,
canonical_alias_id, allow_none=True,
)
if canonical_alias and canonical_alias.content:
defer.returnValue(summary)

View file

@ -20,6 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing
self._room_typing = {}
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial,
)
self.clock.looping_call(
self._handle_timeouts,
5000,
@ -218,6 +224,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
self.federation.send_edu(
destination=domain,
edu_type="m.typing",
@ -274,19 +281,29 @@ class TypingHandler(object):
self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
self._typing_stream_change_cache.entity_has_changed(
member.room_id, self._latest_room_serial,
)
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id,
)
if changed_rooms is None:
changed_rooms = self._room_serials
rows = []
for room_id, serial in self._room_serials.items():
if last_id < serial and serial <= current_id:
for room_id in changed_rooms:
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()

View file

@ -75,14 +75,14 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
self.method.decode('ascii', errors='replace'),
self.get_method(),
self.get_redacted_uri(),
self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
def get_request_id(self):
return "%s-%i" % (self.method.decode('ascii'), self.request_seq)
return "%s-%i" % (self.get_method(), self.request_seq)
def get_redacted_uri(self):
uri = self.uri
@ -90,6 +90,21 @@ class SynapseRequest(Request):
uri = self.uri.decode('ascii')
return redact_uri(uri)
def get_method(self):
"""Gets the method associated with the request (or placeholder if not
method has yet been received).
Note: This is necessary as the placeholder value in twisted is str
rather than bytes, so we need to sanitise `self.method`.
Returns:
str
"""
method = self.method
if isinstance(method, bytes):
method = self.method.decode('ascii')
return method
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@ -119,7 +134,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.method.decode('ascii'),
requests_counter.labels(self.get_method(),
self.request_metrics.name).inc()
@contextlib.contextmanager
@ -207,14 +222,14 @@ class SynapseRequest(Request):
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.method.decode('ascii'),
self.start_time, name=servlet_name, method=self.get_method(),
)
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
self.method.decode('ascii'),
self.get_method(),
self.get_redacted_uri()
)
@ -280,7 +295,7 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
self.method.decode('ascii'),
self.get_method(),
self.get_redacted_uri(),
self.clientproto.decode('ascii', errors='replace'),
user_agent,

View file

@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@ -248,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_in_background(self._notify_app_services, room_stream_id)
run_as_background_process(
"notify_app_services",
self._notify_app_services, room_stream_id,
)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)

View file

@ -33,31 +33,32 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"],
"frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
"daemonize": ["daemonize"],
"bcrypt": ["bcrypt>=3.1.0"],
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
"sortedcontainers": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
"pyyaml>=3.11": ["yaml"],
"pyasn1>=0.1.9": ["pyasn1"],
"pyasn1-modules>=0.0.7": ["pyasn1_modules"],
"daemonize>=2.3.1": ["daemonize"],
"bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
"six>=1.10": ["six"],
"prometheus_client>=0.0.18": ["prometheus_client"],
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],

View file

@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
logger.info(
"Asking for %d/%d conflicted events",
len(needed_events),
needed_event_count,
)
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
logger.info(
"Asking for %d/%d auth events",
len(new_needed_events),
new_needed_event_count,
)
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)

View file

@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={
"timestamp": int(self._clock.time_msec()),
},
lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))

View file

@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids):
def get_state_groups_ids(self, _room_id, event_ids):
"""Get the event IDs of all the state for the state groups for the given events
Args:
_room_id (str): id of the room for these events
event_ids (iterable[str]): ids of the events
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if not event_ids:
defer.returnValue({})
@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
"""Get the state IDs for the given state group
"""Get the event IDs of all the state in the given state group
Args:
state_group (int)
@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
The return value is a dict mapping group names to lists of events.
Returns:
Deferred[dict[int, list[EventBase]]]:
dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
dictionary state_group -> (dict of (type, state_key) -> event id)
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
a dictionary mapping from state group to state dictionary.
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)

View file

@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.expiringcache import ExpiringCache
from ._base import SQLBaseStore, db_to_json
@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
)
)
SENTINEL = object()
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
expiry_ms=5 * 60 * 1000,
)
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
"""
pass
@defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
None if not retrying
Otherwise a dict for the retry scheme
"""
return self.runInteraction(
result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
defer.returnValue(result)
result = yield self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
defer.returnValue(result)
def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
txn,
@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
self._destination_retry_cache.pop(destination)
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,

View file

@ -16,7 +16,7 @@
import logging
from collections import OrderedDict
from six import itervalues
from six import iteritems, itervalues
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
SENTINEL = object()
class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
@ -95,6 +98,21 @@ class ExpiringCache(object):
return entry.value
def pop(self, key, default=SENTINEL):
"""Removes and returns the value with the given key from the cache.
If the key isn't in the cache then `default` will be returned if
specified, otherwise `KeyError` will get raised.
Identical functionality to `dict.pop(..)`.
"""
value = self._cache.pop(key, default)
if value is SENTINEL:
raise KeyError(key)
return value
def __contains__(self, key):
return key in self._cache
@ -122,7 +140,7 @@ class ExpiringCache(object):
keys_to_delete = set()
for key, cache_entry in self._cache.items():
for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
@ -146,6 +164,8 @@ class ExpiringCache(object):
class _CacheEntry(object):
__slots__ = ["time", "value"]
def __init__(self, time, value):
self.time = time
self.value = value

View file

@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
def __init__(self, name=None, parent_context=None):
def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
if self.parent_context is not None:
self.parent_context.copy_to(self)
if request is not None:
# the request param overrides the request from the parent context
self.request = request
def __str__(self):
return "%s@%x" % (self.name, id(self))
@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
if self.parent_context is not None:
self.parent_context.copy_to(self)
return self
def __exit__(self, type, value, traceback):
@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
def nested_logging_context(suffix, parent_context=None):
"""Creates a new logging context as a child of another.
The nested logging context will have a 'request' made up of the parent context's
request, plus the given suffix.
CPU/db usage stats will be added to the parent context's on exit.
Normal usage looks like:
with nested_logging_context(suffix):
# ... do stuff
Args:
suffix (str): suffix to add to the parent context's 'request'.
parent_context (LoggingContext|None): parent context. Will use the current context
if None.
Returns:
LoggingContext: new logging context.
"""
if parent_context is None:
parent_context = LoggingContext.current_context()
return LoggingContext(
parent_context=parent_context,
request=parent_context.request + "-" + suffix,
)
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):

View file

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from canonicaljson import encode_canonical_json
from synapse.events import FrozenEvent, _EventInternalMetadata
from synapse.events.snapshot import EventContext
from synapse.replication.slave.storage.events import SlavedEventStore
@ -26,7 +28,9 @@ ROOM_ID = "!room:blue"
def dict_equals(self, other):
return self.__dict__ == other.__dict__
me = encode_canonical_json(self._event_dict)
them = encode_canonical_json(other._event_dict)
return me == them
def patch__eq__(cls):

View file

@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.rest.client.v1 import admin, login, room
from synapse.rest.client.v2_alpha import sync
from tests import unittest
class ConsentNoticesTests(unittest.HomeserverTestCase):
servlets = [
sync.register_servlets,
admin.register_servlets,
login.register_servlets,
room.register_servlets,
]
def make_homeserver(self, reactor, clock):
self.consent_notice_message = "consent %(consent_uri)s"
config = self.default_config()
config.user_consent_version = "1"
config.user_consent_server_notice_content = {
"msgtype": "m.text",
"body": self.consent_notice_message,
}
config.public_baseurl = "https://example.com/"
config.form_secret = "123abc"
config.server_notices_mxid = "@notices:test"
config.server_notices_mxid_display_name = "test display name"
config.server_notices_mxid_avatar_url = None
config.server_notices_room_name = "Server Notices"
hs = self.setup_test_homeserver(config=config)
return hs
def prepare(self, reactor, clock, hs):
self.user_id = self.register_user("bob", "abc123")
self.access_token = self.login("bob", "abc123")
def test_get_sync_message(self):
"""
When user consent server notices are enabled, a sync will cause a notice
to fire (in a room which the user is invited to). The notice contains
the notice URL + an authentication code.
"""
# Initial sync, to get the user consent room invite
request, channel = self.make_request(
"GET", "/_matrix/client/r0/sync", access_token=self.access_token
)
self.render(request)
self.assertEqual(channel.code, 200)
# Get the Room ID to join
room_id = list(channel.json_body["rooms"]["invite"].keys())[0]
# Join the room
request, channel = self.make_request(
"POST",
"/_matrix/client/r0/rooms/" + room_id + "/join",
access_token=self.access_token,
)
self.render(request)
self.assertEqual(channel.code, 200)
# Sync again, to get the message in the room
request, channel = self.make_request(
"GET", "/_matrix/client/r0/sync", access_token=self.access_token
)
self.render(request)
self.assertEqual(channel.code, 200)
# Get the message
room = channel.json_body["rooms"]["join"][room_id]
messages = [
x for x in room["timeline"]["events"] if x["type"] == "m.room.message"
]
# One message, with the consent URL
self.assertEqual(len(messages), 1)
self.assertTrue(
messages[0]["content"]["body"].startswith(
"consent https://example.com/_matrix/consent"
)
)

View file

@ -14,10 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import hmac
import json
from mock import Mock
from twisted.internet import defer
@ -145,34 +141,8 @@ class ClientIpAuthTestCase(unittest.HomeserverTestCase):
return hs
def prepare(self, hs, reactor, clock):
self.hs.config.registration_shared_secret = u"shared"
self.store = self.hs.get_datastore()
# Create the user
request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
self.render(request)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode('ascii') + b"\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest()
body = json.dumps(
{
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
"mac": want_mac,
}
)
request, channel = self.make_request(
"POST", "/_matrix/client/r0/admin/register", body.encode('utf8')
)
self.render(request)
self.assertEqual(channel.code, 200)
self.user_id = channel.json_body["user_id"]
self.user_id = self.register_user("bob", "abc123", True)
def test_request_with_xforwarded(self):
"""
@ -194,20 +164,7 @@ class ClientIpAuthTestCase(unittest.HomeserverTestCase):
def _runtest(self, headers, expected_ip, make_request_args):
device_id = "bleb"
body = json.dumps(
{
"type": "m.login.password",
"user": "bob",
"password": "abc123",
"device_id": device_id,
}
)
request, channel = self.make_request(
"POST", "/_matrix/client/r0/login", body.encode('utf8'), **make_request_args
)
self.render(request)
self.assertEqual(channel.code, 200)
access_token = channel.json_body["access_token"].encode('ascii')
access_token = self.login("bob", "abc123", device_id=device_id)
# Advance to a known time
self.reactor.advance(123456 - self.reactor.seconds())
@ -215,7 +172,6 @@ class ClientIpAuthTestCase(unittest.HomeserverTestCase):
request, channel = self.make_request(
"GET",
"/_matrix/client/r0/admin/users/" + self.user_id,
body.encode('utf8'),
access_token=access_token,
**make_request_args
)

View file

@ -74,6 +74,45 @@ class StateStoreTestCase(tests.unittest.TestCase):
self.assertEqual(s1[t].event_id, s2[t].event_id)
self.assertEqual(len(s1), len(s2))
@defer.inlineCallbacks
def test_get_state_groups_ids(self):
e1 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Create, '', {}
)
e2 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
)
state_group_map = yield self.store.get_state_groups_ids(self.room, [e2.event_id])
self.assertEqual(len(state_group_map), 1)
state_map = list(state_group_map.values())[0]
self.assertDictEqual(
state_map,
{
(EventTypes.Create, ''): e1.event_id,
(EventTypes.Name, ''): e2.event_id,
},
)
@defer.inlineCallbacks
def test_get_state_groups(self):
e1 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Create, '', {}
)
e2 = yield self.inject_state_event(
self.room, self.u_alice, EventTypes.Name, '', {"name": "test room"}
)
state_group_map = yield self.store.get_state_groups(
self.room, [e2.event_id])
self.assertEqual(len(state_group_map), 1)
state_list = list(state_group_map.values())[0]
self.assertEqual(
{ev.event_id for ev in state_list},
{e1.event_id, e2.event_id},
)
@defer.inlineCallbacks
def test_get_state_for_event(self):

View file

@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed
from synapse.events import FrozenEvent
from synapse.types import Requester, UserID
from synapse.util import Clock
from synapse.util.logcontext import LoggingContext
from tests import unittest
from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase):
}
)
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
with LoggingContext(request="lying_event"):
d = self.handler.on_receive_pdu(
"test.serv", lying_event, sent_to_us_directly=True
)
# Step the reactor, so the database fetches come back
self.reactor.advance(1)
@ -139,107 +141,3 @@ class MessageAcceptTests(unittest.TestCase):
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
def test_cant_hide_past_history(self):
"""
If you send a message, you must be able to provide the direct
prev_events that said event references.
"""
def post_json(destination, path, data, headers=None, timeout=0):
if path.startswith("/_matrix/federation/v1/get_missing_events/"):
return {
"events": [
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "three:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [("four:test.serv", {})],
}
]
}
self.http_client.post_json = post_json
def get_json(destination, path, args, headers=None):
if path.startswith("/_matrix/federation/v1/state_ids/"):
d = self.successResultOf(
self.homeserver.datastore.get_state_ids_for_event("one:test.serv")
)
return succeed(
{
"pdu_ids": [
y
for x, y in d.items()
if x == ("m.room.member", "@us:test")
],
"auth_chain_ids": list(d.values()),
}
)
self.http_client.get_json = get_json
# Figure out what the most recent event is
most_recent = self.successResultOf(
maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
)[0]
# Make a good event
good_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "one:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [(most_recent, {})],
}
)
d = self.handler.on_receive_pdu(
"test.serv", good_event, sent_to_us_directly=True
)
self.reactor.advance(1)
self.assertEqual(self.successResultOf(d), None)
bad_event = FrozenEvent(
{
"room_id": self.room_id,
"sender": "@baduser:test.serv",
"event_id": "two:test.serv",
"depth": 1000,
"origin_server_ts": 1,
"type": "m.room.message",
"origin": "test.serv",
"content": "hewwo?",
"auth_events": [],
"prev_events": [("one:test.serv", {}), ("three:test.serv", {})],
}
)
d = self.handler.on_receive_pdu(
"test.serv", bad_event, sent_to_us_directly=True
)
self.reactor.advance(1)
extrem = maybeDeferred(
self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id
)
self.assertEqual(self.successResultOf(extrem)[0], "two:test.serv")
state = self.homeserver.get_state_handler().get_current_state_ids(self.room_id)
self.reactor.advance(1)
self.assertIn(("m.room.member", "@us:test"), self.successResultOf(state).keys())

View file

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import hmac
import logging
from mock import Mock
@ -32,6 +34,7 @@ from synapse.types import UserID, create_requester
from synapse.util.logcontext import LoggingContextFilter
from tests.server import get_clock, make_request, render, setup_test_homeserver
from tests.utils import default_config
# Set up putting Synapse's logs into Trial's.
rootLogger = logging.getLogger()
@ -223,6 +226,15 @@ class HomeserverTestCase(TestCase):
hs = self.setup_test_homeserver()
return hs
def default_config(self, name="test"):
"""
Get a default HomeServer config object.
Args:
name (str): The homeserver name/domain.
"""
return default_config(name)
def prepare(self, reactor, clock, homeserver):
"""
Prepare for the test. This involves things like mocking out parts of
@ -297,3 +309,69 @@ class HomeserverTestCase(TestCase):
return d
self.pump()
return self.successResultOf(d)
def register_user(self, username, password, admin=False):
"""
Register a user. Requires the Admin API be registered.
Args:
username (bytes/unicode): The user part of the new user.
password (bytes/unicode): The password of the new user.
admin (bool): Whether the user should be created as an admin
or not.
Returns:
The MXID of the new user (unicode).
"""
self.hs.config.registration_shared_secret = u"shared"
# Create the user
request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
self.render(request)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
nonce_str = b"\x00".join([username.encode('utf8'), password.encode('utf8')])
if admin:
nonce_str += b"\x00admin"
else:
nonce_str += b"\x00notadmin"
want_mac.update(nonce.encode('ascii') + b"\x00" + nonce_str)
want_mac = want_mac.hexdigest()
body = json.dumps(
{
"nonce": nonce,
"username": username,
"password": password,
"admin": admin,
"mac": want_mac,
}
)
request, channel = self.make_request(
"POST", "/_matrix/client/r0/admin/register", body.encode('utf8')
)
self.render(request)
self.assertEqual(channel.code, 200)
user_id = channel.json_body["user_id"]
return user_id
def login(self, username, password, device_id=None):
"""
Log in a user, and get an access token. Requires the Login API be
registered.
"""
body = {"type": "m.login.password", "user": username, "password": password}
if device_id:
body["device_id"] = device_id
request, channel = self.make_request(
"POST", "/_matrix/client/r0/login", json.dumps(body).encode('utf8')
)
self.render(request)
self.assertEqual(channel.code, 200)
access_token = channel.json_body["access_token"].encode('ascii')
return access_token

View file

@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase):
self.assertEqual(r, "bum")
self._check_test_key("one")
def test_nested_logging_context(self):
with LoggingContext(request="foo"):
nested_context = logcontext.nested_logging_context(suffix="bar")
self.assertEqual(nested_context.request, "foo-bar")
# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on

View file

@ -96,6 +96,64 @@ def setupdb():
atexit.register(_cleanup)
def default_config(name):
"""
Create a reasonable test config.
"""
config = Mock()
config.signing_key = [MockKey()]
config.event_cache_size = 1
config.enable_registration = True
config.macaroon_secret_key = "not even a little secret"
config.expire_access_token = False
config.server_name = name
config.trusted_third_party_id_servers = []
config.room_invite_state_types = []
config.password_providers = []
config.worker_replication_url = ""
config.worker_app = None
config.email_enable_notifs = False
config.block_non_admin_invites = False
config.federation_domain_whitelist = None
config.federation_rc_reject_limit = 10
config.federation_rc_sleep_limit = 10
config.federation_rc_sleep_delay = 100
config.federation_rc_concurrent = 10
config.filter_timeline_limit = 5000
config.user_directory_search_all_users = False
config.user_consent_server_notice_content = None
config.block_events_without_consent_error = None
config.media_storage_providers = []
config.auto_join_rooms = []
config.limit_usage_by_mau = False
config.hs_disabled = False
config.hs_disabled_message = ""
config.hs_disabled_limit_type = ""
config.max_mau_value = 50
config.mau_trial_days = 0
config.mau_limits_reserved_threepids = []
config.admin_contact = None
config.rc_messages_per_second = 10000
config.rc_message_burst_count = 10000
config.use_frozen_dicts = False
# we need a sane default_room_version, otherwise attempts to create rooms will
# fail.
config.default_room_version = "1"
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
config.update_user_directory = False
def is_threepid_reserved(threepid):
return ServerConfig.is_threepid_reserved(config, threepid)
config.is_threepid_reserved.side_effect = is_threepid_reserved
return config
class TestHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
@ -124,56 +182,8 @@ def setup_test_homeserver(
from twisted.internet import reactor
if config is None:
config = Mock()
config.signing_key = [MockKey()]
config.event_cache_size = 1
config.enable_registration = True
config.macaroon_secret_key = "not even a little secret"
config.expire_access_token = False
config.server_name = name
config.trusted_third_party_id_servers = []
config.room_invite_state_types = []
config.password_providers = []
config.worker_replication_url = ""
config.worker_app = None
config.email_enable_notifs = False
config.block_non_admin_invites = False
config.federation_domain_whitelist = None
config.federation_rc_reject_limit = 10
config.federation_rc_sleep_limit = 10
config.federation_rc_sleep_delay = 100
config.federation_rc_concurrent = 10
config.filter_timeline_limit = 5000
config.user_directory_search_all_users = False
config.user_consent_server_notice_content = None
config.block_events_without_consent_error = None
config.media_storage_providers = []
config.auto_join_rooms = []
config.limit_usage_by_mau = False
config.hs_disabled = False
config.hs_disabled_message = ""
config.hs_disabled_limit_type = ""
config.max_mau_value = 50
config.mau_trial_days = 0
config.mau_limits_reserved_threepids = []
config.admin_contact = None
config.rc_messages_per_second = 10000
config.rc_message_burst_count = 10000
config = default_config(name)
# we need a sane default_room_version, otherwise attempts to create rooms will
# fail.
config.default_room_version = "1"
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
config.update_user_directory = False
def is_threepid_reserved(threepid):
return ServerConfig.is_threepid_reserved(config, threepid)
config.is_threepid_reserved.side_effect = is_threepid_reserved
config.use_frozen_dicts = True
config.ldap_enabled = False
if "clock" not in kargs:

20
tox.ini
View file

@ -64,6 +64,26 @@ setenv =
{[base]setenv}
SYNAPSE_POSTGRES = 1
# A test suite for the oldest supported versions of Python libraries, to catch
# any uses of APIs not available in them.
[testenv:py27-old]
skip_install=True
deps =
# Old automat version for Twisted
Automat == 0.3.0
mock
lxml
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
# Make all greater-thans equals so we test the oldest version of our direct
# dependencies, but make the pyopenssl 17.0, which can work against an
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install'
# Install Synapse itself. This won't update any libraries.
pip install -e .
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
[testenv:py35]
usedevelop=true