Merge branch 'develop' of github.com:matrix-org/synapse into neilj/mau_sync_block

This commit is contained in:
Neil Johnson 2018-08-09 11:40:37 +01:00
commit a5ef110749
98 changed files with 819 additions and 405 deletions

View file

@ -1,3 +1,76 @@
Synapse 0.33.2 (2018-08-09)
===========================
No significant changes.
Synapse 0.33.2rc1 (2018-08-07)
==============================
Features
--------
- add support for the lazy_loaded_members filter as per MSC1227 ([\#2970](https://github.com/matrix-org/synapse/issues/2970))
- add support for the include_redundant_members filter param as per MSC1227 ([\#3331](https://github.com/matrix-org/synapse/issues/3331))
- Add metrics to track resource usage by background processes ([\#3553](https://github.com/matrix-org/synapse/issues/3553), [\#3556](https://github.com/matrix-org/synapse/issues/3556), [\#3604](https://github.com/matrix-org/synapse/issues/3604), [\#3610](https://github.com/matrix-org/synapse/issues/3610))
- Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric ([\#3554](https://github.com/matrix-org/synapse/issues/3554))
- Add support for client_reader to handle more APIs ([\#3555](https://github.com/matrix-org/synapse/issues/3555), [\#3597](https://github.com/matrix-org/synapse/issues/3597))
- make the /context API filter & lazy-load aware as per MSC1227 ([\#3567](https://github.com/matrix-org/synapse/issues/3567))
- Add ability to limit number of monthly active users on the server ([\#3630](https://github.com/matrix-org/synapse/issues/3630))
- When we fail to join a room over federation, pass the error code back to the client. ([\#3639](https://github.com/matrix-org/synapse/issues/3639))
- Add a new /admin/register API for non-interactively creating users. ([\#3415](https://github.com/matrix-org/synapse/issues/3415))
Bugfixes
--------
- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952))
- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391))
- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514))
- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520))
- Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis. ([\#3548](https://github.com/matrix-org/synapse/issues/3548))
- Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password. ([\#3569](https://github.com/matrix-org/synapse/issues/3569))
- Fix potential stack overflow and deadlock under heavy load ([\#3570](https://github.com/matrix-org/synapse/issues/3570))
- Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585 ([\#3585](https://github.com/matrix-org/synapse/issues/3585))
- Fix failure to persist events over federation under load ([\#3601](https://github.com/matrix-org/synapse/issues/3601))
- Fix updating of cached remote profiles ([\#3605](https://github.com/matrix-org/synapse/issues/3605))
- Fix 'tuple index out of range' error ([\#3607](https://github.com/matrix-org/synapse/issues/3607))
- Only import secrets when available (fix for py < 3.6) ([\#3626](https://github.com/matrix-org/synapse/issues/3626))
Internal Changes
----------------
- Remove redundant checks on who_forgot_in_room ([\#3350](https://github.com/matrix-org/synapse/issues/3350))
- Remove unnecessary event re-signing hacks ([\#3367](https://github.com/matrix-org/synapse/issues/3367))
- Rewrite cache list decorator ([\#3384](https://github.com/matrix-org/synapse/issues/3384))
- Move v1-only REST APIs into their own module. ([\#3460](https://github.com/matrix-org/synapse/issues/3460))
- Replace more instances of Python 2-only iteritems and itervalues uses. ([\#3562](https://github.com/matrix-org/synapse/issues/3562))
- Refactor EventContext to accept state during init ([\#3577](https://github.com/matrix-org/synapse/issues/3577))
- Improve Dockerfile and docker-compose instructions ([\#3543](https://github.com/matrix-org/synapse/issues/3543))
- Release notes are now in the Markdown format. ([\#3552](https://github.com/matrix-org/synapse/issues/3552))
- add config for pep8 ([\#3559](https://github.com/matrix-org/synapse/issues/3559))
- Merge Linearizer and Limiter ([\#3571](https://github.com/matrix-org/synapse/issues/3571), [\#3572](https://github.com/matrix-org/synapse/issues/3572))
- Lazily load state on master process when using workers to reduce DB consumption ([\#3579](https://github.com/matrix-org/synapse/issues/3579), [\#3581](https://github.com/matrix-org/synapse/issues/3581), [\#3582](https://github.com/matrix-org/synapse/issues/3582), [\#3584](https://github.com/matrix-org/synapse/issues/3584))
- Fixes and optimisations for resolve_state_groups ([\#3586](https://github.com/matrix-org/synapse/issues/3586))
- Improve logging for exceptions when handling PDUs ([\#3587](https://github.com/matrix-org/synapse/issues/3587))
- Add some measure blocks to persist_events ([\#3590](https://github.com/matrix-org/synapse/issues/3590))
- Fix some random logcontext leaks. ([\#3591](https://github.com/matrix-org/synapse/issues/3591), [\#3606](https://github.com/matrix-org/synapse/issues/3606))
- Speed up calculating state deltas in persist_event loop ([\#3592](https://github.com/matrix-org/synapse/issues/3592))
- Attempt to reduce amount of state pulled out of DB during persist_events ([\#3595](https://github.com/matrix-org/synapse/issues/3595))
- Fix a documentation typo in on_make_leave_request ([\#3609](https://github.com/matrix-org/synapse/issues/3609))
- Make EventStore inherit from EventFederationStore ([\#3612](https://github.com/matrix-org/synapse/issues/3612))
- Remove some redundant joins on event_edges.room_id ([\#3613](https://github.com/matrix-org/synapse/issues/3613))
- Stop populating events.content ([\#3614](https://github.com/matrix-org/synapse/issues/3614))
- Update the /send_leave path registration to use event_id rather than a transaction ID. ([\#3616](https://github.com/matrix-org/synapse/issues/3616))
- Refactor FederationHandler to move DB writes into separate functions ([\#3621](https://github.com/matrix-org/synapse/issues/3621))
- Remove unused field "pdu_failures" from transactions. ([\#3628](https://github.com/matrix-org/synapse/issues/3628))
- rename replication_layer to federation_client ([\#3634](https://github.com/matrix-org/synapse/issues/3634))
- Factor out exception handling in federation_client ([\#3638](https://github.com/matrix-org/synapse/issues/3638))
- Refactor location of docker build script. ([\#3644](https://github.com/matrix-org/synapse/issues/3644))
- Update CONTRIBUTING to mention newsfragments. ([\#3645](https://github.com/matrix-org/synapse/issues/3645))
Synapse 0.33.1 (2018-08-02) Synapse 0.33.1 (2018-08-02)
=========================== ===========================

View file

@ -1 +0,0 @@
Make /directory/list API return 404 for room not found instead of 400

View file

@ -1 +0,0 @@
add support for the lazy_loaded_members filter as per MSC1227

View file

@ -1 +0,0 @@
add support for the include_redundant_members filter param as per MSC1227

View file

@ -1 +0,0 @@
Remove redundant checks on who_forgot_in_room

View file

@ -1 +0,0 @@
Remove unnecessary event re-signing hacks

View file

@ -1 +0,0 @@
Rewrite cache list decorator

View file

@ -1 +0,0 @@
Default inviter_display_name to mxid for email invites

View file

View file

View file

@ -1 +0,0 @@
Don't generate TURN credentials if no TURN config options are set

View file

@ -1 +0,0 @@
Correctly announce deleted devices over federation

View file

@ -1 +0,0 @@
Improve Dockerfile and docker-compose instructions

View file

@ -1 +0,0 @@
Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis.

View file

@ -1 +0,0 @@
Release notes are now in the Markdown format.

View file

@ -1 +0,0 @@
Add metrics to track resource usage by background processes

View file

@ -1 +0,0 @@
Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric

View file

@ -1 +0,0 @@
Add support for client_reader to handle more APIs

View file

@ -1 +0,0 @@
Add metrics to track resource usage by background processes

View file

@ -1 +0,0 @@
add config for pep8

View file

View file

@ -1 +0,0 @@
make the /context API filter & lazy-load aware as per MSC1227

View file

@ -1 +0,0 @@
Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password.

View file

@ -1 +0,0 @@
Fix potential stack overflow and deadlock under heavy load

View file

@ -1 +0,0 @@
Merge Linearizer and Limiter

View file

@ -1 +0,0 @@
Merge Linearizer and Limiter

View file

View file

@ -1 +0,0 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -1 +0,0 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -1 +0,0 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -1 +0,0 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -1 +0,0 @@
Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585

View file

@ -1 +0,0 @@
Fixes and optimisations for resolve_state_groups

View file

@ -1 +0,0 @@
Improve logging for exceptions when handling PDUs

View file

@ -1 +0,0 @@
Add some measure blocks to persist_events

View file

@ -1 +0,0 @@
Fix some random logcontext leaks.

View file

@ -1 +0,0 @@
Speed up calculating state deltas in persist_event loop

View file

@ -1 +0,0 @@
Attempt to reduce amount of state pulled out of DB during persist_events

View file

@ -1 +0,0 @@
Add support for client_reader to handle more APIs

View file

@ -1 +0,0 @@
Fix failure to persist events over federation under load

View file

@ -1 +0,0 @@
Add metrics to track resource usage by background processes

View file

@ -1 +0,0 @@
Fix updating of cached remote profiles

View file

@ -1 +0,0 @@
Fix some random logcontext leaks.

View file

@ -1 +0,0 @@
Fix 'tuple index out of range' error

View file

@ -1 +0,0 @@
Fix a documentation typo in on_make_leave_request

View file

@ -1 +0,0 @@
Add metrics to track resource usage by background processes

View file

@ -1 +0,0 @@
Make EventStore inherit from EventFederationStore

View file

@ -1 +0,0 @@
Remove some redundant joins on event_edges.room_id

View file

@ -1 +0,0 @@
Stop populating events.content

View file

@ -1 +0,0 @@
Update the /send_leave path registration to use event_id rather than a transaction ID.

View file

@ -1 +0,0 @@
Refactor FederationHandler to move DB writes into separate functions

View file

@ -1 +0,0 @@
Only import secrets when available (fix for py < 3.6)

View file

@ -1 +0,0 @@
Remove unused field "pdu_failures" from transactions.

View file

@ -1 +0,0 @@
Add ability to limit number of monthly active users on the server

1
changelog.d/3632.misc Normal file
View file

@ -0,0 +1 @@
Refactor HTTP replication endpoints to reduce code duplication

View file

@ -1 +0,0 @@
rename replication_layer to federation_client

View file

@ -1 +0,0 @@
Factor out exception handling in federation_client

View file

@ -1 +0,0 @@
When we fail to join a room over federation, pass the error code back to the client.

View file

@ -1 +0,0 @@
Refactor location of docker build script.

View file

@ -1 +0,0 @@
Update CONTRIBUTING to mention newsfragments.

1
changelog.d/3647.misc Normal file
View file

@ -0,0 +1 @@
Tests now correctly execute on Python 3.

1
changelog.d/3654.feature Normal file
View file

@ -0,0 +1 @@
Basic support for room versioning

1
changelog.d/3664.feature Normal file
View file

@ -0,0 +1 @@
Add some metrics for the appservice and federation event sending loops

View file

@ -2,7 +2,7 @@
package = "synapse" package = "synapse"
filename = "CHANGES.md" filename = "CHANGES.md"
directory = "changelog.d" directory = "changelog.d"
issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue}>)" issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue})"
[[tool.towncrier.type]] [[tool.towncrier.type]]
directory = "feature" directory = "feature"

View file

@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server. """ This is a reference implementation of a Matrix home server.
""" """
__version__ = "0.33.1" __version__ = "0.33.2"

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd # Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -94,3 +95,11 @@ class RoomCreationPreset(object):
class ThirdPartyEntityKind(object): class ThirdPartyEntityKind(object):
USER = "user" USER = "user"
LOCATION = "location" LOCATION = "location"
# the version we will give rooms which are created on this server
DEFAULT_ROOM_VERSION = "1"
# vdh-test-version is a placeholder to get room versioning support working and tested
# until we have a working v2.
KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd # Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -56,6 +57,8 @@ class Codes(object):
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED" MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
class CodeMessageException(RuntimeError): class CodeMessageException(RuntimeError):
@ -285,6 +288,27 @@ class LimitExceededError(SynapseError):
) )
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""
def __init__(self, room_version):
super(IncompatibleRoomVersionError, self).__init__(
code=400,
msg="Your homeserver does not support the features required to "
"join this room",
errcode=Codes.INCOMPATIBLE_ROOM_VERSION,
)
self._room_version = room_version
def error_dict(self):
return cs_error(
self.msg,
self.errcode,
room_version=self._room_version,
)
def cs_error(msg, code=Codes.UNKNOWN, **kwargs): def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
""" Utility method for constructing an error response for client-server """ Utility method for constructing an error response for client-server
interactions. interactions.

View file

@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
@ -83,6 +83,14 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403, 403,
"Creation event's room_id domain does not match sender's" "Creation event's room_id domain does not match sender's"
) )
room_version = event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
raise AuthError(
403,
"room appears to have unsupported version %s" % (
room_version,
))
# FIXME # FIXME
logger.debug("Allowing! %s", event) logger.debug("Allowing! %s", event)
return return

View file

@ -25,7 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import Membership from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import ( from synapse.api.errors import (
CodeMessageException, CodeMessageException,
FederationDeniedError, FederationDeniedError,
@ -518,10 +518,10 @@ class FederationClient(FederationBase):
description, destination, exc_info=1, description, destination, exc_info=1,
) )
raise RuntimeError("Failed to %s via any server", description) raise RuntimeError("Failed to %s via any server" % (description, ))
def make_membership_event(self, destinations, room_id, user_id, membership, def make_membership_event(self, destinations, room_id, user_id, membership,
content={},): content, params):
""" """
Creates an m.room.member event, with context, without participating in the room. Creates an m.room.member event, with context, without participating in the room.
@ -537,8 +537,10 @@ class FederationClient(FederationBase):
user_id (str): The user whose membership is being evented. user_id (str): The user whose membership is being evented.
membership (str): The "membership" property of the event. Must be membership (str): The "membership" property of the event. Must be
one of "join" or "leave". one of "join" or "leave".
content (object): Any additional data to put into the content field content (dict): Any additional data to put into the content field
of the event. of the event.
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Return: Return:
Deferred: resolves to a tuple of (origin (str), event (object)) Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event. where origin is the remote homeserver which generated the event.
@ -558,10 +560,12 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
def send_request(destination): def send_request(destination):
ret = yield self.transport_layer.make_membership_event( ret = yield self.transport_layer.make_membership_event(
destination, room_id, user_id, membership destination, room_id, user_id, membership, params,
) )
pdu_dict = ret["event"] pdu_dict = ret.get("event", None)
if not isinstance(pdu_dict, dict):
raise InvalidResponseError("Bad 'event' field in response")
logger.debug("Got response to make_%s: %s", membership, pdu_dict) logger.debug("Got response to make_%s: %s", membership, pdu_dict)
@ -605,6 +609,26 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable. Fails with a ``RuntimeError`` if no servers were reachable.
""" """
def check_authchain_validity(signed_auth_chain):
for e in signed_auth_chain:
if e.type == EventTypes.Create:
create_event = e
break
else:
raise InvalidResponseError(
"no %s in auth chain" % (EventTypes.Create,),
)
# the room version should be sane.
room_version = create_event.content.get("room_version", "1")
if room_version not in KNOWN_ROOM_VERSIONS:
# This shouldn't be possible, because the remote server should have
# rejected the join attempt during make_join.
raise InvalidResponseError(
"room appears to have unsupported version %s" % (
room_version,
))
@defer.inlineCallbacks @defer.inlineCallbacks
def send_request(destination): def send_request(destination):
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
@ -661,7 +685,7 @@ class FederationClient(FederationBase):
for s in signed_state: for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata) s.internal_metadata = copy.deepcopy(s.internal_metadata)
auth_chain.sort(key=lambda e: e.depth) check_authchain_validity(signed_auth)
defer.returnValue({ defer.returnValue({
"state": signed_state, "state": signed_state,

View file

@ -27,7 +27,13 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure from twisted.python import failure
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError from synapse.api.errors import (
AuthError,
FederationError,
IncompatibleRoomVersionError,
NotFoundError,
SynapseError,
)
from synapse.crypto.event_signing import compute_event_signature from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions from synapse.federation.persistence import TransactionActions
@ -323,12 +329,21 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp)) defer.returnValue((200, resp))
@defer.inlineCallbacks @defer.inlineCallbacks
def on_make_join_request(self, origin, room_id, user_id): def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin) origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id) yield self.check_server_matches_acl(origin_host, room_id)
room_version = yield self.store.get_room_version(room_id)
if room_version not in supported_versions:
logger.warn("Room version %s not in %s", room_version, supported_versions)
raise IncompatibleRoomVersionError(room_version=room_version)
pdu = yield self.handler.on_make_join_request(room_id, user_id) pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)}) defer.returnValue({
"event": pdu.get_pdu_json(time_now),
"room_version": room_version,
})
@defer.inlineCallbacks @defer.inlineCallbacks
def on_invite_request(self, origin, content): def on_invite_request(self, origin, content):

View file

@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import ( from synapse.metrics import (
LaterGauge, LaterGauge,
event_processing_loop_counter,
event_processing_loop_room_count,
events_processed_counter, events_processed_counter,
sent_edus_counter, sent_edus_counter,
sent_transactions_counter, sent_transactions_counter,
@ -253,7 +255,13 @@ class TransactionQueue(object):
synapse.metrics.event_processing_last_ts.labels( synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts) "federation_sender").set(ts)
events_processed_counter.inc(len(events)) events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"federation_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels( synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token) "federation_sender").set(next_token)

View file

@ -195,7 +195,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def make_membership_event(self, destination, room_id, user_id, membership): def make_membership_event(self, destination, room_id, user_id, membership, params):
"""Asks a remote server to build and sign us a membership event """Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs. Note that this does not append any events to any graphs.
@ -205,6 +205,8 @@ class TransportLayerClient(object):
room_id (str): room to join/leave room_id (str): room to join/leave
user_id (str): user to be joined/left user_id (str): user to be joined/left
membership (str): one of join/leave membership (str): one of join/leave
params (dict[str, str|Iterable[str]]): Query parameters to include in the
request.
Returns: Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result Deferred: Succeeds when we get a 2xx HTTP response. The result
@ -241,6 +243,7 @@ class TransportLayerClient(object):
content = yield self.client.get_json( content = yield self.client.get_json(
destination=destination, destination=destination,
path=path, path=path,
args=params,
retry_on_dns_fail=retry_on_dns_fail, retry_on_dns_fail=retry_on_dns_fail,
timeout=20000, timeout=20000,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,

View file

@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
class BaseFederationServlet(object): class BaseFederationServlet(object):
"""Abstract base class for federation servlet classes.
The servlet object should have a PATH attribute which takes the form of a regexp to
match against the request path (excluding the /federation/v1 prefix).
The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
the appropriate HTTP method. These methods have the signature:
on_<METHOD>(self, origin, content, query, **kwargs)
With arguments:
origin (unicode|None): The authenticated server_name of the calling server,
unless REQUIRE_AUTH is set to False and authentication failed.
content (unicode|None): decoded json body of the request. None if the
request was a GET.
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
(ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
yet.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
Raises:
SynapseError: to return an error code
Exception: other exceptions will be caught, logged, and a 500 will be
returned.
"""
REQUIRE_AUTH = True REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name): def __init__(self, handler, authenticator, ratelimiter, server_name):
@ -204,6 +239,18 @@ class BaseFederationServlet(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@functools.wraps(func) @functools.wraps(func)
def new_func(request, *args, **kwargs): def new_func(request, *args, **kwargs):
""" A callback which can be passed to HttpServer.RegisterPaths
Args:
request (twisted.web.http.Request):
*args: unused?
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None content = None
if request.method in ["PUT", "POST"]: if request.method in ["PUT", "POST"]:
# TODO: Handle other method types? other content types? # TODO: Handle other method types? other content types?
@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)" PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id): def on_GET(self, origin, _content, query, context, user_id):
"""
Args:
origin (unicode): The authenticated server_name of the calling server
_content (None): (GETs don't have bodies)
query (dict[bytes, list[bytes]]): Query params from the request.
**kwargs (dict[unicode, unicode]): the dict mapping keys to path
components as specified in the path match regexp.
Returns:
Deferred[(int, object)|None]: either (response code, response object) to
return a JSON response, or None if the request has already been handled.
"""
versions = query.get(b'ver')
if versions is not None:
supported_versions = [v.decode("utf-8") for v in versions]
else:
supported_versions = ["1"]
content = yield self.handler.on_make_join_request( content = yield self.handler.on_make_join_request(
origin, context, user_id, origin, context, user_id,
supported_versions=supported_versions,
) )
defer.returnValue((200, content)) defer.returnValue((200, content))

View file

@ -23,6 +23,10 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -136,6 +140,12 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events)) events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels(
"appservice_sender"
).inc(len(events_by_room))
event_processing_loop_counter.labels("appservice_sender").inc()
synapse.metrics.event_processing_lag.labels( synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts) "appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels( synapse.metrics.event_processing_last_ts.labels(

View file

@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.constants import (
KNOWN_ROOM_VERSIONS,
EventTypes,
Membership,
RejectedReason,
)
from synapse.api.errors import ( from synapse.api.errors import (
AuthError, AuthError,
CodeMessageException, CodeMessageException,
@ -922,6 +927,9 @@ class FederationHandler(BaseHandler):
joinee, joinee,
"join", "join",
content, content,
params={
"ver": KNOWN_ROOM_VERSIONS,
},
) )
# This shouldn't happen, because the RoomMemberHandler has a # This shouldn't happen, because the RoomMemberHandler has a
@ -1187,13 +1195,14 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
content={},): content={}, params=None):
origin, pdu = yield self.federation_client.make_membership_event( origin, pdu = yield self.federation_client.make_membership_event(
target_hosts, target_hosts,
room_id, room_id,
user_id, user_id,
membership, membership,
content, content,
params=params,
) )
logger.debug("Got response to make_%s: %s", membership, pdu) logger.debug("Got response to make_%s: %s", membership, pdu)

View file

@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.config = hs.config self.config = hs.config
self.http_client = hs.get_simple_http_client() self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users # This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs) self.base_handler = BaseHandler(hs)
@ -559,12 +559,9 @@ class EventCreationHandler(object):
try: try:
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.
if self.config.worker_app: if self.config.worker_app:
yield send_event_to_master( yield self.send_event_to_master(
clock=self.hs.get_clock(), event_id=event.event_id,
store=self.store, store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
event=event, event=event,
context=context, context=context,

View file

@ -21,9 +21,17 @@ import math
import string import string
from collections import OrderedDict from collections import OrderedDict
from six import string_types
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.constants import (
DEFAULT_ROOM_VERSION,
KNOWN_ROOM_VERSIONS,
EventTypes,
JoinRules,
RoomCreationPreset,
)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils from synapse.util import stringutils
@ -99,6 +107,21 @@ class RoomCreationHandler(BaseHandler):
if ratelimit: if ratelimit:
yield self.ratelimit(requester) yield self.ratelimit(requester)
room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
if not isinstance(room_version, string_types):
raise SynapseError(
400,
"room_version must be a string",
Codes.BAD_JSON,
)
if room_version not in KNOWN_ROOM_VERSIONS:
raise SynapseError(
400,
"Your homeserver does not support this room version",
Codes.UNSUPPORTED_ROOM_VERSION,
)
if "room_alias_name" in config: if "room_alias_name" in config:
for wchar in string.whitespace: for wchar in string.whitespace:
if wchar in config["room_alias_name"]: if wchar in config["room_alias_name"]:
@ -184,6 +207,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {}) creation_content = config.get("creation_content", {})
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version
room_member_handler = self.hs.get_room_member_handler() room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room( yield self._send_events_for_new_room(

View file

@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import ( from synapse.replication.http.membership import (
get_or_register_3pid_guest, ReplicationRegister3PIDGuestRestServlet as Repl3PID,
notify_user_membership_change, ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
remote_join, ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
remote_reject_invite, ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler): class RoomMemberWorkerHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberWorkerHandler, self).__init__(hs)
self._get_register_3pid_client = Repl3PID.make_client(hs)
self._remote_join_client = ReplRemoteJoin.make_client(hs)
self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content): def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join """Implements RoomMemberHandler._remote_join
@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0: if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers") raise SynapseError(404, "No known servers")
ret = yield remote_join( ret = yield self._remote_join_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
return remote_reject_invite( return self._remote_reject_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id): def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room """Implements RoomMemberHandler._user_joined_room
""" """
return notify_user_membership_change( return self._notify_change_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
user_id=target.to_string(), user_id=target.to_string(),
room_id=room_id, room_id=room_id,
change="joined", change="joined",
@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id): def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room """Implements RoomMemberHandler._user_left_room
""" """
return notify_user_membership_change( return self._notify_change_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
user_id=target.to_string(), user_id=target.to_string(),
room_id=room_id, room_id=room_id,
change="left", change="left",
@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest """Implements RoomMemberHandler.get_or_register_3pid_guest
""" """
return get_or_register_3pid_guest( return self._get_register_3pid_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
medium=medium, medium=medium,
address=address, address=address,

View file

@ -439,7 +439,7 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body)) defer.returnValue(json.loads(body))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True, def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
timeout=None, ignore_backoff=False): timeout=None, ignore_backoff=False):
""" GETs some json from the given host homeserver and path """ GETs some json from the given host homeserver and path
@ -447,7 +447,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request destination (str): The remote server to send the HTTP request
to. to.
path (str): The HTTP path. path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to args (dict|None): A dictionary used to create query strings, defaults to
None. None.
timeout (int): How long to try (in ms) the destination for before timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout and that the request will giving up. None indicates no timeout and that the request will
@ -702,6 +702,9 @@ def check_content_type_is_json(headers):
def encode_query_args(args): def encode_query_args(args):
if args is None:
return b""
encoded_args = {} encoded_args = {}
for k, vs in args.items(): for k, vs in args.items():
if isinstance(vs, string_types): if isinstance(vs, string_types):

View file

@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "") events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count",
"Event processing loop iterations",
["name"],
)
event_processing_loop_room_count = Counter(
"synapse_event_processing_loop_room_count",
"Rooms seen per event processing loop iteration",
["name"],
)
# Used to track where various components have processed in the event stream, # Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc. # e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])

View file

@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import re
from six.moves import urllib
from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
class ReplicationEndpoint(object):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
(with an `/:txn_id` prefix for cached requests.), where NAME is a name,
PATH_ARGS are a tuple of parameters to be encoded in the URL.
For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
with `CACHE` set to true then this generates an endpoint:
/_synapse/replication/send_event/:event_id/:txn_id
For POST/PUT requests the payload is serialized to json and sent as the
body, while for GET requests the payload is added as query parameters. See
`_serialize_payload` for details.
Incoming requests are handled by overriding `_handle_request`. Servers
must call `register` to register the path with the HTTP server.
Requests can be sent by calling the client returned by `make_client`.
Attributes:
NAME (str): A name for the endpoint, added to the path as well as used
in logging and metrics.
PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
Adding parameters to the path (rather than payload) can make it
easier to follow along in the log files.
METHOD (str): The method of the HTTP request, defaults to POST. Can be
one of POST, PUT or GET. If GET then the payload is sent as query
parameters rather than a JSON body.
CACHE (bool): Whether server should cache the result of the request/
If true then transparently adds a txn_id to all requests, and
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
"""
__metaclass__ = abc.ABCMeta
NAME = abc.abstractproperty()
PATH_ARGS = abc.abstractproperty()
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
def __init__(self, hs):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME,
timeout_ms=30 * 60 * 1000,
)
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
kwargs) so that an appropriate exception is raised if the client is
called with unexpected parameters. All PATH_ARGS must appear in
argument list.
Returns:
Deferred[dict]|dict: If POST/PUT request then dictionary must be
JSON serialisable, otherwise must be appropriate for adding as
query args.
"""
return {}
@abc.abstractmethod
def _handle_request(self, request, **kwargs):
"""Handle incoming request.
This is called with the request object and PATH_ARGS.
Returns:
Deferred[dict]: A JSON serialisable dict to be used as response
body of request.
"""
pass
@classmethod
def make_client(cls, hs):
"""Create a client that makes requests.
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
host = hs.config.worker_replication_host
port = hs.config.worker_replication_http_port
client = hs.get_simple_http_client()
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)
if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)
uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host, port, cls.NAME, "/".join(url_args)
)
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield request_func(uri, data)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
raise
logger.warn("%s request timed out", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
return send_request
def register(self, http_server):
"""Called by the server to register this as a handler to the
appropriate path.
"""
url_args = list(self.PATH_ARGS)
handler = self._handle_request
method = self.METHOD
if self.CACHE:
handler = self._cached_handler
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (
self.NAME,
args
))
http_server.register_paths(method, [pattern], handler)
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
otherwise calls `_handle_request` and caches its response.
"""
# We just use the txn_id here, but we probably also want to use the
# other PATH_ARGS as well.
assert self.CACHE
return self.response_cache.wrap(
txn_id,
self._handle_request,
request, **kwargs
)

View file

@ -14,182 +14,63 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import HttpResponseException from synapse.http.servlet import parse_json_object_from_request
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@defer.inlineCallbacks class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
def remote_join(client, host, port, requester, remote_room_hosts, """Does a remote join for the given user to the given room
room_id, user_id, content):
"""Ask the master to do a remote join for the given user to the given room
Args: Request format:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and join via
room_id (str)
user_id (str)
content (dict): The event content to use for the join event
Returns: POST /_synapse/replication/remote_join/:room_id/:user_id
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
payload = { {
"requester": requester.serialize(), "requester": ...,
"remote_room_hosts": remote_room_hosts, "remote_room_hosts": [...],
"room_id": room_id, "content": { ... }
"user_id": user_id, }
"content": content,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def remote_reject_invite(client, host, port, requester, remote_room_hosts,
room_id, user_id):
"""Ask master to reject the invite for the user and room.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and reject via
room_id (str)
user_id (str)
Returns:
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
payload = {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"room_id": room_id,
"user_id": user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def get_or_register_3pid_guest(client, host, port, requester,
medium, address, inviter_user_id):
"""Ask the master to get/create a guest account for given 3PID.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
""" """
uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) NAME = "remote_join"
PATH_ARGS = ("room_id", "user_id",)
payload = {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def notify_user_membership_change(client, host, port, user_id, room_id, change):
"""Notify master that a user has joined or left the room
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication.
user_id (str)
room_id (str)
change (str): Either "join" or "left"
Returns:
Deferred
"""
assert change in ("joined", "left")
uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
payload = {
"user_id": user_id,
"room_id": room_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationRemoteJoinRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRemoteJoinRestServlet, self).__init__() super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
content):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and join via
content(dict): The event content to use for the join event
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"content": content,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
event_content = content["content"] event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
defer.returnValue((200, {})) defer.returnValue((200, {}))
class ReplicationRemoteRejectInviteRestServlet(RestServlet): class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] """Rejects the invite for the user and room.
Request format:
POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
{
"requester": ...,
"remote_room_hosts": [...],
}
"""
NAME = "remote_reject_invite"
PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRemoteRejectInviteRestServlet, self).__init__() super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and reject via
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
defer.returnValue((200, ret)) defer.returnValue((200, ret))
class ReplicationRegister3PIDGuestRestServlet(RestServlet): class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] """Gets/creates a guest account for given 3PID.
Request format:
POST /_synapse/replication/get_or_register_3pid_guest/
{
"requester": ...,
"medium": ...,
"address": ...,
"inviter_user_id": ...
}
"""
NAME = "get_or_register_3pid_guest"
PATH_ARGS = ()
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRegister3PIDGuestRestServlet, self).__init__() super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, medium, address, inviter_user_id):
"""
Args:
requester(Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
"""
return {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
medium = content["medium"] medium = content["medium"]
@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
defer.returnValue((200, ret)) defer.returnValue((200, ret))
class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")] """Notifies that a user has joined or left the room
Request format:
POST /_synapse/replication/membership_change/:room_id/:user_id/:change
{}
"""
NAME = "membership_change"
PATH_ARGS = ("room_id", "user_id", "change")
CACHE = False # No point caching as should return instantly.
def __init__(self, hs): def __init__(self, hs):
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.distributor = hs.get_distributor() self.distributor = hs.get_distributor()
def on_POST(self, request, change): @staticmethod
content = parse_json_object_from_request(request) def _serialize_payload(room_id, user_id, change):
"""
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
"""
assert change in ("joined", "left",)
user_id = content["user_id"] return {}
room_id = content["room_id"]
def _handle_request(self, request, room_id, user_id, change):
logger.info("user membership change: %s in %s", user_id, room_id) logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id) user = UserID.from_string(user_id)

View file

@ -14,86 +14,26 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@defer.inlineCallbacks class ReplicationSendEventRestServlet(ReplicationEndpoint):
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
host, port, event.event_id,
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield client.put_json(uri, payload)
break
except CodeMessageException as e:
if e.code != 504:
raise
logger.warn("send_event request timed out")
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationSendEventRestServlet(RestServlet):
"""Handles events newly created on workers, including persisting and """Handles events newly created on workers, including persisting and
notifying. notifying.
The API looks like: The API looks like:
POST /_synapse/replication/send_event/:event_id POST /_synapse/replication/send_event/:event_id/:txn_id
{ {
"event": { .. serialized event .. }, "event": { .. serialized event .. },
@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [], "extra_users": [],
} }
""" """
PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")] NAME = "send_event"
PATH_ARGS = ("event_id",)
def __init__(self, hs): def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__() super(ReplicationSendEventRestServlet, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while @staticmethod
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) @defer.inlineCallbacks
def _serialize_payload(event_id, store, event, context, requester,
ratelimit, extra_users):
"""
Args:
event_id (str)
store (DataStore)
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
def on_PUT(self, request, event_id): serialized_context = yield context.serialize(event, store)
return self.response_cache.wrap(
event_id, payload = {
self._handle_request, "event": event.get_pdu_json(),
request "internal_metadata": event.internal_metadata.get_dict(),
) "rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
defer.returnValue(payload)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_request(self, request): def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"): with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)

View file

@ -44,8 +44,8 @@ class SlavedEventStore(EventFederationWorkerStore,
RoomMemberWorkerStore, RoomMemberWorkerStore,
EventPushActionsWorkerStore, EventPushActionsWorkerStore,
StreamWorkerStore, StreamWorkerStore,
EventsWorkerStore,
StateGroupWorkerStore, StateGroupWorkerStore,
EventsWorkerStore,
SignatureWorkerStore, SignatureWorkerStore,
UserErasureWorkerStore, UserErasureWorkerStore,
BaseSlavedStore): BaseSlavedStore):

View file

@ -21,15 +21,17 @@ from six.moves import range
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.util.caches import get_cache_factor_for, intern_string from synapse.util.caches import get_cache_factor_for, intern_string
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii from synapse.util.stringutils import to_ascii
from ._base import SQLBaseStore
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -46,7 +48,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
return len(self.delta_ids) if self.delta_ids else 0 return len(self.delta_ids) if self.delta_ids else 0
class StateGroupWorkerStore(SQLBaseStore): # this inherits from EventsWorkerStore because it calls self.get_events
class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""The parts of StateGroupStore that can be called from workers. """The parts of StateGroupStore that can be called from workers.
""" """
@ -61,6 +64,30 @@ class StateGroupWorkerStore(SQLBaseStore):
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
) )
@defer.inlineCallbacks
def get_room_version(self, room_id):
"""Get the room_version of a given room
Args:
room_id (str)
Returns:
Deferred[str]
Raises:
NotFoundError if the room is unknown
"""
# for now we do this by looking at the create event. We may want to cache this
# more intelligently in future.
state_ids = yield self.get_current_state_ids(room_id)
create_id = state_ids.get((EventTypes.Create, ""))
if not create_id:
raise NotFoundError("Unknown room")
create_event = yield self.get_event(create_id)
defer.returnValue(create_event.content.get("room_version", "1"))
@cached(max_entries=100000, iterable=True) @cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id): def get_current_state_ids(self, room_id):
"""Get the current state event ids for a room based on the """Get the current state event ids for a room based on the

View file

@ -48,7 +48,9 @@ def _expect_edu(destination, edu_type, content, origin="test"):
def _make_edu_json(origin, edu_type, content): def _make_edu_json(origin, edu_type, content):
return json.dumps(_expect_edu("test", edu_type, content, origin=origin)) return json.dumps(
_expect_edu("test", edu_type, content, origin=origin)
).encode('utf8')
class TypingNotificationsTestCase(unittest.TestCase): class TypingNotificationsTestCase(unittest.TestCase):

View file

@ -85,7 +85,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
try: try:
yield self.cache.fetch_or_execute(self.mock_key, cb) yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e: except Exception as e:
self.assertEqual(e.message, "boo") self.assertEqual(e.args[0], "boo")
self.assertIs(LoggingContext.current_context(), test_context) self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb) res = yield self.cache.fetch_or_execute(self.mock_key, cb)
@ -111,7 +111,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
try: try:
yield self.cache.fetch_or_execute(self.mock_key, cb) yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e: except Exception as e:
self.assertEqual(e.message, "boo") self.assertEqual(e.args[0], "boo")
self.assertIs(LoggingContext.current_context(), test_context) self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb) res = yield self.cache.fetch_or_execute(self.mock_key, cb)

View file

@ -140,7 +140,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True, "admin": True,
"mac": want_mac, "mac": want_mac,
} }
).encode('utf8') )
request, channel = make_request("POST", self.url, body.encode('utf8')) request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock) render(request, self.resource, self.clock)
@ -168,7 +168,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True, "admin": True,
"mac": want_mac, "mac": want_mac,
} }
).encode('utf8') )
request, channel = make_request("POST", self.url, body.encode('utf8')) request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock) render(request, self.resource, self.clock)
@ -195,7 +195,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True, "admin": True,
"mac": want_mac, "mac": want_mac,
} }
).encode('utf8') )
request, channel = make_request("POST", self.url, body.encode('utf8')) request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock) render(request, self.resource, self.clock)
@ -253,7 +253,7 @@ class UserRegisterTestCase(unittest.TestCase):
self.assertEqual('Invalid username', channel.json_body["error"]) self.assertEqual('Invalid username', channel.json_body["error"])
# Must not have null bytes # Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": b"abcd\x00"}) body = json.dumps({"nonce": nonce(), "username": u"abcd\u0000"})
request, channel = make_request("POST", self.url, body.encode('utf8')) request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock) render(request, self.resource, self.clock)
@ -289,7 +289,7 @@ class UserRegisterTestCase(unittest.TestCase):
self.assertEqual('Invalid password', channel.json_body["error"]) self.assertEqual('Invalid password', channel.json_body["error"])
# Must not have null bytes # Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "a", "password": b"abcd\x00"}) body = json.dumps({"nonce": nonce(), "username": "a", "password": u"abcd\u0000"})
request, channel = make_request("POST", self.url, body.encode('utf8')) request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock) render(request, self.resource, self.clock)

View file

@ -80,7 +80,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger( (code, response) = yield self.mock_resource.trigger(
"PUT", "PUT",
"/profile/%s/displayname" % (myid), "/profile/%s/displayname" % (myid),
'{"displayname": "Frank Jr."}' b'{"displayname": "Frank Jr."}'
) )
self.assertEquals(200, code) self.assertEquals(200, code)
@ -95,7 +95,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger( (code, response) = yield self.mock_resource.trigger(
"PUT", "/profile/%s/displayname" % ("@4567:test"), "PUT", "/profile/%s/displayname" % ("@4567:test"),
'{"displayname": "Frank Jr."}' b'{"displayname": "Frank Jr."}'
) )
self.assertTrue( self.assertTrue(
@ -122,7 +122,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger( (code, response) = yield self.mock_resource.trigger(
"PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"), "PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"),
'{"displayname":"bob"}' b'{"displayname":"bob"}'
) )
self.assertTrue( self.assertTrue(
@ -151,7 +151,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger( (code, response) = yield self.mock_resource.trigger(
"PUT", "PUT",
"/profile/%s/avatar_url" % (myid), "/profile/%s/avatar_url" % (myid),
'{"avatar_url": "http://my.server/pic.gif"}' b'{"avatar_url": "http://my.server/pic.gif"}'
) )
self.assertEquals(200, code) self.assertEquals(200, code)

View file

@ -105,7 +105,7 @@ class RestTestCase(unittest.TestCase):
"password": "test", "password": "test",
"type": "m.login.password" "type": "m.login.password"
})) }))
self.assertEquals(200, code) self.assertEquals(200, code, msg=response)
defer.returnValue(response) defer.returnValue(response)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -149,14 +149,14 @@ class RestHelper(object):
def create_room_as(self, room_creator, is_public=True, tok=None): def create_room_as(self, room_creator, is_public=True, tok=None):
temp_id = self.auth_user_id temp_id = self.auth_user_id
self.auth_user_id = room_creator self.auth_user_id = room_creator
path = b"/_matrix/client/r0/createRoom" path = "/_matrix/client/r0/createRoom"
content = {} content = {}
if not is_public: if not is_public:
content["visibility"] = "private" content["visibility"] = "private"
if tok: if tok:
path = path + b"?access_token=%s" % tok.encode('ascii') path = path + "?access_token=%s" % tok
request, channel = make_request(b"POST", path, json.dumps(content).encode('utf8')) request, channel = make_request("POST", path, json.dumps(content).encode('utf8'))
request.render(self.resource) request.render(self.resource)
wait_until_result(self.hs.get_reactor(), channel) wait_until_result(self.hs.get_reactor(), channel)
@ -205,7 +205,7 @@ class RestHelper(object):
data = {"membership": membership} data = {"membership": membership}
request, channel = make_request( request, channel = make_request(
b"PUT", path.encode('ascii'), json.dumps(data).encode('utf8') "PUT", path, json.dumps(data).encode('utf8')
) )
request.render(self.resource) request.render(self.resource)

View file

@ -33,7 +33,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha"
class FilterTestCase(unittest.TestCase): class FilterTestCase(unittest.TestCase):
USER_ID = b"@apple:test" USER_ID = "@apple:test"
EXAMPLE_FILTER = {"room": {"timeline": {"types": ["m.room.message"]}}} EXAMPLE_FILTER = {"room": {"timeline": {"types": ["m.room.message"]}}}
EXAMPLE_FILTER_JSON = b'{"room": {"timeline": {"types": ["m.room.message"]}}}' EXAMPLE_FILTER_JSON = b'{"room": {"timeline": {"types": ["m.room.message"]}}}'
TO_REGISTER = [filter] TO_REGISTER = [filter]
@ -72,8 +72,8 @@ class FilterTestCase(unittest.TestCase):
def test_add_filter(self): def test_add_filter(self):
request, channel = make_request( request, channel = make_request(
b"POST", "POST",
b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID), "/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
self.EXAMPLE_FILTER_JSON, self.EXAMPLE_FILTER_JSON,
) )
request.render(self.resource) request.render(self.resource)
@ -87,8 +87,8 @@ class FilterTestCase(unittest.TestCase):
def test_add_filter_for_other_user(self): def test_add_filter_for_other_user(self):
request, channel = make_request( request, channel = make_request(
b"POST", "POST",
b"/_matrix/client/r0/user/%s/filter" % (b"@watermelon:test"), "/_matrix/client/r0/user/%s/filter" % ("@watermelon:test"),
self.EXAMPLE_FILTER_JSON, self.EXAMPLE_FILTER_JSON,
) )
request.render(self.resource) request.render(self.resource)
@ -101,8 +101,8 @@ class FilterTestCase(unittest.TestCase):
_is_mine = self.hs.is_mine _is_mine = self.hs.is_mine
self.hs.is_mine = lambda target_user: False self.hs.is_mine = lambda target_user: False
request, channel = make_request( request, channel = make_request(
b"POST", "POST",
b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID), "/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
self.EXAMPLE_FILTER_JSON, self.EXAMPLE_FILTER_JSON,
) )
request.render(self.resource) request.render(self.resource)
@ -119,7 +119,7 @@ class FilterTestCase(unittest.TestCase):
self.clock.advance(1) self.clock.advance(1)
filter_id = filter_id.result filter_id = filter_id.result
request, channel = make_request( request, channel = make_request(
b"GET", b"/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id) "GET", "/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id)
) )
request.render(self.resource) request.render(self.resource)
wait_until_result(self.clock, channel) wait_until_result(self.clock, channel)
@ -129,7 +129,7 @@ class FilterTestCase(unittest.TestCase):
def test_get_filter_non_existant(self): def test_get_filter_non_existant(self):
request, channel = make_request( request, channel = make_request(
b"GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID) "GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID)
) )
request.render(self.resource) request.render(self.resource)
wait_until_result(self.clock, channel) wait_until_result(self.clock, channel)
@ -141,7 +141,7 @@ class FilterTestCase(unittest.TestCase):
# in errors.py # in errors.py
def test_get_filter_invalid_id(self): def test_get_filter_invalid_id(self):
request, channel = make_request( request, channel = make_request(
b"GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID) "GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID)
) )
request.render(self.resource) request.render(self.resource)
wait_until_result(self.clock, channel) wait_until_result(self.clock, channel)
@ -151,7 +151,7 @@ class FilterTestCase(unittest.TestCase):
# No ID also returns an invalid_id error # No ID also returns an invalid_id error
def test_get_filter_no_id(self): def test_get_filter_no_id(self):
request, channel = make_request( request, channel = make_request(
b"GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID) "GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID)
) )
request.render(self.resource) request.render(self.resource)
wait_until_result(self.clock, channel) wait_until_result(self.clock, channel)

View file

@ -81,7 +81,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"access_token": token, "access_token": token,
"home_server": self.hs.hostname, "home_server": self.hs.hostname,
} }
self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) self.assertDictContainsSubset(det_data, channel.json_body)
def test_POST_appservice_registration_invalid(self): def test_POST_appservice_registration_invalid(self):
self.appservice = None # no application service exists self.appservice = None # no application service exists
@ -102,7 +102,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals(channel.result["code"], b"400", channel.result)
self.assertEquals( self.assertEquals(
json.loads(channel.result["body"])["error"], "Invalid password" channel.json_body["error"], "Invalid password"
) )
def test_POST_bad_username(self): def test_POST_bad_username(self):
@ -113,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals(channel.result["code"], b"400", channel.result)
self.assertEquals( self.assertEquals(
json.loads(channel.result["body"])["error"], "Invalid username" channel.json_body["error"], "Invalid username"
) )
def test_POST_user_valid(self): def test_POST_user_valid(self):
@ -140,7 +140,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"device_id": device_id, "device_id": device_id,
} }
self.assertEquals(channel.result["code"], b"200", channel.result) self.assertEquals(channel.result["code"], b"200", channel.result)
self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) self.assertDictContainsSubset(det_data, channel.json_body)
self.auth_handler.get_login_tuple_for_user_id( self.auth_handler.get_login_tuple_for_user_id(
user_id, device_id=device_id, initial_device_display_name=None user_id, device_id=device_id, initial_device_display_name=None
) )
@ -158,7 +158,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals(channel.result["code"], b"403", channel.result)
self.assertEquals( self.assertEquals(
json.loads(channel.result["body"])["error"], channel.json_body["error"],
"Registration has been disabled", "Registration has been disabled",
) )
@ -178,7 +178,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"device_id": "guest_device", "device_id": "guest_device",
} }
self.assertEquals(channel.result["code"], b"200", channel.result) self.assertEquals(channel.result["code"], b"200", channel.result)
self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) self.assertDictContainsSubset(det_data, channel.json_body)
def test_POST_disabled_guest_registration(self): def test_POST_disabled_guest_registration(self):
self.hs.config.allow_guest_access = False self.hs.config.allow_guest_access = False
@ -189,5 +189,5 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals(channel.result["code"], b"403", channel.result)
self.assertEquals( self.assertEquals(
json.loads(channel.result["body"])["error"], "Guest access is disabled" channel.json_body["error"], "Guest access is disabled"
) )

View file

@ -32,7 +32,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha"
class FilterTestCase(unittest.TestCase): class FilterTestCase(unittest.TestCase):
USER_ID = b"@apple:test" USER_ID = "@apple:test"
TO_REGISTER = [sync] TO_REGISTER = [sync]
def setUp(self): def setUp(self):
@ -68,7 +68,7 @@ class FilterTestCase(unittest.TestCase):
r.register_servlets(self.hs, self.resource) r.register_servlets(self.hs, self.resource)
def test_sync_argless(self): def test_sync_argless(self):
request, channel = make_request(b"GET", b"/_matrix/client/r0/sync") request, channel = make_request("GET", "/_matrix/client/r0/sync")
request.render(self.resource) request.render(self.resource)
wait_until_result(self.clock, channel) wait_until_result(self.clock, channel)

View file

@ -11,6 +11,7 @@ from twisted.python.failure import Failure
from twisted.test.proto_helpers import MemoryReactorClock from twisted.test.proto_helpers import MemoryReactorClock
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.util import Clock
from tests.utils import setup_test_homeserver as _sth from tests.utils import setup_test_homeserver as _sth
@ -28,7 +29,13 @@ class FakeChannel(object):
def json_body(self): def json_body(self):
if not self.result: if not self.result:
raise Exception("No result yet.") raise Exception("No result yet.")
return json.loads(self.result["body"]) return json.loads(self.result["body"].decode('utf8'))
@property
def code(self):
if not self.result:
raise Exception("No result yet.")
return int(self.result["code"])
def writeHeaders(self, version, code, reason, headers): def writeHeaders(self, version, code, reason, headers):
self.result["version"] = version self.result["version"] = version
@ -79,11 +86,16 @@ def make_request(method, path, content=b""):
Make a web request using the given method and path, feed it the Make a web request using the given method and path, feed it the
content, and return the Request and the Channel underneath. content, and return the Request and the Channel underneath.
""" """
if not isinstance(method, bytes):
method = method.encode('ascii')
if not isinstance(path, bytes):
path = path.encode('ascii')
# Decorate it to be the full path # Decorate it to be the full path
if not path.startswith(b"/_matrix"): if not path.startswith(b"/_matrix"):
path = b"/_matrix/client/r0/" + path path = b"/_matrix/client/r0/" + path
path = path.replace("//", "/") path = path.replace(b"//", b"/")
if isinstance(content, text_type): if isinstance(content, text_type):
content = content.encode('utf8') content = content.encode('utf8')
@ -191,3 +203,9 @@ def setup_test_homeserver(*args, **kwargs):
clock.threadpool = ThreadPool() clock.threadpool = ThreadPool()
pool.threadpool = ThreadPool() pool.threadpool = ThreadPool()
return d return d
def get_clock():
clock = ThreadedMemoryReactorClock()
hs_clock = Clock(clock)
return (clock, hs_clock)

View file

@ -49,7 +49,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
'INSERT INTO event_reference_hashes ' 'INSERT INTO event_reference_hashes '
'(event_id, algorithm, hash) ' '(event_id, algorithm, hash) '
"VALUES (?, 'sha256', ?)" "VALUES (?, 'sha256', ?)"
), (event_id, 'ffff')) ), (event_id, b'ffff'))
for i in range(0, 11): for i in range(0, 11):
yield self.store.runInteraction("insert", insert_event, i) yield self.store.runInteraction("insert", insert_event, i)

View file

@ -176,7 +176,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
room_id = self.room.to_string() room_id = self.room.to_string()
group_ids = yield self.store.get_state_groups_ids(room_id, [e5.event_id]) group_ids = yield self.store.get_state_groups_ids(room_id, [e5.event_id])
group = group_ids.keys()[0] group = list(group_ids.keys())[0]
# test _get_some_state_from_cache correctly filters out members with types=[] # test _get_some_state_from_cache correctly filters out members with types=[]
(state_dict, is_all) = yield self.store._get_some_state_from_cache( (state_dict, is_all) = yield self.store._get_some_state_from_cache(

View file

@ -1,4 +1,3 @@
import json
import re import re
from twisted.internet.defer import Deferred from twisted.internet.defer import Deferred
@ -104,9 +103,8 @@ class JsonResourceTests(unittest.TestCase):
request.render(res) request.render(res)
self.assertEqual(channel.result["code"], b'403') self.assertEqual(channel.result["code"], b'403')
reply_body = json.loads(channel.result["body"]) self.assertEqual(channel.json_body["error"], "Forbidden!!one!")
self.assertEqual(reply_body["error"], "Forbidden!!one!") self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
self.assertEqual(reply_body["errcode"], "M_FORBIDDEN")
def test_no_handler(self): def test_no_handler(self):
""" """
@ -126,6 +124,5 @@ class JsonResourceTests(unittest.TestCase):
request.render(res) request.render(res)
self.assertEqual(channel.result["code"], b'400') self.assertEqual(channel.result["code"], b'400')
reply_body = json.loads(channel.result["body"]) self.assertEqual(channel.json_body["error"], "Unrecognized request")
self.assertEqual(reply_body["error"], "Unrecognized request") self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")
self.assertEqual(reply_body["errcode"], "M_UNRECOGNIZED")

View file

@ -77,6 +77,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
config.max_mau_value = 50 config.max_mau_value = 50
config.mau_limits_reserved_threepids = [] config.mau_limits_reserved_threepids = []
# we need a sane default_room_version, otherwise attempts to create rooms will
# fail.
config.default_room_version = "1"
# disable user directory updates, because they get done in the # disable user directory updates, because they get done in the
# background, which upsets the test runner. # background, which upsets the test runner.
config.update_user_directory = False config.update_user_directory = False
@ -149,8 +153,9 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
# Need to let the HS build an auth handler and then mess with it # Need to let the HS build an auth handler and then mess with it
# because AuthHandler's constructor requires the HS, so we can't make one # because AuthHandler's constructor requires the HS, so we can't make one
# beforehand and pass it in to the HS's constructor (chicken / egg) # beforehand and pass it in to the HS's constructor (chicken / egg)
hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest() hs.get_auth_handler().hash = lambda p: hashlib.md5(p.encode('utf8')).hexdigest()
hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(
p.encode('utf8')).hexdigest() == h
fed = kargs.get("resource_for_federation", None) fed = kargs.get("resource_for_federation", None)
if fed: if fed:
@ -223,8 +228,8 @@ class MockHttpResource(HttpServer):
mock_content.configure_mock(**config) mock_content.configure_mock(**config)
mock_request.content = mock_content mock_request.content = mock_content
mock_request.method = http_method mock_request.method = http_method.encode('ascii')
mock_request.uri = path mock_request.uri = path.encode('ascii')
mock_request.getClientIP.return_value = "-" mock_request.getClientIP.return_value = "-"