Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2019-03-21 14:10:31 +00:00
commit b0bf1ea7bd
52 changed files with 847 additions and 94 deletions

View file

@ -90,6 +90,17 @@ steps:
image: "python:3.7"
propagate-environment: true
- command:
- "python -m pip install tox"
- "tox -e py27-old,codecov"
label: ":python: 2.7 / SQLite / Old Deps"
env:
TRIAL_FLAGS: "-j 2"
plugins:
- docker#v3.0.1:
image: "python:2.7"
propagate-environment: true
- label: ":python: 2.7 / :postgres: 9.4"
env:
TRIAL_FLAGS: "-j 4"

1
changelog.d/4870.misc Normal file
View file

@ -0,0 +1 @@
Update Apache setup to remove location syntax. Thanks to @cwmke!

1
changelog.d/4879.misc Normal file
View file

@ -0,0 +1 @@
Reinstate test case that runs unit tests against oldest supported dependencies.

2
changelog.d/4888.bugfix Normal file
View file

@ -0,0 +1,2 @@
Fix a bug where hs_disabled_message was sometimes not correctly enforced.

1
changelog.d/4889.misc Normal file
View file

@ -0,0 +1 @@
Use a regular HomeServerConfig object for unit tests rater than a Mock.

1
changelog.d/4890.feature Normal file
View file

@ -0,0 +1 @@
Batch up outgoing read-receipts to reduce federation traffic.

1
changelog.d/4895.feature Normal file
View file

@ -0,0 +1 @@
Add option to disable searching the user directory.

1
changelog.d/4895.misc Normal file
View file

@ -0,0 +1 @@
Add some notes about tuning postgres for larger deployments.

1
changelog.d/4896.feature Normal file
View file

@ -0,0 +1 @@
Add option to disable searching of local and remote public room lists.

1
changelog.d/4900.feature Normal file
View file

@ -0,0 +1 @@
The user directory has been rewritten to make it faster, with less chance of falling behind on a large server.

1
changelog.d/4902.misc Normal file
View file

@ -0,0 +1 @@
Add a config option for torture-testing worker replication.

1
changelog.d/4904.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug in shutdown room admin API where it would fail if a user in the room hadn't consented to the privacy policy.

1
changelog.d/4905.misc Normal file
View file

@ -0,0 +1 @@
Log requests which are simulated by the unit tests.

1
changelog.d/4908.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where blocked world-readable rooms were still peekable.

View file

@ -49,6 +49,24 @@ As with Debian/Ubuntu, postgres support depends on the postgres python connector
export PATH=/usr/pgsql-9.4/bin/:$PATH
pip install psycopg2
Tuning Postgres
===============
The default settings should be fine for most deployments. For larger scale
deployments tuning some of the settings is recommended, details of which can be
found at https://wiki.postgresql.org/wiki/Tuning_Your_PostgreSQL_Server.
In particular, we've found tuning the following values helpful for performance:
- ``shared_buffers``
- ``effective_cache_size``
- ``work_mem``
- ``maintenance_work_mem``
- ``autovacuum_work_mem``
Note that the appropriate values for those fields depend on the amount of free
memory the database host has available.
Synapse config
==============
@ -129,8 +147,8 @@ Once that has completed, change the synapse config to point at the PostgreSQL
database configuration file ``homeserver-postgres.yaml``::
./synctl stop
mv homeserver.yaml homeserver-old-sqlite.yaml
mv homeserver-postgres.yaml homeserver.yaml
mv homeserver.yaml homeserver-old-sqlite.yaml
mv homeserver-postgres.yaml homeserver.yaml
./synctl start
Synapse should now be running against PostgreSQL.

View file

@ -69,20 +69,16 @@ Let's assume that we expect clients to connect to our server at
SSLEngine on
ServerName matrix.example.com;
<Location /_matrix>
ProxyPass http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse http://127.0.0.1:8008/_matrix
</Location>
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
<VirtualHost *:8448>
SSLEngine on
ServerName example.com;
<Location /_matrix>
ProxyPass http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse http://127.0.0.1:8008/_matrix
</Location>
ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
</VirtualHost>
* HAProxy::

View file

@ -438,6 +438,14 @@ log_config: "CONFDIR/SERVERNAME.log.config"
#
#federation_rc_concurrent: 3
# Target outgoing federation transaction frequency for sending read-receipts,
# per-room.
#
# If we end up trying to send out more read-receipts, they will get buffered up
# into fewer transactions.
#
#federation_rr_transactions_per_room_per_second: 50
# Directory where uploaded images and attachments are stored.
@ -954,6 +962,10 @@ password_config:
# User Directory configuration
#
# 'enabled' defines whether users can search the user directory. If
# false then empty responses are returned to all queries. Defaults to
# true.
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
# in public rooms. Defaults to false. If you set it True, you'll have to run
@ -961,6 +973,7 @@ password_config:
# on your database to tell it to rebuild the user_directory search indexes.
#
#user_directory:
# enabled: true
# search_all_users: false
@ -1036,6 +1049,12 @@ password_config:
# Uncomment to disable searching the public room list. When disabled
# blocks searching local and remote room lists for local and remote
# users by always returning an empty list for all queries.
#
#enable_room_list_search: false
# The `alias_creation` option controls who's allowed to create aliases
# on this server.
#

View file

@ -788,9 +788,11 @@ class Auth(object):
# Never fail an auth check for the server notices users or support user
# This can be a problem where event creation is prohibited due to blocking
is_support = yield self.store.is_support_user(user_id)
if user_id == self.hs.config.server_notices_mxid or is_support:
return
if user_id is not None:
if user_id == self.hs.config.server_notices_mxid:
return
if (yield self.store.is_support_user(user_id)):
return
if self.hs.config.hs_disabled:
raise ResourceLimitError(

View file

@ -405,7 +405,10 @@ class Config(object):
self.invoke_all("generate_files", config)
return
self.invoke_all("read_config", config)
self.parse_config_dict(config)
def parse_config_dict(self, config_dict):
self.invoke_all("read_config", config_dict)
def find_config_files(search_paths):

View file

@ -38,7 +38,12 @@ logger = logging.getLogger(__name__)
class KeyConfig(Config):
def read_config(self, config):
self.signing_key = self.read_signing_key(config["signing_key_path"])
# the signing key can be specified inline or in a separate file
if "signing_key" in config:
self.signing_key = read_signing_keys([config["signing_key"]])
else:
self.signing_key = self.read_signing_key(config["signing_key_path"])
self.old_signing_keys = self.read_old_signing_keys(
config.get("old_signing_keys", {})
)

View file

@ -42,6 +42,10 @@ class RatelimitConfig(Config):
self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50)
self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3)
self.federation_rr_transactions_per_room_per_second = config.get(
"federation_rr_transactions_per_room_per_second", 50,
)
def default_config(self, **kwargs):
return """\
## Ratelimiting ##
@ -111,4 +115,12 @@ class RatelimitConfig(Config):
# single server
#
#federation_rc_concurrent: 3
# Target outgoing federation transaction frequency for sending read-receipts,
# per-room.
#
# If we end up trying to send out more read-receipts, they will get buffered up
# into fewer transactions.
#
#federation_rr_transactions_per_room_per_second: 50
"""

View file

@ -20,6 +20,10 @@ from ._base import Config, ConfigError
class RoomDirectoryConfig(Config):
def read_config(self, config):
self.enable_room_list_search = config.get(
"enable_room_list_search", True,
)
alias_creation_rules = config.get("alias_creation_rules")
if alias_creation_rules is not None:
@ -54,6 +58,12 @@ class RoomDirectoryConfig(Config):
def default_config(self, config_dir_path, server_name, **kwargs):
return """
# Uncomment to disable searching the public room list. When disabled
# blocks searching local and remote room lists for local and remote
# users by always returning an empty list for all queries.
#
#enable_room_list_search: false
# The `alias_creation` option controls who's allowed to create aliases
# on this server.
#

View file

@ -126,6 +126,11 @@ class ServerConfig(Config):
self.public_baseurl += '/'
self.start_pushers = config.get("start_pushers", True)
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
# sending out any replication updates.
self.replication_torture_level = config.get("replication_torture_level")
self.listeners = []
for listener in config.get("listeners", []):
if not isinstance(listener.get("port", None), int):

View file

@ -22,9 +22,13 @@ class UserDirectoryConfig(Config):
"""
def read_config(self, config):
self.user_directory_search_enabled = True
self.user_directory_search_all_users = False
user_directory_config = config.get("user_directory", None)
if user_directory_config:
self.user_directory_search_enabled = (
user_directory_config.get("enabled", True)
)
self.user_directory_search_all_users = (
user_directory_config.get("search_all_users", False)
)
@ -33,6 +37,10 @@ class UserDirectoryConfig(Config):
return """
# User Directory configuration
#
# 'enabled' defines whether users can search the user directory. If
# false then empty responses are returned to all queries. Defaults to
# true.
#
# 'search_all_users' defines whether to search all users visible to your HS
# when searching the user directory, rather than limiting to users visible
# in public rooms. Defaults to false. If you set it True, you'll have to run
@ -40,5 +48,6 @@ class UserDirectoryConfig(Config):
# on your database to tell it to rebuild the user_directory search indexes.
#
#user_directory:
# enabled: true
# search_all_users: false
"""

View file

@ -104,7 +104,26 @@ class FederationSender(object):
self._processing_pending_presence = False
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room = {
} # type: dict[str, set[PerDestinationQueue]]
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second
)
def _get_per_destination_queue(self, destination):
"""Get or create a PerDestinationQueue for the given destination
Args:
destination (str): server_name of remote server
Returns:
PerDestinationQueue
"""
queue = self._per_destination_queues.get(destination)
if not queue:
queue = PerDestinationQueue(self.hs, self._transaction_manager, destination)
@ -250,33 +269,91 @@ class FederationSender(object):
Args:
receipt (synapse.types.ReadReceipt): receipt to be sent
"""
# Some background on the rate-limiting going on here.
#
# It turns out that if we attempt to send out RRs as soon as we get them from
# a client, then we end up trying to do several hundred Hz of federation
# transactions. (The number of transactions scales as O(N^2) on the size of a
# room, since in a large room we have both more RRs coming in, and more servers
# to send them to.)
#
# This leads to a lot of CPU load, and we end up getting behind. The solution
# currently adopted is as follows:
#
# The first receipt in a given room is sent out immediately, at time T0. Any
# further receipts are, in theory, batched up for N seconds, where N is calculated
# based on the number of servers in the room to achieve a transaction frequency
# of around 50Hz. So, for example, if there were 100 servers in the room, then
# N would be 100 / 50Hz = 2 seconds.
#
# Then, after T+N, we flush out any receipts that have accumulated, and restart
# the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
# we stop the cycle and go back to the start.
#
# However, in practice, it is often possible to flush out receipts earlier: in
# particular, if we are sending a transaction to a given server anyway (for
# example, because we have a PDU or a RR in another room to send), then we may
# as well send out all of the pending RRs for that server. So it may be that
# by the time we get to T+N, we don't actually have any RRs left to send out.
# Nevertheless we continue to buffer up RRs for the room in question until we
# reach the point that no RRs arrive between timer ticks.
#
# For even more background, see https://github.com/matrix-org/synapse/issues/4730.
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
domains = yield self.state.get_current_hosts_in_room(receipt.room_id)
domains = yield self.state.get_current_hosts_in_room(room_id)
domains = [d for d in domains if d != self.server_name]
if not domains:
return
logger.debug("Sending receipt to: %r", domains)
queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(
room_id
)
content = {
receipt.room_id: {
receipt.receipt_type: {
receipt.user_id: {
"event_ids": receipt.event_ids,
"data": receipt.data,
},
},
},
}
key = (receipt.room_id, receipt.receipt_type, receipt.user_id)
# if there is no flush yet scheduled, we will send out these receipts with
# immediate flushes, and schedule the next flush for this room.
if queues_pending_flush is not None:
logger.debug("Queuing receipt for: %r", domains)
else:
logger.debug("Sending receipt to: %r", domains)
self._schedule_rr_flush_for_room(room_id, len(domains))
for domain in domains:
self.build_and_send_edu(
destination=domain,
edu_type="m.receipt",
content=content,
key=key,
)
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
# if there is already a RR flush pending for this room, then make sure this
# destination is registered for the flush
if queues_pending_flush is not None:
queues_pending_flush.add(queue)
else:
queue.flush_read_receipts_for_room(room_id)
def _schedule_rr_flush_for_room(self, room_id, n_domains):
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
def _flush_rrs_for_room(self, room_id):
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
if not queues:
# no more RRs arrived for this room; we are done.
return
# schedule the next flush
self._schedule_rr_flush_for_room(room_id, len(queues))
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
@logcontext.preserve_fn # the caller should not yield on this
@defer.inlineCallbacks

View file

@ -80,6 +80,10 @@ class PerDestinationQueue(object):
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self._last_device_stream_id = 0
@ -87,6 +91,9 @@ class PerDestinationQueue(object):
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
def __str__(self):
return "PerDestinationQueue[%s]" % self._destination
def pending_pdu_count(self):
return len(self._pending_pdus)
@ -118,6 +125,30 @@ class PerDestinationQueue(object):
})
self.attempt_new_transaction()
def queue_read_receipt(self, receipt):
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
(see flush_read_receipts_for_room)
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
"""
self._pending_rrs.setdefault(
receipt.room_id, {},
).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
def flush_read_receipts_for_room(self, room_id):
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
return
self._rrs_pending_flush = True
self.attempt_new_transaction()
def send_keyed_edu(self, edu, key):
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
@ -183,10 +214,12 @@ class PerDestinationQueue(object):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus = self._pending_edus
pending_edus = []
pending_edus.extend(self._get_rr_edus(force_flush=False))
# We can only include at most 100 EDUs per transactions
pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
pending_edus.extend(
self._pending_edus_keyed.values()
@ -224,6 +257,11 @@ class PerDestinationQueue(object):
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < 100:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
@ -285,6 +323,28 @@ class PerDestinationQueue(object):
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
def _get_rr_edus(self, force_flush):
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
edu = Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.receipt",
content=self._pending_rrs,
)
self._pending_rrs = {}
self._rrs_pending_flush = False
yield edu
def _pop_pending_edus(self, limit):
pending_edus = self._pending_edus
pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
return pending_edus
@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id

View file

@ -165,6 +165,7 @@ class BaseHandler(object):
member_event.room_id,
"leave",
ratelimit=False,
require_consent=False,
)
except Exception as e:
logger.exception("Error kicking guest user: %s" % (e,))

View file

@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
room_id,
"leave",
ratelimit=False,
require_consent=False,
)
except Exception:
logger.exception(

View file

@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.config = hs.config
self.enable_room_list_search = hs.config.enable_room_list_search
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
if visibility not in ["public", "private"]:
raise SynapseError(400, "Invalid visibility setting")
if visibility == "public" and not self.enable_room_list_search:
# The room list has been disabled.
raise AuthError(
403,
"This user is not permitted to publish rooms to the room list"
)
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Unknown room")

View file

@ -19,7 +19,7 @@ import random
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID
@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down.
"""
if room_id:
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
# send any outstanding server notices to the user.
yield self._server_notices_sender.on_user_syncing(auth_user_id)

View file

@ -18,7 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
A JSON serialisable dict with the snapshot of the room.
"""
blocked = yield self.store.is_room_blocked(room_id)
if blocked:
raise SynapseError(403, "This room has been blocked on this server")
user_id = requester.user.to_string()
membership, member_event_id = yield self._check_in_room_or_world_readable(

View file

@ -255,7 +255,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None):
prev_events_and_hashes=None, require_consent=True):
"""
Given a dict from a client, create a new event.
@ -276,6 +276,9 @@ class EventCreationHandler(object):
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
require_consent (bool): Whether to check if the requester has
consented to privacy policy.
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
@ -317,7 +320,7 @@ class EventCreationHandler(object):
)
is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
if not is_exempt:
if require_consent and not is_exempt:
yield self.assert_accepted_privacy_policy(requester)
if token_id is not None:

View file

@ -118,7 +118,7 @@ class ReceiptsHandler(BaseHandler):
if not is_new:
return
self.federation.send_read_receipt(receipt)
yield self.federation.send_read_receipt(receipt)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):

View file

@ -44,6 +44,8 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(
hs, "room_list", timeout_ms=10 * 60 * 1000,
)
@ -69,10 +71,17 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
"""
if not self.enable_room_list_search:
return defer.succeed({
"chunk": [],
"total_room_count_estimate": 0,
})
logger.info(
"Getting public room list: limit=%r, since=%r, search=%r, network=%r",
limit, since_token, bool(search_filter), network_tuple,
)
if search_filter:
# We explicitly don't bother caching searches or requests for
# appservice specific lists.
@ -444,6 +453,12 @@ class RoomListHandler(BaseHandler):
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
if not self.enable_room_list_search:
defer.returnValue({
"chunk": [],
"total_room_count_estimate": 0,
})
if search_filter:
# We currently don't support searching across federation, so we have
# to do it manually without pagination

View file

@ -161,6 +161,7 @@ class RoomMemberHandler(object):
txn_id=None,
ratelimit=True,
content=None,
require_consent=True,
):
user_id = target.to_string()
@ -186,6 +187,7 @@ class RoomMemberHandler(object):
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
require_consent=require_consent,
)
# Check if this event matches the previous membership event for the user.
@ -306,6 +308,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
require_consent=True,
):
key = (room_id,)
@ -339,6 +342,7 @@ class RoomMemberHandler(object):
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
require_consent=require_consent,
)
defer.returnValue(result)
@ -355,6 +359,7 @@ class RoomMemberHandler(object):
third_party_signed=None,
ratelimit=True,
content=None,
require_consent=True,
):
content_specified = bool(content)
if content is None:
@ -536,6 +541,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
content=content,
require_consent=require_consent,
)
defer.returnValue(res)

View file

@ -16,6 +16,7 @@
"""
import logging
import random
from six import itervalues
@ -74,6 +75,8 @@ class ReplicationStreamer(object):
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()
self._replication_torture_level = hs.config.replication_torture_level
# Current connections.
self.connections = []
@ -157,10 +160,23 @@ class ReplicationStreamer(object):
for stream in self.streams:
stream.advance_current_token()
for stream in self.streams:
all_streams = self.streams
if self._replication_torture_level is not None:
# there is no guarantee about ordering between the streams,
# so let's shuffle them around a bit when we are in torture mode.
all_streams = list(all_streams)
random.shuffle(all_streams)
for stream in all_streams:
if stream.last_token == stream.upto_token:
continue
if self._replication_torture_level:
yield self.clock.sleep(
self._replication_torture_level / 1000.0
)
logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME, stream.last_token, stream.upto_token

View file

@ -490,40 +490,54 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
logger.info(
"Shutting down room %r, joining to new room: %r",
room_id, new_room_id,
)
# This will work even if the room is already blocked, but that is
# desirable in case the first attempt at blocking the room failed below.
yield self.store.block_room(room_id, requester_user_id)
users = yield self.state.get_current_user_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:
if not self.hs.is_mine_id(user_id):
continue
logger.info("Kicking %r from %r...", user_id, room_id)
target_requester = create_requester(user_id)
yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False
)
try:
target_requester = create_requester(user_id)
yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=room_id,
action=Membership.LEAVE,
content={},
ratelimit=False,
require_consent=False,
)
yield self.room_member_handler.forget(target_requester.user, room_id)
yield self.room_member_handler.forget(target_requester.user, room_id)
yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False
)
yield self.room_member_handler.update_membership(
requester=target_requester,
target=target_requester.user,
room_id=new_room_id,
action=Membership.JOIN,
content={},
ratelimit=False,
require_consent=False,
)
kicked_users.append(user_id)
kicked_users.append(user_id)
except Exception:
logger.exception(
"Failed to leave old room and join new room for %r", user_id,
)
failed_to_kick_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
@ -544,6 +558,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
defer.returnValue((200, {
"kicked_users": kicked_users,
"failed_to_kick_users": failed_to_kick_users,
"local_aliases": aliases_for_room,
"new_room_id": new_room_id,
}))

View file

@ -59,6 +59,12 @@ class UserDirectorySearchRestServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=False)
user_id = requester.user.to_string()
if not self.hs.config.user_directory_search_enabled:
defer.returnValue((200, {
"limited": False,
"results": [],
}))
body = parse_json_object_from_request(request)
limit = body.get("limit", 10)

View file

@ -500,10 +500,22 @@ class RoomStore(RoomWorkerStore, SearchStore):
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
yield self._simple_insert(
"""Marks the room as blocked. Can be called multiple times.
Args:
room_id (str): Room to block
user_id (str): Who blocked it
Returns:
Deferred
"""
yield self._simple_upsert(
table="blocked_rooms",
values={
keyvalues={
"room_id": room_id,
},
values={},
insertion_values={
"user_id": user_id,
},
desc="block_room",

View file

@ -32,6 +32,11 @@ TEMP_TABLE = "_temp_populate_user_directory"
class UserDirectoryStore(BackgroundUpdateStore):
# How many records do we calculate before sending it to
# add_users_who_share_private_rooms?
SHARE_PRIVATE_WORKING_SET = 500
def __init__(self, db_conn, hs):
super(UserDirectoryStore, self).__init__(db_conn, hs)
@ -218,6 +223,14 @@ class UserDirectoryStore(BackgroundUpdateStore):
user_set = (user_id, other_user_id)
to_insert.add(user_set)
# If it gets too big, stop and write to the database
# to prevent storing too much in RAM.
if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
yield self.add_users_who_share_private_room(
room_id, to_insert
)
to_insert.clear()
if to_insert:
yield self.add_users_who_share_private_room(room_id, to_insert)
to_insert.clear()

View file

@ -344,6 +344,23 @@ class AuthTestCase(unittest.TestCase):
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
self.assertEquals(e.exception.code, 403)
@defer.inlineCallbacks
def test_hs_disabled_no_server_notices_user(self):
"""Check that 'hs_disabled_message' works correctly when there is no
server_notices user.
"""
# this should be the default, but we had a bug where the test was doing the wrong
# thing, so let's make it explicit
self.hs.config.server_notices_mxid = None
self.hs.config.hs_disabled = True
self.hs.config.hs_disabled_message = "Reason for being disabled"
with self.assertRaises(ResourceLimitError) as e:
yield self.auth.check_auth_blocking()
self.assertEquals(e.exception.admin_contact, self.hs.config.admin_contact)
self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
self.assertEquals(e.exception.code, 403)
@defer.inlineCallbacks
def test_server_notices_mxid_special_cased(self):
self.hs.config.hs_disabled = True

View file

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.types import ReadReceipt
from tests.unittest import HomeserverTestCase
class FederationSenderTestCases(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
return super(FederationSenderTestCases, self).setup_test_homeserver(
state_handler=Mock(spec=["get_current_hosts_in_room"]),
federation_transport_client=Mock(spec=["send_transaction"]),
)
def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
mock_send_transaction = self.hs.get_federation_transport_client().send_transaction
mock_send_transaction.return_value = defer.succeed({})
sender = self.hs.get_federation_sender()
receipt = ReadReceipt("room_id", "m.read", "user_id", ["event_id"], {"ts": 1234})
self.successResultOf(sender.send_read_receipt(receipt))
self.pump()
# expect a call to send_transaction
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(data['edus'], [
{
'edu_type': 'm.receipt',
'content': {
'room_id': {
'm.read': {
'user_id': {
'event_ids': ['event_id'],
'data': {'ts': 1234},
},
},
},
},
},
])
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
mock_send_transaction = self.hs.get_federation_transport_client().send_transaction
mock_send_transaction.return_value = defer.succeed({})
sender = self.hs.get_federation_sender()
receipt = ReadReceipt("room_id", "m.read", "user_id", ["event_id"], {"ts": 1234})
self.successResultOf(sender.send_read_receipt(receipt))
self.pump()
# expect a call to send_transaction
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(data['edus'], [
{
'edu_type': 'm.receipt',
'content': {
'room_id': {
'm.read': {
'user_id': {
'event_ids': ['event_id'],
'data': {'ts': 1234},
},
},
},
},
},
])
mock_send_transaction.reset_mock()
# send the second RR
receipt = ReadReceipt("room_id", "m.read", "user_id", ["other_id"], {"ts": 1234})
self.successResultOf(sender.send_read_receipt(receipt))
self.pump()
mock_send_transaction.assert_not_called()
self.reactor.advance(19)
mock_send_transaction.assert_not_called()
self.reactor.advance(10)
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(data['edus'], [
{
'edu_type': 'm.receipt',
'content': {
'room_id': {
'm.read': {
'user_id': {
'event_ids': ['other_id'],
'data': {'ts': 1234},
},
},
},
},
},
])

View file

@ -111,7 +111,7 @@ class TestCreateAliasACL(unittest.HomeserverTestCase):
servlets = [directory.register_servlets, room.register_servlets]
def prepare(self, hs, reactor, clock):
def prepare(self, reactor, clock, hs):
# We cheekily override the config to add custom alias creation rules
config = {}
config["alias_creation_rules"] = [
@ -151,3 +151,60 @@ class TestCreateAliasACL(unittest.HomeserverTestCase):
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
class TestRoomListSearchDisabled(unittest.HomeserverTestCase):
user_id = "@test:test"
servlets = [directory.register_servlets, room.register_servlets]
def prepare(self, reactor, clock, hs):
room_id = self.helper.create_room_as(self.user_id)
request, channel = self.make_request(
"PUT",
b"directory/list/room/%s" % (room_id.encode('ascii'),),
b'{}',
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.room_list_handler = hs.get_room_list_handler()
self.directory_handler = hs.get_handlers().directory_handler
return hs
def test_disabling_room_list(self):
self.room_list_handler.enable_room_list_search = True
self.directory_handler.enable_room_list_search = True
# Room list is enabled so we should get some results
request, channel = self.make_request(
"GET",
b"publicRooms",
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertTrue(len(channel.json_body["chunk"]) > 0)
self.room_list_handler.enable_room_list_search = False
self.directory_handler.enable_room_list_search = False
# Room list disabled so we should get no results
request, channel = self.make_request(
"GET",
b"publicRooms",
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertTrue(len(channel.json_body["chunk"]) == 0)
# Room list disabled so we shouldn't be allowed to publish rooms
room_id = self.helper.create_room_as(self.user_id)
request, channel = self.make_request(
"PUT",
b"directory/list/room/%s" % (room_id.encode('ascii'),),
b'{}',
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)

View file

@ -22,7 +22,7 @@ from synapse.api.errors import ResourceLimitError, SynapseError
from synapse.handlers.register import RegistrationHandler
from synapse.types import RoomAlias, UserID, create_requester
from tests.utils import setup_test_homeserver
from tests.utils import default_config, setup_test_homeserver
from .. import unittest
@ -40,8 +40,16 @@ class RegistrationTestCase(unittest.TestCase):
self.mock_distributor = Mock()
self.mock_distributor.declare("registered_user")
self.mock_captcha_client = Mock()
hs_config = default_config("test")
# some of the tests rely on us having a user consent version
hs_config.user_consent_version = "test_consent_version"
hs_config.max_mau_value = 50
self.hs = yield setup_test_homeserver(
self.addCleanup,
config=hs_config,
expire_access_token=True,
)
self.macaroon_generator = Mock(
@ -50,7 +58,6 @@ class RegistrationTestCase(unittest.TestCase):
self.hs.get_macaroon_generator = Mock(return_value=self.macaroon_generator)
self.handler = self.hs.get_registration_handler()
self.store = self.hs.get_datastore()
self.hs.config.max_mau_value = 50
self.lots_of_users = 100
self.small_number_of_users = 1

View file

@ -16,6 +16,7 @@ from mock import Mock
from synapse.api.constants import UserTypes
from synapse.rest.client.v1 import admin, login, room
from synapse.rest.client.v2_alpha import user_directory
from synapse.storage.roommember import ProfileInfo
from tests import unittest
@ -317,3 +318,54 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
u4 = self.register_user("user4", "pass")
s = self.get_success(self.handler.search_users(u1, u4, 10))
self.assertEqual(len(s["results"]), 1)
class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
user_id = "@test:test"
servlets = [
user_directory.register_servlets,
room.register_servlets,
login.register_servlets,
admin.register_servlets,
]
def make_homeserver(self, reactor, clock):
config = self.default_config()
config.update_user_directory = True
hs = self.setup_test_homeserver(config=config)
self.config = hs.config
return hs
def test_disabling_room_list(self):
self.config.user_directory_search_enabled = True
# First we create a room with another user so that user dir is non-empty
# for our user
self.helper.create_room_as(self.user_id)
u2 = self.register_user("user2", "pass")
room = self.helper.create_room_as(self.user_id)
self.helper.join(room, user=u2)
# Assert user directory is not empty
request, channel = self.make_request(
"POST",
b"user_directory/search",
b'{"search_term":"user2"}',
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertTrue(len(channel.json_body["results"]) > 0)
# Disable user directory and check search returns nothing
self.config.user_directory_search_enabled = False
request, channel = self.make_request(
"POST",
b"user_directory/search",
b'{"search_term":"user2"}',
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertTrue(len(channel.json_body["results"]) == 0)

View file

@ -63,8 +63,10 @@ class EmailPusherTests(HomeserverTestCase):
config.email_smtp_port = 20
config.require_transport_security = False
config.email_smtp_user = None
config.email_smtp_pass = None
config.email_app_name = "Matrix"
config.email_notif_from = "test@example.com"
config.email_riot_base_url = None
hs = self.setup_test_homeserver(config=config, sendmail=sendmail)

View file

@ -20,7 +20,7 @@ import json
from mock import Mock
from synapse.api.constants import UserTypes
from synapse.rest.client.v1 import admin, login
from synapse.rest.client.v1 import admin, events, login, room
from tests import unittest
@ -353,3 +353,140 @@ class UserRegisterTestCase(unittest.HomeserverTestCase):
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid user type', channel.json_body["error"])
class ShutdownRoomTestCase(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
events.register_servlets,
room.register_servlets,
room.register_deprecated_servlets,
]
def prepare(self, reactor, clock, hs):
self.event_creation_handler = hs.get_event_creation_handler()
hs.config.user_consent_version = "1"
consent_uri_builder = Mock()
consent_uri_builder.build_user_consent_uri.return_value = (
"http://example.com"
)
self.event_creation_handler._consent_uri_builder = consent_uri_builder
self.store = hs.get_datastore()
self.admin_user = self.register_user("admin", "pass", admin=True)
self.admin_user_tok = self.login("admin", "pass")
self.other_user = self.register_user("user", "pass")
self.other_user_token = self.login("user", "pass")
# Mark the admin user as having consented
self.get_success(
self.store.user_set_consent_version(self.admin_user, "1"),
)
def test_shutdown_room_consent(self):
"""Test that we can shutdown rooms with local users who have not
yet accepted the privacy policy. This used to fail when we tried to
force part the user from the old room.
"""
self.event_creation_handler._block_events_without_consent_error = None
room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_token)
# Assert one user in room
users_in_room = self.get_success(
self.store.get_users_in_room(room_id),
)
self.assertEqual([self.other_user], users_in_room)
# Enable require consent to send events
self.event_creation_handler._block_events_without_consent_error = "Error"
# Assert that the user is getting consent error
self.helper.send(
room_id,
body="foo", tok=self.other_user_token, expect_code=403,
)
# Test that the admin can still send shutdown
url = "admin/shutdown_room/" + room_id
request, channel = self.make_request(
"POST",
url.encode('ascii'),
json.dumps({"new_room_user_id": self.admin_user}),
access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Assert there is now no longer anyone in the room
users_in_room = self.get_success(
self.store.get_users_in_room(room_id),
)
self.assertEqual([], users_in_room)
@unittest.DEBUG
def test_shutdown_room_block_peek(self):
"""Test that a world_readable room can no longer be peeked into after
it has been shut down.
"""
self.event_creation_handler._block_events_without_consent_error = None
room_id = self.helper.create_room_as(self.other_user, tok=self.other_user_token)
# Enable world readable
url = "rooms/%s/state/m.room.history_visibility" % (room_id,)
request, channel = self.make_request(
"PUT",
url.encode('ascii'),
json.dumps({"history_visibility": "world_readable"}),
access_token=self.other_user_token,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Test that the admin can still send shutdown
url = "admin/shutdown_room/" + room_id
request, channel = self.make_request(
"POST",
url.encode('ascii'),
json.dumps({"new_room_user_id": self.admin_user}),
access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
# Assert we can no longer peek into the room
self._assert_peek(room_id, expect_code=403)
def _assert_peek(self, room_id, expect_code):
"""Assert that the admin user can (or cannot) peek into the room.
"""
url = "rooms/%s/initialSync" % (room_id,)
request, channel = self.make_request(
"GET",
url.encode('ascii'),
access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(
expect_code, int(channel.result["code"]), msg=channel.result["body"],
)
url = "events?timeout=0&room_id=" + room_id
request, channel = self.make_request(
"GET",
url.encode('ascii'),
access_token=self.admin_user_tok,
)
self.render(request)
self.assertEqual(
expect_code, int(channel.result["code"]), msg=channel.result["body"],
)

View file

@ -20,6 +20,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
self.hs.config.registrations_require_3pid = []
self.hs.config.auto_join_rooms = []
self.hs.config.enable_registration_captcha = False
self.hs.config.allow_guest_access = True
return self.hs
@ -28,7 +29,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase):
as_token = "i_am_an_app_service"
appservice = ApplicationService(
as_token, self.hs.config.hostname,
as_token, self.hs.config.server_name,
id="1234",
namespaces={
"users": [{"regex": r"@as_user.*", "exclusive": True}],

View file

@ -119,14 +119,7 @@ class FakeSite:
server_version_string = b"1"
site_tag = "test"
@property
def access_logger(self):
class FakeLogger:
def info(self, *args, **kwargs):
pass
return FakeLogger()
access_logger = logging.getLogger("synapse.access.http.fake")
def make_request(

View file

@ -9,13 +9,16 @@ from synapse.server_notices.resource_limits_server_notices import (
)
from tests import unittest
from tests.utils import setup_test_homeserver
from tests.utils import default_config, setup_test_homeserver
class TestResourceLimitsServerNotices(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.hs = yield setup_test_homeserver(self.addCleanup)
hs_config = default_config(name="test")
hs_config.server_notices_mxid = "@server:test"
self.hs = yield setup_test_homeserver(self.addCleanup, config=hs_config)
self.server_notices_sender = self.hs.get_server_notices_sender()
# relying on [1] is far from ideal, but the only case where

View file

@ -28,7 +28,7 @@ from twisted.internet import defer, reactor
from synapse.api.constants import EventTypes, RoomVersions
from synapse.api.errors import CodeMessageException, cs_error
from synapse.config.server import ServerConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.federation.transport import server as federation_server
from synapse.http.server import HttpServer
from synapse.server import HomeServer
@ -111,14 +111,25 @@ def default_config(name):
"""
Create a reasonable test config.
"""
config = Mock()
config.signing_key = [MockKey()]
config_dict = {
"server_name": name,
"media_store_path": "media",
"uploads_path": "uploads",
# the test signing key is just an arbitrary ed25519 key to keep the config
# parser happy
"signing_key": "ed25519 a_lPym qvioDNmfExFBRPgdTU+wtFYKq4JfwFRv7sYVgWvmgJg",
}
config = HomeServerConfig()
config.parse_config_dict(config_dict)
# TODO: move this stuff into config_dict or get rid of it
config.event_cache_size = 1
config.enable_registration = True
config.enable_registration_captcha = False
config.macaroon_secret_key = "not even a little secret"
config.expire_access_token = False
config.server_name = name
config.trusted_third_party_id_servers = []
config.room_invite_state_types = []
config.password_providers = []
@ -176,13 +187,6 @@ def default_config(name):
# background, which upsets the test runner.
config.update_user_directory = False
def is_threepid_reserved(threepid):
return ServerConfig.is_threepid_reserved(
config.mau_limits_reserved_threepids, threepid
)
config.is_threepid_reserved.side_effect = is_threepid_reserved
return config
@ -276,7 +280,6 @@ def setup_test_homeserver(
db_config=config.database_config,
version_string="Synapse/tests",
database_engine=db_engine,
room_list_handler=object(),
tls_server_context_factory=Mock(),
tls_client_options_factory=Mock(),
reactor=reactor,
@ -347,7 +350,6 @@ def setup_test_homeserver(
config=config,
version_string="Synapse/tests",
database_engine=db_engine,
room_list_handler=object(),
tls_server_context_factory=Mock(),
tls_client_options_factory=Mock(),
reactor=reactor,

10
tox.ini
View file

@ -82,15 +82,23 @@ deps =
mock
lxml
coverage
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
# Make all greater-thans equals so we test the oldest version of our direct
# dependencies, but make the pyopenssl 17.0, which can work against an
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install'
# Add this so that coverage will run on subprocesses
/bin/sh -c 'echo "import coverage; coverage.process_startup()" > {envsitepackagesdir}/../sitecustomize.py'
# Install Synapse itself. This won't update any libraries.
pip install -e .
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
{envbindir}/coverage run "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
[testenv:packaging]
skip_install=True