Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

Richard van der Hoff 2021-05-12 12:57:55 +01:00
commit b0b8110acd
329 changed files with 954 additions and 222 deletions


@ -1,36 +0,0 @@
#!/usr/bin/env python
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.storage.engines import create_engine
logger = logging.getLogger("create_postgres_db")
if __name__ == "__main__":
# Create a PostgresEngine.
db_engine = create_engine({"name": "psycopg2", "args": {}})
# Connect to postgres to create the base database.
# We use "postgres" as a database because it's bound to exist and the "synapse" one
# doesn't exist yet.
db_conn = db_engine.module.connect(
user="postgres", host="postgres", password="postgres", dbname="postgres"
)
db_conn.autocommit = True
cur = db_conn.cursor()
cur.execute("CREATE DATABASE synapse;")
cur.close()
db_conn.close()


@ -0,0 +1,31 @@
#!/usr/bin/env python
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import psycopg2
# a very simple replacement for `psql`, to make up for the lack of the postgres client
# libraries in the synapse docker image.
# We use "postgres" as a database because it's bound to exist and the "synapse" one
# doesn't exist yet.
db_conn = psycopg2.connect(
user="postgres", host="postgres", password="postgres", dbname="postgres"
)
db_conn.autocommit = True
cur = db_conn.cursor()
for c in sys.argv[1:]:
cur.execute(c)
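Each positional argument is executed as one SQL statement, so the helper stands in for simple `psql` invocations; for example (as the CI script below does):

    ./.buildkite/scripts/postgres_exec.py "DROP DATABASE synapse" "CREATE DATABASE synapse"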


@ -1,10 +1,10 @@
#!/usr/bin/env bash
#
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
# driver), updates the schema of the test SQLite database and runs background updates on it,
# creates an empty test database in PostgreSQL, then runs the 'synapse_port_db' script to
# test porting the SQLite database to the PostgreSQL database (with coverage).
# Test script for 'synapse_port_db'.
# - sets up synapse and deps
# - runs the port script on a prepopulated test sqlite db
# - also runs it against a new sqlite db
set -xe
cd `dirname $0`/../..
@ -22,15 +22,32 @@ echo "--- Generate the signing key"
# Generate the server's signing key.
python -m synapse.app.homeserver --generate-keys -c .buildkite/sqlite-config.yaml
echo "--- Prepare the databases"
echo "--- Prepare test database"
# Make sure the SQLite3 database is using the latest schema and has no pending background updates.
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
# Create the PostgreSQL database.
./.buildkite/scripts/create_postgres_db.py
./.buildkite/scripts/postgres_exec.py "CREATE DATABASE synapse"
echo "+++ Run synapse_port_db"
# Run the script
echo "+++ Run synapse_port_db against test database"
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
#####
# Now do the same again, on an empty database.
echo "--- Prepare empty SQLite database"
# we do this by deleting the sqlite db, then re-running the schema update against a fresh one.
rm .buildkite/test_db.db
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
# re-create the PostgreSQL database.
./.buildkite/scripts/postgres_exec.py \
"DROP DATABASE synapse" \
"CREATE DATABASE synapse"
echo "+++ Run synapse_port_db against empty database"
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml


@ -273,7 +273,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Patch Buildkite-specific test scripts
run: |
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/create_postgres_db.py
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/postgres_exec.py
sed -i -e 's/host: postgres/host: localhost/' .buildkite/postgres-config.yaml
sed -i -e 's|/src/||' .buildkite/{sqlite,postgres}-config.yaml
sed -i -e 's/\$TOP/\$GITHUB_WORKSPACE/' .coveragerc


@ -1,3 +1,19 @@
Synapse 1.33.2 (2021-05-11)
===========================
Due to the security issue highlighted below, server administrators are encouraged to update Synapse. We are not aware of this vulnerability being exploited in the wild.
Security advisory
-----------------
This release fixes a denial of service attack ([CVE-2021-29471](https://github.com/matrix-org/synapse/security/advisories/GHSA-x345-32rc-8h85)) against Synapse's push rules implementation. Server admins are encouraged to upgrade.
Internal Changes
----------------
- Unpin attrs dependency. ([\#9946](https://github.com/matrix-org/synapse/issues/9946))
Synapse 1.33.1 (2021-05-06)
===========================


@ -85,6 +85,35 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.34.0
====================
`room_invite_state_types` configuration setting
-----------------------------------------------
The ``room_invite_state_types`` configuration setting has been deprecated and
replaced with ``room_prejoin_state``. See the `sample configuration file <https://github.com/matrix-org/synapse/blob/v1.34.0/docs/sample_config.yaml#L1515>`_.
If you have set ``room_invite_state_types`` to the default value you should simply
remove it from your configuration file. The default value used to be:
.. code:: yaml
room_invite_state_types:
- "m.room.join_rules"
- "m.room.canonical_alias"
- "m.room.avatar"
- "m.room.encryption"
- "m.room.name"
If you have customised this value by adding additional state types, you should
remove ``room_invite_state_types`` and configure ``additional_event_types`` with
your customisations.
If you have customised this value by removing state types, you should rename
``room_invite_state_types`` to ``additional_event_types``, and set
``disable_default_event_types`` to ``true``.
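For example, if you had added a single custom type to the default list, the
equivalent new configuration would be (a sketch; ``org.example.custom`` is
illustrative only):

.. code:: yaml

    room_prejoin_state:
       additional_event_types:
          - "org.example.custom"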
Upgrading to v1.33.0
====================

changelog.d/9882.misc (new file)

@ -0,0 +1 @@
Export jemalloc stats to Prometheus if it is being used.


@ -1 +1 @@
Support stable identifiers from [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772).
Support stable identifiers for [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772) Spaces. `m.space.child` events will now be taken into account when populating the experimental spaces summary response. Please see `UPGRADE.rst` if you have customised `room_invite_state_types` in your configuration.

changelog.d/9930.bugfix (new file)

@ -0,0 +1 @@
Fix bugs introduced in v1.23.0 which made the PostgreSQL port script fail when run with a newly-created SQLite database.

changelog.d/9931.misc (new file)

@ -0,0 +1 @@
Minor fixes to the `make_full_schema.sh` script.

changelog.d/9932.misc (new file)

@ -0,0 +1 @@
Move database schema files into a common directory.

changelog.d/9935.feature (new file)

@ -0,0 +1 @@
Improve performance of backfilling in large rooms.

changelog.d/9945.feature (new file)

@ -0,0 +1 @@
Add a config option to allow you to prevent device display names from being shared over federation. Contributed by @aaronraimist.


@ -1 +0,0 @@
Unpin attrs dependency.

changelog.d/9947.feature (new file)

@ -0,0 +1 @@
Update support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): Spaces Summary.

changelog.d/9954.feature (new file)

@ -0,0 +1 @@
Update support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): Spaces Summary.

changelog.d/9959.misc (new file)

@ -0,0 +1 @@
Add debug logging for lost/delayed to-device messages.

changelog.d/9961.bugfix (new file)

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.29.0 which caused `m.room_key_request` to-device messages sent from one user to another to be dropped.

changelog.d/9965.bugfix (new file)

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.29.0 which caused `m.room_key_request` to-device messages sent from one user to another to be dropped.

changelog.d/9966.feature (new file)

@ -0,0 +1 @@
Support stable identifiers for [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772) Spaces. `m.space.child` events will now be taken into account when populating the experimental spaces summary response. Please see `UPGRADE.rst` if you have customised `room_invite_state_types` in your configuration.

debian/changelog

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.33.2) stable; urgency=medium
* New synapse release 1.33.2.
-- Synapse Packaging team <packages@matrix.org> Tue, 11 May 2021 11:17:59 +0100
matrix-synapse-py3 (1.33.1) stable; urgency=medium
* New synapse release 1.33.1.


@ -741,6 +741,12 @@ acme:
#
#allow_profile_lookup_over_federation: false
# Uncomment to disable device display name lookup over federation. By default, the
# Federation API allows other homeservers to obtain device display names of any user
# on this homeserver. Defaults to 'true'.
#
#allow_device_name_lookup_over_federation: false
## Caching ##
@ -1515,6 +1521,7 @@ room_prejoin_state:
# - m.room.avatar
# - m.room.encryption
# - m.room.name
# - m.room.create
#
# Uncomment the following to disable these defaults (so that only the event
# types listed in 'additional_event_types' are shared). Defaults to 'false'.


@ -6,7 +6,7 @@
# It does so by having Synapse generate an up-to-date SQLite DB, then running
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
POSTGRES_HOST="localhost"
export PGHOST="localhost"
POSTGRES_DB_NAME="synapse_full_schema.$$"
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
@ -32,7 +32,7 @@ usage() {
while getopts "p:co:h" opt; do
case $opt in
p)
POSTGRES_USERNAME=$OPTARG
export PGUSER=$OPTARG
;;
c)
# Print all commands that are being executed
@ -69,7 +69,7 @@ if [ ${#unsatisfied_requirements} -ne 0 ]; then
exit 1
fi
if [ -z "$POSTGRES_USERNAME" ]; then
if [ -z "$PGUSER" ]; then
echo "No postgres username supplied"
usage
exit 1
@ -84,8 +84,9 @@ fi
# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR"
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
read -rsp "Postgres password for '$PGUSER': " PGPASSWORD
echo ""
export PGPASSWORD
# Exit immediately if a command fails
set -e
@ -131,9 +132,9 @@ report_stats: false
database:
name: "psycopg2"
args:
user: "$POSTGRES_USERNAME"
host: "$POSTGRES_HOST"
password: "$POSTGRES_PASSWORD"
user: "$PGUSER"
host: "$PGHOST"
password: "$PGPASSWORD"
database: "$POSTGRES_DB_NAME"
# Suppress the key server warning.
@ -150,7 +151,7 @@ scripts-dev/update_database --database-config "$SQLITE_CONFIG"
# Create the PostgreSQL database.
echo "Creating postgres database..."
createdb $POSTGRES_DB_NAME
createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
if [ -z "$COVERAGE" ]; then
@ -181,7 +182,7 @@ DROP TABLE user_directory_search_docsize;
DROP TABLE user_directory_search_stat;
"
sqlite3 "$SQLITE_DB" <<< "$SQL"
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"


@ -913,10 +913,11 @@ class Porter(object):
(curr_forward_id + 1,),
)
txn.execute(
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
(curr_backward_id + 1,),
)
if curr_backward_id:
txn.execute(
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
(curr_backward_id + 1,),
)
await self.postgres_store.db_pool.runInteraction(
"_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
@ -954,10 +955,11 @@ class Porter(object):
(curr_chain_id,),
)
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id", r,
)
if curr_chain_id is not None:
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id",
r,
)
##############################################


@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.33.1"
__version__ = "1.33.2"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when


@ -116,9 +116,12 @@ class EventTypes:
MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request"
class EduTypes:
Presence = "m.presence"
RoomKeyRequest = "m.room_key_request"
class RejectedReason:


@ -37,6 +37,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
@ -115,6 +116,7 @@ def start_reactor(
def run():
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)


@ -88,10 +88,6 @@ class ApiConfig(Config):
if not room_prejoin_state_config.get("disable_default_event_types"):
yield from _DEFAULT_PREJOIN_STATE_TYPES
if self.spaces_enabled:
# MSC1772 suggests adding m.room.create to the prejoin state
yield EventTypes.Create
yield from room_prejoin_state_config.get("additional_event_types", [])
@ -109,6 +105,8 @@ _DEFAULT_PREJOIN_STATE_TYPES = [
EventTypes.RoomAvatar,
EventTypes.RoomEncryption,
EventTypes.Name,
# Per MSC1772.
EventTypes.Create,
]


@ -44,6 +44,10 @@ class FederationConfig(Config):
"allow_profile_lookup_over_federation", True
)
self.allow_device_name_lookup_over_federation = config.get(
"allow_device_name_lookup_over_federation", True
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Federation ##
@ -75,6 +79,12 @@ class FederationConfig(Config):
# on this homeserver. Defaults to 'true'.
#
#allow_profile_lookup_over_federation: false
# Uncomment to disable device display name lookup over federation. By default, the
# Federation API allows other homeservers to obtain device display names of any user
# on this homeserver. Defaults to 'true'.
#
#allow_device_name_lookup_over_federation: false
"""


@ -44,7 +44,6 @@ from synapse.api.errors import (
SynapseError,
UnsupportedRoomVersionError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
@ -865,14 +864,6 @@ class FederationHandlerRegistry:
# EDU received.
self._edu_type_to_instance = {} # type: Dict[str, List[str]]
# A rate limiter for incoming room key requests per origin.
self._room_key_request_rate_limiter = Ratelimiter(
store=hs.get_datastore(),
clock=self.clock,
rate_hz=self.config.rc_key_requests.per_second,
burst_count=self.config.rc_key_requests.burst_count,
)
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
) -> None:
@ -926,16 +917,6 @@ class FederationHandlerRegistry:
if not self.config.use_presence and edu_type == EduTypes.Presence:
return
# If the incoming room key requests from a particular origin are over
# the limit, drop them.
if (
edu_type == EduTypes.RoomKeyRequest
and not await self._room_key_request_rate_limiter.can_do_action(
None, origin
)
):
return
# Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
if handler:


@ -28,6 +28,7 @@ from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
@ -574,6 +575,14 @@ class PerDestinationQueue:
for content in contents
]
if edus:
issue9533_logger.debug(
"Sending %i to-device messages to %s, up to stream id %i",
len(edus),
self._destination,
stream_id,
)
return (edus, stream_id)
def _start_catching_up(self) -> None:


@ -995,6 +995,7 @@ class TransportLayerClient:
returned per space
exclude_rooms: a list of any rooms we can skip
"""
# TODO When switching to the stable endpoint, use GET instead of POST.
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)


@ -1376,6 +1376,32 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/spaces/(?P<room_id>[^/]*)"
async def on_GET(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
max_rooms_per_space = parse_integer_from_args(query, "max_rooms_per_space")
exclude_rooms = []
if b"exclude_rooms" in query:
try:
exclude_rooms = [
room_id.decode("ascii") for room_id in query[b"exclude_rooms"]
]
except Exception:
raise SynapseError(
400, "Bad query parameter for exclude_rooms", Codes.INVALID_PARAM
)
return 200, await self.handler.federation_space_summary(
room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self,
origin: str,


@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
from synapse.api.constants import EduTypes
from synapse.api.constants import ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
@ -79,6 +79,8 @@ class DeviceMessageHandler:
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
# a rate limiter for room key requests. The keys are
# (sending_user_id, sending_device_id).
self._ratelimiter = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
@ -100,12 +102,25 @@ class DeviceMessageHandler:
for user_id, by_device in content["messages"].items():
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
logger.warning("To-device message to non-local user %s", user_id)
raise SynapseError(400, "Not a user here")
if not by_device:
continue
# Ratelimit key requests by the sending user.
if message_type == ToDeviceEventTypes.RoomKeyRequest:
allowed, _ = await self._ratelimiter.can_do_action(
None, (sender_user_id, None)
)
if not allowed:
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
user_id,
)
continue
messages_by_device = {
device_id: {
"content": message_content,
@ -192,13 +207,19 @@ class DeviceMessageHandler:
for user_id, by_device in messages.items():
# Ratelimit local cross-user key requests by the sending device.
if (
message_type == EduTypes.RoomKeyRequest
message_type == ToDeviceEventTypes.RoomKeyRequest
and user_id != sender_user_id
and await self._ratelimiter.can_do_action(
):
allowed, _ = await self._ratelimiter.can_do_action(
requester, (sender_user_id, requester.device_id)
)
):
continue
if not allowed:
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
user_id,
)
continue
# we use UserID.from_string to catch invalid user ids
if self.is_mine(UserID.from_string(user_id)):


@ -552,8 +552,12 @@ class FederationHandler(BaseHandler):
destination: str,
room_id: str,
event_id: str,
) -> Tuple[List[EventBase], List[EventBase]]:
"""Requests all of the room state at a given event from a remote homeserver.
) -> List[EventBase]:
"""Requests all of the room state at a given event from a remote
homeserver.
Will also fetch any missing events reported in the `auth_chain_ids`
section of `/state_ids`.
Args:
destination: The remote homeserver to query for the state.
@ -561,8 +565,7 @@ class FederationHandler(BaseHandler):
event_id: The id of the event we want the state at.
Returns:
A list of events in the state, not including the event itself, and
a list of events in the auth chain for the given event.
A list of events in the state, not including the event itself.
"""
(
state_event_ids,
@ -571,68 +574,53 @@ class FederationHandler(BaseHandler):
destination, room_id, event_id=event_id
)
desired_events = set(state_event_ids + auth_event_ids)
event_map = await self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s %s",
event_id,
failed_to_fetch,
)
remote_state = [
event_map[e_id] for e_id in state_event_ids if e_id in event_map
]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
auth_chain.sort(key=lambda e: e.depth)
return remote_state, auth_chain
async def _get_events_from_store_or_dest(
self, destination: str, room_id: str, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
This function *does not* automatically get missing auth events of the
newly fetched events. Callers must include the full auth chain of
of the missing events in the `event_ids` argument, to ensure that any
missing auth events are correctly fetched.
Returns:
map from event_id to event
"""
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
if missing_events:
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
room_id,
)
# Fetch the state events from the DB, and check we have the auth events.
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
# Check for missing events. We handle state and auth events separately,
# as we want to pull the state from the DB, but we don't for the auth
# events. (Note: we likely won't use the majority of the auth chain, and
# it can be *huge* for large rooms, so it's worth ensuring that we don't
# unnecessarily pull it from the DB).
missing_state_events = set(state_event_ids) - set(event_map)
missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
if missing_state_events or missing_auth_events:
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
destination=destination,
room_id=room_id,
events=missing_state_events | missing_auth_events,
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(missing_events, allow_rejected=True))
)
if missing_state_events:
new_events = await self.store.get_events(
missing_state_events, allow_rejected=True
)
event_map.update(new_events)
missing_state_events.difference_update(new_events)
if missing_state_events:
logger.warning(
"Failed to fetch missing state events for %s %s",
event_id,
missing_state_events,
)
if missing_auth_events:
auth_events_in_store = await self.store.have_seen_events(
missing_auth_events
)
missing_auth_events.difference_update(auth_events_in_store)
if missing_auth_events:
logger.warning(
"Failed to fetch missing auth events for %s %s",
event_id,
missing_auth_events,
)
remote_state = list(event_map.values())
# check for events which were in the wrong room.
#
@ -640,8 +628,8 @@ class FederationHandler(BaseHandler):
# auth_events at an event in room A are actually events in room B
bad_events = [
(event_id, event.room_id)
for event_id, event in fetched_events.items()
(event.event_id, event.room_id)
for event in remote_state
if event.room_id != room_id
]
@ -658,9 +646,10 @@ class FederationHandler(BaseHandler):
room_id,
)
del fetched_events[bad_event_id]
if bad_events:
remote_state = [e for e in remote_state if e.room_id == room_id]
return fetched_events
return remote_state
async def _get_state_after_missing_prev_event(
self,
@ -963,27 +952,23 @@ class FederationHandler(BaseHandler):
# For each edge get the current state.
auth_events = {}
state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = await self._get_state_for_room(
state = await self._get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id,
)
auth_events.update({a.event_id: a for a in auth})
auth_events.update({s.event_id: s for s in state})
state_events.update({s.event_id: s for s in state})
events_to_state[e_id] = state
required_auth = {
a_id
for event in events
+ list(state_events.values())
+ list(auth_events.values())
for event in events + list(state_events.values())
for a_id in event.auth_event_ids()
}
auth_events = await self.store.get_events(required_auth, allow_rejected=True)
auth_events.update(
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
)


@ -14,6 +14,7 @@
import itertools
import logging
import re
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
@ -226,6 +227,23 @@ class SpaceSummaryHandler:
suggested_only: bool,
max_children: Optional[int],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
"""
Generate a room entry and a list of event entries for a given room.
Args:
requester: The requesting user, or None if this is over federation.
room_id: The room ID to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.
max_children: The maximum number of children to return for this node.
Returns:
A tuple of:
An iterable of a single value of the room.
An iterable of the sorted children events. This may be limited
to a maximum size or may include all children.
"""
if not await self._is_room_accessible(room_id, requester):
return (), ()
@ -357,6 +375,18 @@ class SpaceSummaryHandler:
return room_entry
async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
"""
Get the child events for a given room.
The returned results are sorted for stability.
Args:
room_id: The room id to get the children of.
Returns:
An iterable of sorted child events.
"""
# look for child rooms/spaces.
current_state_ids = await self._store.get_current_state_ids(room_id)
@ -370,8 +400,9 @@ class SpaceSummaryHandler:
]
)
# filter out any events without a "via" (which implies it has been redacted)
return (e for e in events if _has_valid_via(e))
# filter out any events without a "via" (which implies it has been redacted),
# and order to ensure we return stable results.
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
@attr.s(frozen=True, slots=True)
@ -397,3 +428,39 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool:
return True
logger.debug("Ignorning not-suggested child %s", edge_event.state_key)
return False
# Order may only contain characters in the range of \x20 (space) to \x7F (~).
_INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7F]")
def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], str]:
"""
Generate a value for comparing two child events for ordering.
The rules for ordering are supposed to be:
1. The 'order' key, if it is valid.
2. The 'origin_server_ts' of the 'm.room.create' event.
3. The 'room_id'.
But we skip step 2 since we may not have any state from the room.
Args:
child: The event for generating a comparison key.
Returns:
The comparison key as a tuple of:
False if the ordering is valid.
The ordering field.
The room ID.
"""
order = child.content.get("order")
# If order is not a string or doesn't meet the requirements, ignore it.
if not isinstance(order, str):
order = None
elif len(order) > 50 or _INVALID_ORDER_CHARS_RE.search(order):
order = None
# Items without an order come last.
return (order is None, order, child.room_id)
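Because False sorts before True, Python's tuple comparison puts children with a
valid order first (lexicographically), with the remainder falling back to
room-ID order; a minimal sketch with hypothetical keys as produced by
_child_events_comparison_key:

    keys = [
        (False, "aaa", "!x:example.org"),  # valid order "aaa"
        (False, "bbb", "!y:example.org"),  # valid order "bbb"
        (True, None, "!a:example.org"),    # no (or invalid) order
        (True, None, "!b:example.org"),
    ]
    assert sorted(keys) == keys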


@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# These are imported to allow for nicer logging configuration files.
import logging
from synapse.logging._remote import RemoteHandler
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
# These are imported to allow for nicer logging configuration files.
__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
issue9533_logger = logging.getLogger("synapse.9533_debug")
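Since this is an ordinary stdlib logger, the extra debugging can be switched on
without code changes; a sketch:

    import logging
    logging.getLogger("synapse.9533_debug").setLevel(logging.DEBUG)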


@ -629,6 +629,7 @@ try:
except AttributeError:
pass
__all__ = [
"MetricsResource",
"generate_latest",

synapse/metrics/jemalloc.py (new file)

@ -0,0 +1,196 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import logging
import os
import re
from typing import Optional
from synapse.metrics import REGISTRY, GaugeMetricFamily
logger = logging.getLogger(__name__)
def _setup_jemalloc_stats():
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""
# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.
# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return
# We're looking for a path at the end of the line that includes
# "libjemalloc".
regex = re.compile(r"/\S+/libjemalloc.*$")
jemalloc_path = None
with open("/proc/self/maps") as f:
for line in f:
match = regex.search(line.strip())
if match:
jemalloc_path = match.group()
if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return
logger.debug("Found jemalloc at %s", jemalloc_path)
jemalloc = ctypes.CDLL(jemalloc_path)
def _mallctl(
name: str, read: bool = True, write: Optional[int] = None
) -> Optional[int]:
"""Wrapper around `mallctl` for reading and writing integers to
jemalloc.
Args:
name: The name of the option to read from/write to.
read: Whether to try and read the value.
write: The value to write, if given.
Returns:
The value read if `read` is True, otherwise None.
Raises:
An exception if `mallctl` returns a non-zero error code.
"""
input_var = None
input_var_ref = None
input_len_ref = None
if read:
input_var = ctypes.c_size_t(0)
input_len = ctypes.c_size_t(ctypes.sizeof(input_var))
input_var_ref = ctypes.byref(input_var)
input_len_ref = ctypes.byref(input_len)
write_var_ref = None
write_len = ctypes.c_size_t(0)
if write is not None:
write_var = ctypes.c_size_t(write)
write_len = ctypes.c_size_t(ctypes.sizeof(write_var))
write_var_ref = ctypes.byref(write_var)
# The interface is:
#
# int mallctl(
# const char *name,
# void *oldp,
# size_t *oldlenp,
# void *newp,
# size_t newlen
# )
#
# Where oldp/oldlenp is a buffer where the old value will be written to
# (if not null), and newp/newlen is the buffer with the new value to set
# (if not null). Note that they're all references *except* newlen.
result = jemalloc.mallctl(
name.encode("ascii"),
input_var_ref,
input_len_ref,
write_var_ref,
write_len,
)
if result != 0:
raise Exception("Failed to call mallctl")
if input_var is None:
return None
return input_var.value
def _jemalloc_refresh_stats() -> None:
"""Request that jemalloc updates its internal statistics. This needs to
be called before querying for stats, otherwise it will return stale
values.
"""
try:
_mallctl("epoch", read=False, write=1)
except Exception as e:
logger.warning("Failed to reload jemalloc stats: %s", e)
class JemallocCollector:
"""Metrics for internal jemalloc stats."""
def collect(self):
_jemalloc_refresh_stats()
g = GaugeMetricFamily(
"jemalloc_stats_app_memory_bytes",
"The stats reported by jemalloc",
labels=["type"],
)
# Read the relevant global stats from jemalloc. Note that these may
# not be accurate if python is configured to use its internal small
# object allocator (which is on by default, disable by setting the
# env `PYTHONMALLOC=malloc`).
#
# See the jemalloc manpage for details about what each value means,
# roughly:
# - allocated ─ Total number of bytes allocated by the app
# - active ─ Total number of bytes in active pages allocated by
# the application, this is bigger than `allocated`.
# - resident ─ Maximum number of bytes in physically resident data
# pages mapped by the allocator, comprising all pages dedicated
# to allocator metadata, pages backing active allocations, and
# unused dirty pages. This is bigger than `active`.
# - mapped ─ Total number of bytes in active extents mapped by the
# allocator.
# - metadata ─ Total number of bytes dedicated to jemalloc
# metadata.
for t in (
"allocated",
"active",
"resident",
"mapped",
"metadata",
):
try:
value = _mallctl(f"stats.{t}")
except Exception as e:
# There was an error fetching the value, skip.
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
continue
g.add_metric([t], value=value)
yield g
REGISTRY.register(JemallocCollector())
logger.debug("Added jemalloc stats")
def setup_jemalloc_stats():
"""Try to setup jemalloc stats, if jemalloc is loaded."""
try:
_setup_jemalloc_stats()
except Exception as e:
# This should only happen if we find the loaded jemalloc library, but
# fail to load it somehow (e.g. we somehow picked the wrong version).
logger.info("Failed to setup collector to record jemalloc stats: %s", e)


@ -38,6 +38,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
@ -426,6 +427,13 @@ class Notifier:
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())
if stream_key == "to_device_key":
issue9533_logger.debug(
"to-device messages stream id %s, awaking streams for %s",
new_token,
users,
)
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:


@ -51,7 +51,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30


@ -1020,6 +1020,7 @@ class RoomSpaceSummaryRestServlet(RestServlet):
max_rooms_per_space=parse_integer(request, "max_rooms_per_space"),
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:


@ -15,6 +15,7 @@
import logging
from typing import List, Optional, Tuple
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
@ -404,6 +405,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
@ -533,6 +541,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"


@ -84,7 +84,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
if keys:
result["keys"] = keys
device_display_name = device.display_name
device_display_name = None
if self.hs.config.allow_device_name_lookup_over_federation:
device_display_name = device.display_name
if device_display_name:
result["device_display_name"] = device_display_name


@ -1,21 +0,0 @@
# Synapse Database Schemas
These schemas are used as a basis to create brand new Synapse databases, on both
SQLite3 and Postgres.
## Building full schema dumps
If you want to recreate these schemas, they need to be made from a database that
has had all background updates run.
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
`full.sql.postgres` and `full.sql.sqlite` files.
Ensure postgres is installed and your user has the ability to run bash commands
such as `createdb`, then call
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
There are currently two folders with full-schema snapshots. `16` is a snapshot
from 2015, for historical reference. The other contains the most recent full
schema snapshot.


@ -26,16 +26,13 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines.postgres import PostgresEngine
from synapse.storage.schema import SCHEMA_VERSION
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 59
dir_path = os.path.abspath(os.path.dirname(__file__))
schema_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "schema")
class PrepareDatabaseException(Exception):
@ -167,7 +164,14 @@ def _setup_new_database(
Example directory structure:
schema/
schema/
common/
delta/
...
full_schemas/
11/
foo.sql
main/
delta/
...
full_schemas/
@ -175,15 +179,14 @@ def _setup_new_database(
test.sql
...
11/
foo.sql
bar.sql
...
In the example foo.sql and bar.sql would be run, and then any delta files
for versions strictly greater than 11.
Note: we apply the full schemas and deltas from the top level `schema/`
folder as well as those in the data stores specified.
Note: we apply the full schemas and deltas from the `schema/common`
folder as well as those in the databases specified.
Args:
cur: a database cursor
@ -195,12 +198,12 @@ def _setup_new_database(
# configured to our liking.
database_engine.check_new_database(cur)
current_dir = os.path.join(dir_path, "schema", "full_schemas")
full_schemas_dir = os.path.join(schema_path, "common", "full_schemas")
# First we find the highest full schema version we have
valid_versions = []
for filename in os.listdir(current_dir):
for filename in os.listdir(full_schemas_dir):
try:
ver = int(filename)
except ValueError:
@ -218,15 +221,13 @@ def _setup_new_database(
logger.debug("Initialising schema v%d", max_current_ver)
# Now lets find all the full schema files, both in the global schema and
# in data store schemas.
directories = [os.path.join(current_dir, str(max_current_ver))]
# Now let's find all the full schema files, both in the common schema and
# in database schemas.
directories = [os.path.join(full_schemas_dir, str(max_current_ver))]
directories.extend(
os.path.join(
dir_path,
"databases",
schema_path,
database,
"schema",
"full_schemas",
str(max_current_ver),
)
@ -357,6 +358,9 @@ def _upgrade_existing_database(
check_database_before_upgrade(cur, database_engine, config)
start_ver = current_version
# if we got to this schema version by running a full_schema rather than a series
# of deltas, we should not run the deltas for this version.
if not upgraded:
start_ver += 1
@ -385,12 +389,10 @@ def _upgrade_existing_database(
# directories for schema updates.
# First we find the directories to search in
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
delta_dir = os.path.join(schema_path, "common", "delta", str(v))
directories = [delta_dir]
for database in databases:
directories.append(
os.path.join(dir_path, "databases", database, "schema", "delta", str(v))
)
directories.append(os.path.join(schema_path, database, "delta", str(v)))
# Used to check if we have any duplicate file names
file_name_counter = Counter() # type: CounterType[str]
@ -621,8 +623,8 @@ def _get_or_create_schema_state(
txn: Cursor, database_engine: BaseDatabaseEngine
) -> Optional[Tuple[int, List[str], bool]]:
# Bluntly try creating the schema_version tables.
schema_path = os.path.join(dir_path, "schema", "schema_version.sql")
executescript(txn, schema_path)
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
executescript(txn, sql_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()
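Illustratively, after this change the helpers resolve paths such as the
following (for the `main` database at schema version 59):

    synapse/storage/schema/common/schema_version.sql
    synapse/storage/schema/common/delta/59/
    synapse/storage/schema/main/delta/59/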


@ -0,0 +1,37 @@
# Synapse Database Schemas
This directory contains the schema files used to build Synapse databases.
Synapse supports splitting its datastore across multiple physical databases (which can
be useful for large installations), and the schema files are therefore split according
to the logical database they apply to.
At the time of writing, the following "logical" databases are supported:
* `state` - used to store Matrix room state (more specifically, `state_groups`,
their relationships and contents.)
* `main` - stores everything else.
Additionally, the `common` directory contains schema files for tables which must be
present on *all* physical databases.
## Full schema dumps
In the `full_schemas` directories, only the most recently-numbered snapshot is useful
(`54` at the time of writing). Older snapshots (eg, `16`) are present for historical
reference only.
## Building full schema dumps
If you want to recreate these schemas, they need to be made from a database that
has had all background updates run.
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
`full.sql.postgres` and `full.sql.sqlite` files.
Ensure postgres is installed, then run:
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
NB at the time of writing, this script predates the split into separate `state`/`main`
databases so will require updates to handle that correctly.


@ -0,0 +1,17 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 59
