0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-17 13:23:54 +01:00

Merge branch 'develop' into rav/drop_re_signing_hacks

This commit is contained in:
Richard van der Hoff 2018-07-04 07:13:38 +01:00 committed by GitHub
commit a4ab491371
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
152 changed files with 2187 additions and 641 deletions

View file

@ -4,7 +4,12 @@ language: python
# tell travis to cache ~/.cache/pip
cache: pip
before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
matrix:
fast_finish: true
include:
- python: 2.7
env: TOX_ENV=packaging
@ -14,10 +19,13 @@ matrix:
- python: 2.7
env: TOX_ENV=py27
- python: 3.6
env: TOX_ENV=py36
- python: 3.6
env: TOX_ENV=check-newsfragment
install:
- pip install tox

View file

@ -1,15 +1,36 @@
Changes in synapse v0.31.2 (2018-06-14)
=======================================
SECURITY UPDATE: Prevent unauthorised users from setting state events in a room
when there is no ``m.room.power_levels`` event in force in the room. (PR #3397)
Discussion around the Matrix Spec change proposal for this change can be
followed at https://github.com/matrix-org/matrix-doc/issues/1304.
Changes in synapse v0.31.1 (2018-06-08)
=======================================
v0.31.1 fixes a security bug in the ``get_missing_events`` federation API
where event visibility rules were not applied correctly.
We are not aware of it being actively exploited but please upgrade asap.
Bug Fixes:
* Fix event filtering in get_missing_events handler (PR #3371)
Changes in synapse v0.31.0 (2018-06-06)
=======================================
Most notable change from v0.30.0 is to switch to python prometheus library to improve system
stats reporting. WARNING this changes a number of prometheus metrics in a
Most notable change from v0.30.0 is to switch to the python prometheus library to improve system
stats reporting. WARNING: this changes a number of prometheus metrics in a
backwards-incompatible manner. For more details, see
`docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_.
Bug Fixes:
* Fix metric documentation tables (PR #3341)
* Fix LaterGuage error handling (694968f)
* Fix LaterGauge error handling (694968f)
* Fix replication metrics (b7e7fd2)
Changes in synapse v0.31.0-rc1 (2018-06-04)
@ -29,7 +50,6 @@ Changes:
* Remove users from user directory on deactivate (PR #3277)
* Avoid sending consent notice to guest users (PR #3288)
* disable CPUMetrics if no /proc/self/stat (PR #3299)
* Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat!
* Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307)
* Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat!
* Reduce stuck read-receipts: ignore depth when updating (PR #3318)

View file

@ -48,6 +48,26 @@ Please ensure your changes match the cosmetic style of the existing project,
and **never** mix cosmetic and functional changes in the same commit, as it
makes it horribly hard to review otherwise.
Changelog
~~~~~~~~~
All changes, even minor ones, need a corresponding changelog
entry. These are managed by Towncrier
(https://github.com/hawkowl/towncrier).
To create a changelog entry, make a new file in the ``changelog.d``
file named in the format of ``issuenumberOrPR.type``. The type can be
one of ``feature``, ``bugfix``, ``removal`` (also used for
deprecations), or ``misc`` (for internal-only changes). The content of
the file is your changelog entry, which can contain RestructuredText
formatting. A note of contributors is welcomed in changelogs for
non-misc changes (the content of misc changes is not displayed).
For example, a fix for a bug reported in #1234 would have its
changelog entry in ``changelog.d/1234.bugfix``, and contain content
like "The security levels of Florbs are now validated when
recieved over federation. Contributed by Jane Matrix".
Attribution
~~~~~~~~~~~
@ -110,11 +130,15 @@ If you agree to this for your contribution, then all that's needed is to
include the line in your commit or pull request comment::
Signed-off-by: Your Name <your@email.example.org>
...using your real name; unfortunately pseudonyms and anonymous contributions
can't be accepted. Git makes this trivial - just use the -s flag when you do
``git commit``, having first set ``user.name`` and ``user.email`` git configs
(which you should have done anyway :)
We accept contributions under a legally identifiable name, such as
your name on government documentation or common-law names (names
claimed by legitimate usage or repute). Unfortunately, we cannot
accept anonymous contributions at this time.
Git allows you to add this signoff automatically when using the ``-s``
flag to ``git commit``, which uses the name and email set in your
``user.name`` and ``user.email`` git configs.
Conclusion
~~~~~~~~~~

View file

@ -29,5 +29,8 @@ exclude Dockerfile
exclude .dockerignore
recursive-exclude jenkins *.sh
include pyproject.toml
recursive-include changelog.d *
prune .github
prune demo/etc

1
changelog.d/3324.removal Normal file
View file

@ -0,0 +1 @@
Remove was_forgotten_at

1
changelog.d/3327.bugfix Normal file
View file

@ -0,0 +1 @@
Strip access_token from outgoing requests

0
changelog.d/3332.misc Normal file
View file

1
changelog.d/3334.feature Normal file
View file

@ -0,0 +1 @@
Cache factor override system for specific caches

1
changelog.d/3340.doc Normal file
View file

@ -0,0 +1 @@
``doc/postgres.rst``: fix display of the last command block. Thanks to @ArchangeGabriel!

0
changelog.d/3341.misc Normal file
View file

1
changelog.d/3344.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track appservice transactions

0
changelog.d/3347.misc Normal file
View file

0
changelog.d/3348.misc Normal file
View file

1
changelog.d/3349.bugfix Normal file
View file

@ -0,0 +1 @@
Redact AS tokens in logs

1
changelog.d/3355.bugfix Normal file
View file

@ -0,0 +1 @@
Fix federation backfill from SQLite servers

0
changelog.d/3356.misc Normal file
View file

1
changelog.d/3363.bugfix Normal file
View file

@ -0,0 +1 @@
Fix event-purge-by-ts admin API

1
changelog.d/3371.bugfix Normal file
View file

@ -0,0 +1 @@
Fix event filtering in get_missing_events handler

1
changelog.d/3372.feature Normal file
View file

@ -0,0 +1 @@
Try to log more helpful info when a sig verification fails

0
changelog.d/3446.misc Normal file
View file

0
changelog.d/3447.misc Normal file
View file

1
changelog.d/3456.bugfix Normal file
View file

@ -0,0 +1 @@
Synapse is now stricter regarding accepting events which it cannot retrieve the prev_events for.

1
changelog.d/3462.feature Normal file
View file

@ -0,0 +1 @@
Synapse now uses the best performing JSON encoder/decoder according to your runtime (simplejson on CPython, stdlib json on PyPy).

1
changelog.d/3465.feature Normal file
View file

@ -0,0 +1 @@
Add optional ip_range_whitelist param to AS registration files to lock AS IP access

0
changelog.d/3467.misc Normal file
View file

1
changelog.d/3470.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where synapse would explode when receiving unicode in HTTP User-Agent header

1
changelog.d/3480.feature Normal file
View file

@ -0,0 +1 @@
Reject invalid server names in federation requests

View file

@ -44,13 +44,26 @@ Deactivate Account
This API deactivates an account. It removes active access tokens, resets the
password, and deletes third-party IDs (to prevent the user requesting a
password reset).
password reset). It can also mark the user as GDPR-erased (stopping their data
from distributed further, and deleting it entirely if there are no other
references to it).
The api is::
POST /_matrix/client/r0/admin/deactivate/<user_id>
including an ``access_token`` of a server admin, and an empty request body.
with a body of:
.. code:: json
{
"erase": true
}
including an ``access_token`` of a server admin.
The erase parameter is optional and defaults to 'false'.
An empty body may be passed for backwards compatibility.
Reset password

5
pyproject.toml Normal file
View file

@ -0,0 +1,5 @@
[tool.towncrier]
package = "synapse"
filename = "CHANGES.rst"
directory = "changelog.d"
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"

View file

@ -18,14 +18,22 @@
from __future__ import print_function
import argparse
from urlparse import urlparse, urlunparse
import nacl.signing
import json
import base64
import requests
import sys
from requests.adapters import HTTPAdapter
import srvlookup
import yaml
# uncomment the following to enable debug logging of http requests
#from httplib import HTTPConnection
#HTTPConnection.debuglevel = 1
def encode_base64(input_bytes):
"""Encode bytes as a base64 string without any padding."""
@ -113,17 +121,6 @@ def read_signing_keys(stream):
return keys
def lookup(destination, path):
if ":" in destination:
return "https://%s%s" % (destination, path)
else:
try:
srv = srvlookup.lookup("matrix", "tcp", destination)[0]
return "https://%s:%d%s" % (srv.host, srv.port, path)
except:
return "https://%s:%d%s" % (destination, 8448, path)
def request_json(method, origin_name, origin_key, destination, path, content):
if method is None:
if content is None:
@ -152,13 +149,19 @@ def request_json(method, origin_name, origin_key, destination, path, content):
authorization_headers.append(bytes(header))
print ("Authorization: %s" % header, file=sys.stderr)
dest = lookup(destination, path)
dest = "matrix://%s%s" % (destination, path)
print ("Requesting %s" % dest, file=sys.stderr)
result = requests.request(
s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
result = s.request(
method=method,
url=dest,
headers={"Authorization": authorization_headers[0]},
headers={
"Host": destination,
"Authorization": authorization_headers[0]
},
verify=False,
data=content,
)
@ -242,5 +245,39 @@ def read_args_from_config(args):
args.signing_key_path = config['signing_key_path']
class MatrixConnectionAdapter(HTTPAdapter):
@staticmethod
def lookup(s):
if s[-1] == ']':
# ipv6 literal (with no port)
return s, 8448
if ":" in s:
out = s.rsplit(":",1)
try:
port = int(out[1])
except ValueError:
raise ValueError("Invalid host:port '%s'" % s)
return out[0], port
try:
srv = srvlookup.lookup("matrix", "tcp", s)[0]
return srv.host, srv.port
except:
return s, 8448
def get_connection(self, url, proxies=None):
parsed = urlparse(url)
(host, port) = self.lookup(parsed.netloc)
netloc = "%s:%d" % (host, port)
print("Connecting to %s" % (netloc,), file=sys.stderr)
url = urlunparse((
"https", netloc, parsed.path, parsed.params, parsed.query,
parsed.fragment,
))
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)
if __name__ == "__main__":
main()

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.31.0"
__version__ = "0.31.2"

View file

@ -19,6 +19,7 @@ from six import itervalues
import pymacaroons
from twisted.internet import defer
from netaddr import IPAddress
import synapse.types
from synapse import event_auth
@ -244,6 +245,11 @@ class Auth(object):
if app_service is None:
defer.returnValue((None, None))
if app_service.ip_range_whitelist:
ip_address = IPAddress(self.hs.get_ip_from_request(request))
if ip_address not in app_service.ip_range_whitelist:
defer.returnValue((None, None))
if "user_id" not in request.args:
defer.returnValue((app_service.sender, app_service))
@ -488,7 +494,7 @@ class Auth(object):
def _look_up_user_by_access_token(self, token):
ret = yield self.store.get_user_by_access_token(token)
if not ret:
logger.warn("Unrecognised access token - not in store: %s" % (token,))
logger.warn("Unrecognised access token - not in store.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
errcode=Codes.UNKNOWN_TOKEN
@ -511,7 +517,7 @@ class Auth(object):
)
service = self.store.get_app_service_by_token(token)
if not service:
logger.warn("Unrecognised appservice access token: %s" % (token,))
logger.warn("Unrecognised appservice access token.")
raise AuthError(
self.TOKEN_NOT_FOUND_HTTP_STATUS,
"Unrecognised access token.",
@ -655,7 +661,7 @@ class Auth(object):
auth_events[(EventTypes.PowerLevels, "")] = power_level_event
send_level = event_auth.get_send_level(
EventTypes.Aliases, "", auth_events
EventTypes.Aliases, "", power_level_event,
)
user_level = event_auth.get_user_power_level(user_id, auth_events)

View file

@ -17,7 +17,8 @@
import logging
import simplejson as json
from canonicaljson import json
from six import iteritems
from six.moves import http_client

View file

@ -17,7 +17,8 @@ from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, RoomID
from twisted.internet import defer
import simplejson as json
from canonicaljson import json
import jsonschema
from jsonschema import FormatChecker

View file

@ -23,6 +23,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
@ -62,7 +63,7 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@ -97,7 +98,7 @@ class AppserviceServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -122,7 +122,7 @@ class ClientReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -138,7 +138,7 @@ class EventCreatorServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -111,7 +111,7 @@ class FederationReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -125,7 +125,7 @@ class FederationSenderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -176,7 +176,7 @@ class FrontendProxyServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -266,7 +266,7 @@ class SynapseHomeServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@ -318,11 +318,6 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
version_string = "Synapse/" + get_version_string(synapse)
logger.info("Server hostname: %s", config.server_name)
logger.info("Server version: %s", version_string)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
@ -335,7 +330,7 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
version_string=version_string,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)

View file

@ -118,7 +118,7 @@ class MediaRepositoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -128,7 +128,7 @@ class PusherServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -305,7 +305,7 @@ class SynchrotronServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -150,7 +150,7 @@ class UserDirectoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
"collect_metrics is not enabled!"))
"enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])

View file

@ -85,7 +85,8 @@ class ApplicationService(object):
NS_LIST = [NS_USERS, NS_ALIASES, NS_ROOMS]
def __init__(self, token, hostname, url=None, namespaces=None, hs_token=None,
sender=None, id=None, protocols=None, rate_limited=True):
sender=None, id=None, protocols=None, rate_limited=True,
ip_range_whitelist=None):
self.token = token
self.url = url
self.hs_token = hs_token
@ -93,6 +94,7 @@ class ApplicationService(object):
self.server_name = hostname
self.namespaces = self._check_namespaces(namespaces)
self.id = id
self.ip_range_whitelist = ip_range_whitelist
if "|" in self.id:
raise Exception("application service ID cannot contain '|' character")

View file

@ -17,6 +17,8 @@ from ._base import Config, ConfigError
from synapse.appservice import ApplicationService
from synapse.types import UserID
from netaddr import IPSet
import yaml
import logging
@ -154,6 +156,13 @@ def _load_appservice(hostname, as_info, config_filename):
" will not receive events or queries.",
config_filename,
)
ip_range_whitelist = None
if as_info.get('ip_range_whitelist'):
ip_range_whitelist = IPSet(
as_info.get('ip_range_whitelist')
)
return ApplicationService(
token=as_info["as_token"],
hostname=hostname,
@ -163,5 +172,6 @@ def _load_appservice(hostname, as_info, config_filename):
sender=user_id,
id=as_info["id"],
protocols=protocols,
rate_limited=rate_limited
rate_limited=rate_limited,
ip_range_whitelist=ip_range_whitelist,
)

View file

@ -12,17 +12,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
from synapse.util.logcontext import LoggingContextFilter
from twisted.logger import globalLogBeginner, STDLibLogObserver
import logging
import logging.config
import yaml
from string import Template
import os
import signal
from string import Template
import sys
from twisted.logger import STDLibLogObserver, globalLogBeginner
import yaml
import synapse
from synapse.util.logcontext import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config
DEFAULT_LOG_CONFIG = Template("""
version: 1
@ -202,6 +205,15 @@ def setup_logging(config, use_worker_options=False):
if getattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
logging.warn("***** STARTING SERVER *****")
logging.warn(
"Server %s version %s",
sys.argv[0], get_version_string(synapse),
)
logging.info("Server hostname: %s", config.server_name)
# It's critical to point twisted's internal logging somewhere, otherwise it
# stacks up and leaks kup to 64K object;
# see: https://twistedmatrix.com/trac/ticket/8164

View file

@ -18,7 +18,7 @@ from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_federation_endpoint
import simplejson as json
from canonicaljson import json
import logging

View file

@ -27,10 +27,12 @@ from synapse.util.metrics import Measure
from twisted.internet import defer
from signedjson.sign import (
verify_signed_json, signature_ids, sign_json, encode_canonical_json
verify_signed_json, signature_ids, sign_json, encode_canonical_json,
SignatureVerifyException,
)
from signedjson.key import (
is_signing_algorithm_supported, decode_verify_key_bytes
is_signing_algorithm_supported, decode_verify_key_bytes,
encode_verify_key_base64,
)
from unpaddedbase64 import decode_base64, encode_base64
@ -56,7 +58,7 @@ Attributes:
key_ids(set(str)): The set of key_ids to that could be used to verify the
JSON object
json_object(dict): The JSON object to verify.
deferred(twisted.internet.defer.Deferred):
deferred(Deferred[str, str, nacl.signing.VerifyKey]):
A deferred (server_name, key_id, verify_key) tuple that resolves when
a verify key has been fetched. The deferreds' callbacks are run with no
logcontext.
@ -736,6 +738,17 @@ class Keyring(object):
@defer.inlineCallbacks
def _handle_key_deferred(verify_request):
"""Waits for the key to become available, and then performs a verification
Args:
verify_request (VerifyKeyRequest):
Returns:
Deferred[None]
Raises:
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
try:
with PreserveLoggingContext():
@ -768,11 +781,17 @@ def _handle_key_deferred(verify_request):
))
try:
verify_signed_json(json_object, server_name, verify_key)
except Exception:
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
server_name, verify_key.alg, verify_key.version,
encode_verify_key_base64(verify_key),
str(e),
)
raise SynapseError(
401,
"Invalid signature for server %s with key %s:%s" % (
server_name, verify_key.alg, verify_key.version
"Invalid signature for server %s with key %s:%s: %s" % (
server_name, verify_key.alg, verify_key.version, str(e),
),
Codes.UNAUTHORIZED,
)

View file

@ -34,9 +34,11 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
event: the event being checked.
auth_events (dict: event-key -> event): the existing room state.
Raises:
AuthError if the checks fail
Returns:
True if the auth checks pass.
if the auth checks pass.
"""
if do_size_check:
_check_size_limits(event)
@ -71,7 +73,7 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
# Oh, we don't know what the state of the room was, so we
# are trusting that this is allowed (at least for now)
logger.warn("Trusting event: %s", event.event_id)
return True
return
if event.type == EventTypes.Create:
room_id_domain = get_domain_from_id(event.room_id)
@ -81,7 +83,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
"Creation event's room_id domain does not match sender's"
)
# FIXME
return True
logger.debug("Allowing! %s", event)
return
creation_event = auth_events.get((EventTypes.Create, ""), None)
@ -118,7 +121,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Alias event's state_key does not match sender's domain"
)
return True
logger.debug("Allowing! %s", event)
return
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
@ -127,14 +131,9 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
if event.type == EventTypes.Member:
allowed = _is_membership_change_allowed(
event, auth_events
)
if allowed:
logger.debug("Allowing! %s", event)
else:
logger.debug("Denying! %s", event)
return allowed
_is_membership_change_allowed(event, auth_events)
logger.debug("Allowing! %s", event)
return
_check_event_sender_in_room(event, auth_events)
@ -153,7 +152,8 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
)
)
else:
return True
logger.debug("Allowing! %s", event)
return
_can_send_event(event, auth_events)
@ -200,7 +200,7 @@ def _is_membership_change_allowed(event, auth_events):
create = auth_events.get(key)
if create and event.prev_events[0][0] == create.event_id:
if create.content["creator"] == event.state_key:
return True
return
target_user_id = event.state_key
@ -265,13 +265,13 @@ def _is_membership_change_allowed(event, auth_events):
raise AuthError(
403, "%s is banned from the room" % (target_user_id,)
)
return True
return
if Membership.JOIN != membership:
if (caller_invited
and Membership.LEAVE == membership
and target_user_id == event.user_id):
return True
return
if not caller_in_room: # caller isn't joined
raise AuthError(
@ -334,8 +334,6 @@ def _is_membership_change_allowed(event, auth_events):
else:
raise AuthError(500, "Unknown membership %s" % membership)
return True
def _check_event_sender_in_room(event, auth_events):
key = (EventTypes.Member, event.user_id, )
@ -355,35 +353,46 @@ def _check_joined_room(member, user_id, room_id):
))
def get_send_level(etype, state_key, auth_events):
key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
etype
)
if send_level is None:
if state_key is not None:
send_level = send_level_event.content.get(
"state_default", 50
)
else:
send_level = send_level_event.content.get(
"events_default", 0
)
def get_send_level(etype, state_key, power_levels_event):
"""Get the power level required to send an event of a given type
if send_level:
send_level = int(send_level)
The federation spec [1] refers to this as "Required Power Level".
https://matrix.org/docs/spec/server_server/unstable.html#definitions
Args:
etype (str): type of event
state_key (str|None): state_key of state event, or None if it is not
a state event.
power_levels_event (synapse.events.EventBase|None): power levels event
in force at this point in the room
Returns:
int: power level required to send this event.
"""
if power_levels_event:
power_levels_content = power_levels_event.content
else:
send_level = 0
power_levels_content = {}
return send_level
# see if we have a custom level for this event type
send_level = power_levels_content.get("events", {}).get(etype)
# otherwise, fall back to the state_default/events_default.
if send_level is None:
if state_key is not None:
send_level = power_levels_content.get("state_default", 50)
else:
send_level = power_levels_content.get("events_default", 0)
return int(send_level)
def _can_send_event(event, auth_events):
power_levels_event = _get_power_level_event(auth_events)
send_level = get_send_level(
event.type, event.get("state_key", None), auth_events
event.type, event.get("state_key"), power_levels_event,
)
user_level = get_user_power_level(event.user_id, auth_events)
@ -524,13 +533,22 @@ def _check_power_levels(event, auth_events):
def _get_power_level_event(auth_events):
key = (EventTypes.PowerLevels, "", )
return auth_events.get(key)
return auth_events.get((EventTypes.PowerLevels, ""))
def get_user_power_level(user_id, auth_events):
power_level_event = _get_power_level_event(auth_events)
"""Get a user's power level
Args:
user_id (str): user's id to look up in power_levels
auth_events (dict[(str, str), synapse.events.EventBase]):
state in force at this point in the room (or rather, a subset of
it including at least the create event and power levels event.
Returns:
int: the user's power level in this room.
"""
power_level_event = _get_power_level_event(auth_events)
if power_level_event:
level = power_level_event.content.get("users", {}).get(user_id)
if not level:
@ -541,6 +559,11 @@ def get_user_power_level(user_id, auth_events):
else:
return int(level)
else:
# if there is no power levels event, the creator gets 100 and everyone
# else gets 0.
# some things which call this don't pass the create event: hack around
# that.
key = (EventTypes.Create, "", )
create_event = auth_events.get(key)
if (create_event is not None and

View file

@ -15,7 +15,7 @@
# limitations under the License.
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
@ -277,7 +277,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_pdu_request(self, origin, event_id):
pdu = yield self._get_persisted_pdu(origin, event_id)
pdu = yield self.handler.get_persisted_pdu(origin, event_id)
if pdu:
defer.returnValue(
@ -470,17 +470,6 @@ class FederationServer(FederationBase):
ts_now_ms = self._clock.time_msec()
return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
@log_function
def _get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
Returns:
Deferred: Results in a `Pdu`.
"""
return self.handler.get_persisted_pdu(
origin, event_id, do_auth=do_auth
)
def _transaction_from_pdus(self, pdu_list):
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
@ -560,7 +549,9 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
yield self.handler.on_receive_pdu(
origin, pdu, get_missing=True, sent_to_us_directly=True,
)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

View file

@ -21,7 +21,6 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
@ -42,8 +41,11 @@ import logging
logger = logging.getLogger(__name__)
sent_pdus_destination_dist = Counter(
"synapse_federation_transaction_queue_sent_pdu_destinations", ""
sent_pdus_destination_dist_count = Counter(
"synapse_federation_client_sent_pdu_destinations:count", ""
)
sent_pdus_destination_dist_total = Counter(
"synapse_federation_client_sent_pdu_destinations:total", ""
)
@ -280,7 +282,8 @@ class TransactionQueue(object):
if not destinations:
return
sent_pdus_destination_dist.inc(len(destinations))
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@ -451,9 +454,6 @@ class TransactionQueue(object):
# hence why we throw the result away.
yield get_retry_limiter(destination, self.clock, self.store)
# XXX: what's this for?
yield run_on_reactor()
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (

View file

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError, FederationDeniedError
from synapse.http.endpoint import parse_server_name
from synapse.http.server import JsonResource
from synapse.http.servlet import (
parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@ -99,26 +100,6 @@ class Authenticator(object):
origin = None
def parse_auth_header(header_str):
try:
params = auth.split(" ")[1].split(",")
param_dict = dict(kv.split("=") for kv in params)
def strip_quotes(value):
if value.startswith("\""):
return value[1:-1]
else:
return value
origin = strip_quotes(param_dict["origin"])
key = strip_quotes(param_dict["key"])
sig = strip_quotes(param_dict["sig"])
return (origin, key, sig)
except Exception:
raise AuthenticationError(
400, "Malformed Authorization header", Codes.UNAUTHORIZED
)
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
if not auth_headers:
@ -127,8 +108,8 @@ class Authenticator(object):
)
for auth in auth_headers:
if auth.startswith("X-Matrix"):
(origin, key, sig) = parse_auth_header(auth)
if auth.startswith(b"X-Matrix"):
(origin, key, sig) = _parse_auth_header(auth)
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
@ -165,6 +146,47 @@ class Authenticator(object):
logger.exception("Error resetting retry timings on %s", origin)
def _parse_auth_header(header_bytes):
"""Parse an X-Matrix auth header
Args:
header_bytes (bytes): header value
Returns:
Tuple[str, str, str]: origin, key id, signature.
Raises:
AuthenticationError if the header could not be parsed
"""
try:
header_str = header_bytes.decode('utf-8')
params = header_str.split(" ")[1].split(",")
param_dict = dict(kv.split("=") for kv in params)
def strip_quotes(value):
if value.startswith(b"\""):
return value[1:-1]
else:
return value
origin = strip_quotes(param_dict["origin"])
# ensure that the origin is a valid server name
parse_server_name(origin)
key = strip_quotes(param_dict["key"])
sig = strip_quotes(param_dict["sig"])
return origin, key, sig
except Exception as e:
logger.warn(
"Error parsing auth header '%s': %s",
header_bytes.decode('ascii', 'replace'),
e,
)
raise AuthenticationError(
400, "Malformed Authorization header", Codes.UNAUTHORIZED,
)
class BaseFederationServlet(object):
REQUIRE_AUTH = True

View file

@ -13,8 +13,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, threads
from canonicaljson import json
from ._base import BaseHandler
from synapse.api.constants import LoginType
from synapse.api.errors import (
@ -23,7 +26,6 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util.async import run_on_reactor
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
@ -32,7 +34,7 @@ from twisted.web.client import PartialDownloadError
import logging
import bcrypt
import pymacaroons
import simplejson
import attr
import synapse.util.stringutils as stringutils
@ -402,7 +404,7 @@ class AuthHandler(BaseHandler):
except PartialDownloadError as pde:
# Twisted is silly
data = pde.response
resp_body = simplejson.loads(data)
resp_body = json.loads(data)
if 'success' in resp_body:
# Note that we do NOT check the hostname here: we explicitly
@ -423,15 +425,11 @@ class AuthHandler(BaseHandler):
def _check_msisdn(self, authdict, _):
return self._check_threepid('msisdn', authdict)
@defer.inlineCallbacks
def _check_dummy_auth(self, authdict, _):
yield run_on_reactor()
defer.returnValue(True)
return defer.succeed(True)
@defer.inlineCallbacks
def _check_threepid(self, medium, authdict):
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@ -825,6 +823,15 @@ class AuthHandler(BaseHandler):
if medium == 'email':
address = address.lower()
identity_handler = self.hs.get_handlers().identity_handler
yield identity_handler.unbind_threepid(
user_id,
{
'medium': medium,
'address': address,
},
)
ret = yield self.store.user_delete_threepid(
user_id, medium, address,
)
@ -849,7 +856,11 @@ class AuthHandler(BaseHandler):
return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
bcrypt.gensalt(self.bcrypt_rounds))
return make_deferred_yieldable(threads.deferToThread(_do_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@ -869,16 +880,21 @@ class AuthHandler(BaseHandler):
)
if stored_hash:
return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
else:
return defer.succeed(False)
class MacaroonGeneartor(object):
def __init__(self, hs):
self.clock = hs.get_clock()
self.server_name = hs.config.server_name
self.macaroon_secret_key = hs.config.macaroon_secret_key
@attr.s
class MacaroonGenerator(object):
hs = attr.ib()
def generate_access_token(self, user_id, extra_caveats=None):
extra_caveats = extra_caveats or []
@ -896,7 +912,7 @@ class MacaroonGeneartor(object):
def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = login")
now = self.clock.time_msec()
now = self.hs.get_clock().time_msec()
expiry = now + duration_in_ms
macaroon.add_first_party_caveat("time < %d" % (expiry,))
return macaroon.serialize()
@ -908,9 +924,9 @@ class MacaroonGeneartor(object):
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
location=self.server_name,
location=self.hs.config.server_name,
identifier="key",
key=self.macaroon_secret_key)
key=self.hs.config.macaroon_secret_key)
macaroon.add_first_party_caveat("gen = 1")
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon

View file

@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
from synapse.api.errors import SynapseError
import logging
@ -30,6 +31,7 @@ class DeactivateAccountHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self._device_handler = hs.get_device_handler()
self._room_member_handler = hs.get_room_member_handler()
self._identity_handler = hs.get_handlers().identity_handler
self.user_directory_handler = hs.get_user_directory_handler()
# Flag that indicates whether the process to part users from rooms is running
@ -37,14 +39,15 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
reactor.callWhenRunning(self._start_user_parting)
hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
def deactivate_account(self, user_id, erase_data):
"""Deactivate a user's account
Args:
user_id (str): ID of user to be deactivated
erase_data (bool): whether to GDPR-erase the user's data
Returns:
Deferred
@ -52,14 +55,35 @@ class DeactivateAccountHandler(BaseHandler):
# FIXME: Theoretically there is a race here wherein user resets
# password using threepid.
# first delete any devices belonging to the user, which will also
# delete threepids first. We remove these from the IS so if this fails,
# leave the user still active so they can try again.
# Ideally we would prevent password resets and then do this in the
# background thread.
threepids = yield self.store.user_get_threepids(user_id)
for threepid in threepids:
try:
yield self._identity_handler.unbind_threepid(
user_id,
{
'medium': threepid['medium'],
'address': threepid['address'],
},
)
except Exception:
# Do we want this to be a fatal error or should we carry on?
logger.exception("Failed to remove threepid from ID server")
raise SynapseError(400, "Failed to remove threepid from ID server")
yield self.store.user_delete_threepid(
user_id, threepid['medium'], threepid['address'],
)
# delete any devices belonging to the user, which will also
# delete corresponding access tokens.
yield self._device_handler.delete_all_devices_for_user(user_id)
# then delete any remaining access tokens which weren't associated with
# a device.
yield self._auth_handler.delete_access_tokens_for_user(user_id)
yield self.store.user_delete_threepids(user_id)
yield self.store.user_set_password_hash(user_id, None)
# Add the user to a table of users pending deactivation (ie.
@ -69,6 +93,11 @@ class DeactivateAccountHandler(BaseHandler):
# delete from user directory
yield self.user_directory_handler.handle_user_deactivated(user_id)
# Mark the user as erased, if they asked for that
if erase_data:
logger.info("Marking %s as erased", user_id)
yield self.store.mark_user_erased(user_id)
# Now start the process that goes through that list and
# parts users from rooms (if it isn't already running)
self._start_user_parting()

View file

@ -14,10 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import simplejson as json
import logging
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from six import iteritems
@ -80,7 +79,7 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
# Firt get local devices.
# First get local devices.
failures = {}
results = {}
if local_query:
@ -357,7 +356,7 @@ def _exception_to_failure(e):
# include ConnectionRefused and other errors
#
# Note that some Exceptions (notably twisted's ResponseFailed etc) don't
# give a string for e.message, which simplejson then fails to serialize.
# give a string for e.message, which json then fails to serialize.
return {
"status": 503, "message": str(e.message),
}

View file

@ -39,11 +39,12 @@ from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
)
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
from synapse.events.utils import prune_event
@ -89,7 +90,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def on_receive_pdu(self, origin, pdu, get_missing=True):
def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@ -103,8 +106,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
existing = yield self.get_persisted_pdu(
origin, pdu.event_id, do_auth=False
existing = yield self.store.get_event(
pdu.event_id,
allow_none=True,
allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@ -161,14 +166,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
return
defer.returnValue(None)
state = None
auth_chain = []
fetch_state = False
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@ -223,26 +225,60 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
if prevs - seen:
logger.info(
"Still missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
if sent_to_us_directly and prevs - seen:
# If they have sent it to us directly, and the server
# isn't telling us about the auth events that it's
# made a message referencing, we explode
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
fetch_state = True
elif prevs - seen:
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
auth_chains = set()
try:
# Get the state of the events we know about
ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
state_groups.append(ours)
if fetch_state:
# We need to get the state at this event, since we haven't
# processed all the prev events.
logger.debug(
"_handle_new_pdu getting state for %s",
pdu.room_id
)
try:
state, auth_chain = yield self.replication_layer.get_state_for_room(
origin, pdu.room_id, pdu.event_id,
)
except Exception:
logger.exception("Failed to get state for event: %s", pdu.event_id)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
state, got_auth_chain = (
yield self.replication_layer.get_state_for_room(
origin, pdu.room_id, p
)
)
auth_chains.update(got_auth_chain)
state_group = {(x.type, x.state_key): x.event_id for x in state}
state_groups.append(state_group)
# Resolve any conflicting state
def fetch(ev_ids):
return self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False
)
state_map = yield resolve_events_with_factory(
state_groups, {pdu.event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
affected=pdu.event_id,
)
yield self._process_received_pdu(
origin,
@ -320,11 +356,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
yield self.on_receive_pdu(
origin,
e,
get_missing=False
)
try:
yield self.on_receive_pdu(
origin,
e,
get_missing=False
)
except FederationError as e:
if e.code == 403:
logger.warn("Event %s failed history check.")
else:
raise
@log_function
@defer.inlineCallbacks
@ -458,6 +500,47 @@ class FederationHandler(BaseHandler):
@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
"""Filter the given events for the given server, redacting those the
server can't see.
Assumes the server is currently in the room.
Returns
list[FrozenEvent]
"""
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)
visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)
event_map = yield self.store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)
if all_open:
defer.returnValue(events)
# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
@ -493,7 +576,20 @@ class FederationHandler(BaseHandler):
for e_id, key_to_eid in event_to_state_ids.iteritems()
}
erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)
def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)
if not state:
return event
@ -1371,8 +1467,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@ -1403,8 +1497,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
yield run_on_reactor()
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
@ -1446,11 +1538,20 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def get_persisted_pdu(self, origin, event_id, do_auth=True):
""" Get a PDU from the database with given origin and id.
def get_persisted_pdu(self, origin, event_id):
"""Get an event from the database for the given server.
Args:
origin [str]: hostname of server which is requesting the event; we
will check that the server is allowed to see it.
event_id [str]: id of the event being requested
Returns:
Deferred: Results in a `Pdu`.
Deferred[EventBase|None]: None if we know nothing about the event;
otherwise the (possibly-redacted) event.
Raises:
AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@ -1459,20 +1560,17 @@ class FederationHandler(BaseHandler):
)
if event:
if do_auth:
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)
event = events[0]
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)
event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@ -1751,6 +1849,10 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
)
defer.returnValue(missing_events)
@defer.inlineCallbacks

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -18,7 +19,7 @@
import logging
import simplejson as json
from canonicaljson import json
from twisted.internet import defer
@ -26,7 +27,6 @@ from synapse.api.errors import (
MatrixCodeMessageException, CodeMessageException
)
from ._base import BaseHandler
from synapse.util.async import run_on_reactor
from synapse.api.errors import SynapseError, Codes
logger = logging.getLogger(__name__)
@ -38,6 +38,7 @@ class IdentityHandler(BaseHandler):
super(IdentityHandler, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.federation_http_client = hs.get_http_client()
self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
self.trust_any_id_server_just_for_testing_do_not_use = (
@ -60,8 +61,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def threepid_from_creds(self, creds):
yield run_on_reactor()
if 'id_server' in creds:
id_server = creds['id_server']
elif 'idServer' in creds:
@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
@defer.inlineCallbacks
def bind_threepid(self, creds, mxid):
yield run_on_reactor()
logger.debug("binding threepid %r to %s", creds, mxid)
data = None
@ -139,9 +137,53 @@ class IdentityHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
yield run_on_reactor()
def unbind_threepid(self, mxid, threepid):
"""
Removes a binding from an identity server
Args:
mxid (str): Matrix user ID of binding to be removed
threepid (dict): Dict with medium & address of binding to be removed
Returns:
Deferred[bool]: True on success, otherwise False
"""
logger.debug("unbinding threepid %r from %s", threepid, mxid)
if not self.trusted_id_servers:
logger.warn("Can't unbind threepid: no trusted ID servers set in config")
defer.returnValue(False)
# We don't track what ID server we added 3pids on (perhaps we ought to)
# but we assume that any of the servers in the trusted list are in the
# same ID server federation, so we can pick any one of them to send the
# deletion request to.
id_server = next(iter(self.trusted_id_servers))
url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
content = {
"mxid": mxid,
"threepid": threepid,
}
headers = {}
# we abuse the federation http client to sign the request, but we have to send it
# using the normal http client since we don't want the SRV lookup and want normal
# 'browser-like' HTTPS.
self.federation_http_client.sign_request(
destination=None,
method='POST',
url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
headers_dict=headers,
content=content,
destination_is=id_server,
)
yield self.http_client.post_json_get_json(
url,
content,
headers,
)
defer.returnValue(True)
@defer.inlineCallbacks
def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,
@ -176,8 +218,6 @@ class IdentityHandler(BaseHandler):
self, id_server, country, phone_number,
client_secret, send_attempt, **kwargs
):
yield run_on_reactor()
if not self._should_trust_id_server(id_server):
raise SynapseError(
400, "Untrusted ID server '%s'" % id_server,

View file

@ -14,13 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson
import sys
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
import six
from six import string_types, itervalues, iteritems
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@ -36,7 +35,7 @@ from synapse.events.validator import EventValidator
from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.async import ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
@ -157,7 +156,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
reactor.callLater(24 * 3600, clear_purge)
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
@ -491,7 +490,7 @@ class EventCreationHandler(object):
target, e
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
@ -509,12 +508,13 @@ class EventCreationHandler(object):
defer.returnValue((event, context))
def _is_exempt_from_privacy_policy(self, builder):
def _is_exempt_from_privacy_policy(self, builder, requester):
""""Determine if an event to be sent is exempt from having to consent
to the privacy policy
Args:
builder (synapse.events.builder.EventBuilder): event being created
requester (Requster): user requesting this event
Returns:
Deferred[bool]: true if the event can be sent without the user
@ -525,6 +525,9 @@ class EventCreationHandler(object):
membership = builder.content.get("membership", None)
if membership == Membership.JOIN:
return self._is_server_notices_room(builder.room_id)
elif membership == Membership.LEAVE:
# the user is always allowed to leave (but not kick people)
return builder.state_key == requester.user.to_string()
return succeed(False)
@defer.inlineCallbacks
@ -793,7 +796,7 @@ class EventCreationHandler(object):
# Ensure that we can round trip before trying to persist in db
try:
dump = frozendict_json_encoder.encode(event.content)
simplejson.loads(dump)
json.loads(dump)
except Exception:
logger.exception("Failed to encode content: %r", event.content)
raise
@ -806,6 +809,7 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
@ -959,9 +963,7 @@ class EventCreationHandler(object):
event_stream_id, max_stream_id
)
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,

View file

@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1

View file

@ -24,7 +24,7 @@ from synapse.api.errors import (
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.async import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@ -139,7 +139,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
password_hash = None
if password:
password_hash = yield self.auth_handler().hash(password)
@ -431,8 +430,6 @@ class RegistrationHandler(BaseHandler):
Raises:
RegistrationError if there was a problem registering.
"""
yield run_on_reactor()
if localpart is None:
raise SynapseError(400, "Request must include user id")

View file

@ -23,7 +23,7 @@ from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
from synapse.api.constants import (
EventTypes, JoinRules, RoomCreationPreset
)
from synapse.api.errors import AuthError, StoreError, SynapseError
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@ -115,7 +115,11 @@ class RoomCreationHandler(BaseHandler):
)
if mapping:
raise SynapseError(400, "Room alias already taken")
raise SynapseError(
400,
"Room alias already taken",
Codes.ROOM_IN_USE
)
else:
room_alias = None

View file

@ -64,6 +64,13 @@ class SearchHandler(BaseHandler):
except Exception:
raise SynapseError(400, "Invalid batch")
logger.info(
"Search batch properties: %r, %r, %r",
batch_group, batch_group_key, batch_token,
)
logger.info("Search content: %s", content)
try:
room_cat = content["search_categories"]["room_events"]
@ -271,6 +278,8 @@ class SearchHandler(BaseHandler):
# We should never get here due to the guard earlier.
raise NotImplementedError()
logger.info("Found %d events to return", len(allowed_events))
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
@ -282,6 +291,11 @@ class SearchHandler(BaseHandler):
event.room_id, event.event_id, before_limit, after_limit
)
logger.info(
"Context for search returned %d and %d events",
len(res["events_before"]), len(res["events_after"]),
)
res["events_before"] = yield filter_events_for_client(
self.store, user.to_string(), res["events_before"]
)

View file

@ -145,7 +145,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
"to_device", # List of direct messages for the device.
"device_lists", # List of user_ids whose devices have chanegd
"device_lists", # List of user_ids whose devices have changed
"device_one_time_keys_count", # Dict of algorithm to count for one time keys
# for this device
"groups",

View file

@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
from synapse.types import get_localpart_from_id
from six import iteritems
@ -174,7 +173,7 @@ class UserDirectoryHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
@ -188,7 +187,7 @@ class UserDirectoryHandler(object):
logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
yield self._handle_local_user(user_id)
num_processed_users += 1
yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
logger.info("Processed all users")
@ -236,7 +235,7 @@ class UserDirectoryHandler(object):
count = 0
for user_id in user_ids:
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@ -251,7 +250,7 @@ class UserDirectoryHandler(object):
continue
if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)

View file

@ -42,7 +42,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
import simplejson as json
from canonicaljson import json
import logging
import urllib
@ -98,8 +98,8 @@ class SimpleHttpClient(object):
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)
@ -115,7 +115,7 @@ class SimpleHttpClient(object):
"Error sending request to %s %s: %s %s",
method, redact_uri(uri), type(e).__name__, e.message
)
raise e
raise
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@ -38,6 +38,36 @@ _Server = collections.namedtuple(
)
def parse_server_name(server_name):
"""Split a server name into host/port parts.
Does some basic sanity checking of the
Args:
server_name (str): server name to parse
Returns:
Tuple[str, int|None]: host/port parts.
Raises:
ValueError if the server name could not be parsed.
"""
try:
if server_name[-1] == ']':
# ipv6 literal, hopefully
if server_name[0] != '[':
raise Exception()
return server_name, None
domain_port = server_name.rsplit(":", 1)
domain = domain_port[0]
port = int(domain_port[1]) if domain_port[1:] else None
return domain, port
except Exception:
raise ValueError("Invalid server name '%s'" % server_name)
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.
@ -50,9 +80,7 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
timeout (int): connection timeout in seconds
"""
domain_port = destination.split(":")
domain = domain_port[0]
port = int(domain_port[1]) if domain_port[1:] else None
domain, port = parse_server_name(destination)
endpoint_kw_args = {}
@ -74,21 +102,22 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
))
), reactor)
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))
), reactor)
class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@ -98,9 +127,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
def __init__(self, conn):
def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@ -131,14 +161,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)

View file

@ -22,12 +22,12 @@ from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import add_timeout_to_deferred
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
from canonicaljson import encode_canonical_json
from canonicaljson import encode_canonical_json, json
from synapse.api.errors import (
SynapseError, Codes, HttpResponseException, FederationDeniedError,
@ -36,7 +36,6 @@ from synapse.api.errors import (
from signedjson.sign import sign_json
import cgi
import simplejson as json
import logging
import random
import sys
@ -193,6 +192,7 @@ class MatrixFederationHttpClient(object):
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
@ -234,7 +234,7 @@ class MatrixFederationHttpClient(object):
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
yield sleep(delay)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
@ -260,14 +260,35 @@ class MatrixFederationHttpClient(object):
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
content=None, destination_is=None):
"""
Signs a request by adding an Authorization header to headers_dict
Args:
destination (bytes|None): The desination home server of the request.
May be None if the destination is an identity server, in which case
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
headers_dict (dict): Dictionary of request headers to append to
content (bytes): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
Returns:
None
"""
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if destination is not None:
request["destination"] = destination
if destination_is is not None:
request["destination_is"] = destination_is
if content is not None:
request["content"] = content

View file

@ -117,13 +117,17 @@ def _get_in_flight_counts():
Returns:
dict[tuple[str, str], int]
"""
for rm in _in_flight_requests:
# Cast to a list to prevent it changing while the Prometheus
# thread is collecting metrics
reqs = list(_in_flight_requests)
for rm in reqs:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
counts = {}
for rm in _in_flight_requests:
for rm in reqs:
key = (rm.method, rm.name,)
counts[key] = counts.get(key, 0) + 1
@ -131,7 +135,7 @@ def _get_in_flight_counts():
LaterGauge(
"synapse_http_request_metrics_in_flight_requests_count",
"synapse_http_server_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,

View file

@ -29,7 +29,7 @@ import synapse.metrics
import synapse.events
from canonicaljson import (
encode_canonical_json, encode_pretty_printed_json
encode_canonical_json, encode_pretty_printed_json, json
)
from twisted.internet import defer
@ -41,7 +41,6 @@ from twisted.web.util import redirectTo
import collections
import logging
import urllib
import simplejson
logger = logging.getLogger(__name__)
@ -410,7 +409,7 @@ def respond_with_json(request, code, json_object, send_cors=False,
if canonical_json or synapse.events.USE_FROZEN_DICTS:
json_bytes = encode_canonical_json(json_object)
else:
json_bytes = simplejson.dumps(json_object)
json_bytes = json.dumps(json_object)
return respond_with_json_bytes(
request, code, json_bytes,

View file

@ -18,7 +18,9 @@
from synapse.api.errors import SynapseError, Codes
import logging
import simplejson
from canonicaljson import json
logger = logging.getLogger(__name__)
@ -171,7 +173,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
return None
try:
content = simplejson.loads(content_bytes)
content = json.loads(content_bytes)
except Exception as e:
logger.warn("Unable to parse JSON: %s", e)
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)

View file

@ -99,19 +99,36 @@ class SynapseRequest(Request):
db_txn_count = context.db_txn_count
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
evt_db_fetch_count = context.evt_db_fetch_count
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
evt_db_fetch_count = 0
end_time = time.time()
# need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header
authenticated_entity = self.authenticated_entity
if authenticated_entity is not None:
authenticated_entity = authenticated_entity.decode("utf-8", "replace")
# ...or could be raw utf-8 bytes in the User-Agent header.
# N.B. if you don't do this, the logger explodes cryptically
# with maximum recursion trying to log errors about
# the charset problem.
# c.f. https://github.com/matrix-org/synapse/issues/3471
user_agent = self.get_user_agent()
if user_agent is not None:
user_agent = user_agent.decode("utf-8", "replace")
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
authenticated_entity,
end_time - self.start_time,
ru_utime,
ru_stime,
@ -123,7 +140,8 @@ class SynapseRequest(Request):
self.method,
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
user_agent,
evt_db_fetch_count,
)
try:

View file

@ -62,7 +62,7 @@ class LaterGauge(object):
calls = self.caller()
except Exception:
logger.exception(
"Exception running callback for LaterGuage(%s)",
"Exception running callback for LaterGauge(%s)",
self.name,
)
yield g
@ -140,14 +140,15 @@ gc_time = Histogram(
class GCCounts(object):
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
yield cm
REGISTRY.register(GCCounts())
if not running_on_pypy:
REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
@ -190,6 +191,22 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
last_ticked = time.time()
class ReactorLastSeenMetric(object):
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
"Seconds since the Twisted reactor was last seen",
)
cm.add_metric([], time.time() - last_ticked)
yield cm
REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
@ -222,6 +239,11 @@ def runUntilCurrentTimer(func):
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
# Update the time we last ticked, for the metric to test whether
# Synapse's reactor has frozen
global last_ticked
last_ticked = end
if running_on_pypy:
return ret

View file

@ -161,6 +161,7 @@ class Notifier(object):
self.user_to_user_stream = {}
self.room_to_user_streams = {}
self.hs = hs
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
@ -340,6 +341,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
yield listener.deferred
@ -561,6 +563,7 @@ class Notifier(object):
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():

View file

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
self.timed_call = reactor.callLater(
self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)

View file

@ -15,7 +15,7 @@
# limitations under the License.
import logging
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
@ -220,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
self.timed_call = self.hs.get_reactor().callLater(
self.backoff_delay, self.on_timer
)
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break

View file

@ -19,7 +19,6 @@ import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@ -125,7 +124,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id):
yield run_on_reactor()
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
@ -151,7 +149,6 @@ class PusherPool:
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
yield run_on_reactor()
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive

View file

@ -21,7 +21,6 @@ from synapse.api.errors import (
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(client, host, port, requester, event, context,
def send_event_to_master(clock, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield sleep(1)
yield clock.sleep(1)
except MatrixCodeMessageException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And

View file

@ -23,6 +23,7 @@ from synapse.storage.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.storage.user_erasure_store import UserErasureWorkerStore
from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
@ -45,6 +46,7 @@ class SlavedEventStore(EventFederationWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):
def __init__(self, db_conn, hs):

View file

@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
from twisted.internet import reactor, defer
from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
reactor.connectTCP(host, port, factory)
hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes

View file

@ -19,13 +19,17 @@ allowed to be sent by which side.
"""
import logging
import simplejson
import platform
if platform.python_implementation() == "PyPy":
import json
_json_encoder = json.JSONEncoder()
else:
import simplejson as json
_json_encoder = json.JSONEncoder(namedtuple_as_object=False)
logger = logging.getLogger(__name__)
_json_encoder = simplejson.JSONEncoder(namedtuple_as_object=False)
class Command(object):
"""The base command class.
@ -102,7 +106,7 @@ class RdataCommand(Command):
return cls(
stream_name,
None if token == "batch" else int(token),
simplejson.loads(row_json)
json.loads(row_json)
)
def to_line(self):
@ -300,7 +304,7 @@ class InvalidateCacheCommand(Command):
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)
return cls(cache_func, simplejson.loads(keys_json))
return cls(cache_func, json.loads(keys_json))
def to_line(self):
return " ".join((
@ -329,7 +333,7 @@ class UserIpCommand(Command):
def from_line(cls, line):
user_id, jsn = line.split(" ", 1)
access_token, ip, user_agent, device_id, last_seen = simplejson.loads(jsn)
access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
return cls(
user_id, access_token, ip, user_agent, device_id, last_seen

View file

@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
pending_commands = LaterGauge(
"pending_commands", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_pending_commands",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
})
(p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
},
)
def transport_buffer_size(protocol):
@ -579,11 +581,13 @@ def transport_buffer_size(protocol):
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
})
(p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
},
)
def transport_kernel_read_buffer_size(protocol, read=True):
@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_kernel_send_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
})
},
)
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
"synapse_replication_tcp_protocol_transport_kernel_read_buffer",
"",
["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
})
},
)
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
"synapse_replication_tcp_protocol_inbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
})
},
)
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
"synapse_replication_tcp_protocol_outbound_commands",
"",
["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
})
},
)
# number of updates received for each RDATA stream
inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
["stream_name"])
inbound_rdata_count = Counter(
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
)

View file

@ -15,7 +15,7 @@
"""The server side of the replication stream.
"""
from twisted.internet import defer, reactor
from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown

View file

@ -16,6 +16,8 @@
from twisted.internet import defer
from six.moves import http_client
from synapse.api.constants import Membership
from synapse.api.errors import AuthError, SynapseError, Codes, NotFoundError
from synapse.types import UserID, create_requester
@ -247,6 +249,15 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, target_user_id):
body = parse_json_object_from_request(request, allow_empty_body=True)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
UserID.from_string(target_user_id)
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
@ -254,7 +265,9 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
yield self._deactivate_account_handler.deactivate_account(target_user_id)
yield self._deactivate_account_handler.deactivate_account(
target_user_id, erase,
)
defer.returnValue((200, {}))

View file

@ -23,7 +23,8 @@ from synapse.util.msisdn import phone_number_to_msisdn
from .base import ClientV1RestServlet, client_path_patterns
import simplejson as json
from canonicaljson import json
import urllib
from six.moves.urllib import parse as urlparse

View file

@ -24,8 +24,6 @@ import synapse.util.stringutils as stringutils
from synapse.http.servlet import parse_json_object_from_request
from synapse.types import create_requester
from synapse.util.async import run_on_reactor
from hashlib import sha1
import hmac
import logging
@ -272,7 +270,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_password(self, request, register_json, session):
yield run_on_reactor()
if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage!
@ -333,8 +330,6 @@ class RegisterRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), string_types):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), string_types):
@ -423,8 +418,6 @@ class CreateUserRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def _do_create(self, requester, user_json):
yield run_on_reactor()
if "localpart" not in user_json:
raise SynapseError(400, "Expected 'localpart' key.")

View file

@ -31,7 +31,7 @@ from synapse.http.servlet import (
from six.moves.urllib import parse as urlparse
import logging
import simplejson as json
from canonicaljson import json
logger = logging.getLogger(__name__)

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,6 +16,7 @@
# limitations under the License.
import logging
from six.moves import http_client
from twisted.internet import defer
from synapse.api.auth import has_access_token
@ -24,7 +26,6 @@ from synapse.http.servlet import (
RestServlet, assert_params_in_request,
parse_json_object_from_request,
)
from synapse.util.async import run_on_reactor
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler
@ -187,13 +188,20 @@ class DeactivateAccountRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
body = parse_json_object_from_request(request)
erase = body.get("erase", False)
if not isinstance(erase, bool):
raise SynapseError(
http_client.BAD_REQUEST,
"Param 'erase' must be a boolean, if given",
Codes.BAD_JSON,
)
requester = yield self.auth.get_user_by_req(request)
# allow ASes to dectivate their own users
if requester.app_service:
yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string()
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
@ -201,7 +209,7 @@ class DeactivateAccountRestServlet(RestServlet):
requester, body, self.hs.get_ip_from_request(request),
)
yield self._deactivate_account_handler.deactivate_account(
requester.user.to_string(),
requester.user.to_string(), erase,
)
defer.returnValue((200, {}))
@ -300,8 +308,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
yield run_on_reactor()
requester = yield self.auth.get_user_by_req(request)
threepids = yield self.datastore.user_get_threepids(
@ -312,8 +318,6 @@ class ThreepidRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
threePidCreds = body.get('threePidCreds')
@ -365,8 +369,6 @@ class ThreepidDeleteRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
required = ['medium', 'address']
@ -381,9 +383,16 @@ class ThreepidDeleteRestServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request)
user_id = requester.user.to_string()
yield self.auth_handler.delete_threepid(
user_id, body['medium'], body['address']
)
try:
yield self.auth_handler.delete_threepid(
user_id, body['medium'], body['address']
)
except Exception:
# NB. This endpoint should succeed if there is nothing to
# delete, so it should only throw if something is wrong
# that we ought to care about.
logger.exception("Failed to remove threepid")
raise SynapseError(500, "Failed to remove threepid")
defer.returnValue((200, {}))

View file

@ -32,7 +32,6 @@ from ._base import client_v2_patterns, interactive_auth_handler
import logging
import hmac
from hashlib import sha1
from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types
@ -191,8 +190,6 @@ class RegisterRestServlet(RestServlet):
@interactive_auth_handler
@defer.inlineCallbacks
def on_POST(self, request):
yield run_on_reactor()
body = parse_json_object_from_request(request)
kind = "user"

View file

@ -33,7 +33,7 @@ from ._base import set_timeline_upper_limit
import itertools
import logging
import simplejson as json
from canonicaljson import json
logger = logging.getLogger(__name__)

View file

@ -22,8 +22,9 @@ from synapse.api.errors import (
from twisted.protocols.basic import FileSender
from twisted.web import server, resource
from canonicaljson import json
import base64
import simplejson as json
import logging
import os
import re

View file

@ -58,6 +58,7 @@ UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
class MediaRepository(object):
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.client = MatrixFederationHttpClient(hs)
self.clock = hs.get_clock()
@ -94,7 +95,7 @@ class MediaRepository(object):
storage_providers.append(provider)
self.media_storage = MediaStorage(
self.primary_base_path, self.filepaths, storage_providers,
self.hs, self.primary_base_path, self.filepaths, storage_providers,
)
self.clock.looping_call(

View file

@ -37,13 +37,15 @@ class MediaStorage(object):
"""Responsible for storing/fetching files from local sources.
Args:
hs (synapse.server.Homeserver)
local_media_directory (str): Base path where we store media on disk
filepaths (MediaFilePaths)
storage_providers ([StorageProvider]): List of StorageProvider that are
used to fetch and store files.
"""
def __init__(self, local_media_directory, filepaths, storage_providers):
def __init__(self, hs, local_media_directory, filepaths, storage_providers):
self.hs = hs
self.local_media_directory = local_media_directory
self.filepaths = filepaths
self.storage_providers = storage_providers
@ -175,7 +177,8 @@ class MediaStorage(object):
res = yield provider.fetch(path, file_info)
if res:
with res:
consumer = BackgroundFileConsumer(open(local_path, "w"))
consumer = BackgroundFileConsumer(
open(local_path, "w"), self.hs.get_reactor())
yield res.write_to_consumer(consumer)
yield consumer.wait()
defer.returnValue(local_path)

View file

@ -23,7 +23,8 @@ import re
import shutil
import sys
import traceback
import simplejson as json
from canonicaljson import json
from six.moves import urllib_parse as urlparse
from six import string_types

View file

@ -40,7 +40,7 @@ from synapse.federation.transport.client import TransportLayerClient
from synapse.federation.transaction_queue import TransactionQueue
from synapse.handlers import Handlers
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGeneartor
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.device import DeviceHandler
@ -165,15 +165,19 @@ class HomeServer(object):
'server_notices_sender',
]
def __init__(self, hostname, **kwargs):
def __init__(self, hostname, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
"""
if not reactor:
from twisted.internet import reactor
self._reactor = reactor
self.hostname = hostname
self._building = {}
self.clock = Clock()
self.clock = Clock(reactor)
self.distributor = Distributor()
self.ratelimiter = Ratelimiter()
@ -186,6 +190,12 @@ class HomeServer(object):
self.datastore = DataStore(self.get_db_conn(), self)
logger.info("Finished setting up.")
def get_reactor(self):
"""
Fetch the Twisted reactor in use by this HomeServer.
"""
return self._reactor
def get_ip_from_request(self, request):
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
@ -261,7 +271,7 @@ class HomeServer(object):
return AuthHandler(self)
def build_macaroon_generator(self):
return MacaroonGeneartor(self)
return MacaroonGenerator(self)
def build_device_handler(self):
return DeviceHandler(self)
@ -328,6 +338,7 @@ class HomeServer(object):
return adbapi.ConnectionPool(
name,
cp_reactor=self.get_reactor(),
**self.db_config.get("args", {})
)

View file

@ -694,10 +694,10 @@ def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_ma
return auth_events
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ds, auth_event_ids,
def _resolve_with_state(unconflicted_state_ids, conflicted_state_ids, auth_event_ids,
state_map):
conflicted_state = {}
for key, event_ids in iteritems(conflicted_state_ds):
for key, event_ids in iteritems(conflicted_state_ids):
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events

View file

@ -20,6 +20,7 @@ import time
import logging
from synapse.storage.devices import DeviceStore
from synapse.storage.user_erasure_store import UserErasureStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
)
@ -88,6 +89,7 @@ class DataStore(RoomMemberStore, RoomStore,
DeviceInboxStore,
UserDirectoryStore,
GroupServerStore,
UserErasureStore,
):
def __init__(self, db_conn, hs):

View file

@ -20,10 +20,11 @@ from synapse.storage._base import SQLBaseStore
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from canonicaljson import json
import abc
import simplejson as json
import logging
logger = logging.getLogger(__name__)
@ -114,25 +115,6 @@ class AccountDataWorkerStore(SQLBaseStore):
else:
defer.returnValue(None)
@cachedList(cached_method_name="get_global_account_data_by_type_for_user",
num_args=2, list_name="user_ids", inlineCallbacks=True)
def get_global_account_data_by_type_for_users(self, data_type, user_ids):
rows = yield self._simple_select_many_batch(
table="account_data",
column="user_id",
iterable=user_ids,
keyvalues={
"account_data_type": data_type,
},
retcols=("user_id", "content",),
desc="get_global_account_data_by_type_for_users",
)
defer.returnValue({
row["user_id"]: json.loads(row["content"]) if row["content"] else None
for row in rows
})
@cached(num_args=2)
def get_account_data_for_room(self, user_id, room_id):
"""Get all the client account_data for a user for a room.

View file

@ -15,8 +15,8 @@
# limitations under the License.
import logging
import re
import simplejson as json
from twisted.internet import defer
from canonicaljson import json
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices

Some files were not shown because too many files have changed in this diff Show more