0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 04:13:46 +01:00

Merge branch 'develop' into send_sni_for_federation_requests

This commit is contained in:
Richard van der Hoff 2018-07-27 09:17:11 +01:00 committed by GitHub
commit 7041cd872b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
156 changed files with 7063 additions and 5235 deletions

View file

@ -23,6 +23,9 @@ matrix:
- python: 3.6
env: TOX_ENV=py36
- python: 3.6
env: TOX_ENV=check_isort
- python: 3.6
env: TOX_ENV=check-newsfragment

2470
CHANGES.md Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ include synctl
include LICENSE
include VERSION
include *.rst
include *.md
include demo/README
include demo/demo.tls.dh
include demo/*.py

View file

@ -71,7 +71,7 @@ We'd like to invite you to join #matrix:matrix.org (via
https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look
at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the
`APIs <https://matrix.org/docs/api>`_ and `Client SDKs
<http://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
<https://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
Thanks for using Matrix!
@ -283,7 +283,7 @@ Connecting to Synapse from a client
The easiest way to try out your new Synapse installation is by connecting to it
from a web client. The easiest option is probably the one at
http://riot.im/app. You will need to specify a "Custom server" when you log on
https://riot.im/app. You will need to specify a "Custom server" when you log on
or register: set this to ``https://domain.tld`` if you setup a reverse proxy
following the recommended setup, or ``https://localhost:8448`` - remember to specify the
port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity
@ -329,7 +329,7 @@ Security Note
=============
Matrix serves raw user generated data in some APIs - specifically the `content
repository endpoints <http://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
Whilst we have tried to mitigate against possible XSS attacks (e.g.
https://github.com/matrix-org/synapse/pull/1021) we recommend running
@ -348,7 +348,7 @@ Platform-Specific Instructions
Debian
------
Matrix provides official Debian packages via apt from http://matrix.org/packages/debian/.
Matrix provides official Debian packages via apt from https://matrix.org/packages/debian/.
Note that these packages do not include a client - choose one from
https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :)
@ -524,7 +524,7 @@ Troubleshooting Running
-----------------------
If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
to manually upgrade PyNaCL, as synapse uses NaCl (http://nacl.cr.yp.to/) for
to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
encryption and digital signatures.
Unfortunately PyNACL currently has a few issues
(https://github.com/pyca/pynacl/issues/53) and
@ -672,8 +672,8 @@ useful just for development purposes. See `<demo/README>`_.
Using PostgreSQL
================
As of Synapse 0.9, `PostgreSQL <http://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <http://sqlite.org/>`_ database that Synapse has
As of Synapse 0.9, `PostgreSQL <https://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <https://sqlite.org/>`_ database that Synapse has
traditionally used for convenience and simplicity.
The advantages of Postgres include:
@ -697,7 +697,7 @@ Using a reverse proxy with Synapse
It is recommended to put a reverse proxy such as
`nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_,
`Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_ or
`HAProxy <http://www.haproxy.org/>`_ in front of Synapse. One advantage of
`HAProxy <https://www.haproxy.org/>`_ in front of Synapse. One advantage of
doing so is that it means that you can expose the default https port (443) to
Matrix clients without needing to run Synapse with root privileges.

1
changelog.d/2970.feature Normal file
View file

@ -0,0 +1 @@
add support for the lazy_loaded_members filter as per MSC1227

View file

@ -1 +0,0 @@
Enforce the specified API for report_event

1
changelog.d/3331.feature Normal file
View file

@ -0,0 +1 @@
add support for the include_redundant_members filter param as per MSC1227

1
changelog.d/3350.misc Normal file
View file

@ -0,0 +1 @@
Remove redundant checks on who_forgot_in_room

1
changelog.d/3367.misc Normal file
View file

@ -0,0 +1 @@
Remove unnecessary event re-signing hacks

View file

@ -1 +0,0 @@
Include CPU time from database threads in request/block metrics.

View file

@ -1 +0,0 @@
Add CPU metrics for _fetch_event_list

View file

View file

@ -1 +0,0 @@
Reduce database consumption when processing large numbers of receipts

1
changelog.d/3514.bugfix Normal file
View file

@ -0,0 +1 @@
Don't generate TURN credentials if no TURN config options are set

1
changelog.d/3520.bugfix Normal file
View file

@ -0,0 +1 @@
Correctly announce deleted devices over federation

View file

@ -1 +0,0 @@
Cache optimisation for /sync requests

View file

@ -1 +0,0 @@
Fix queued federation requests being processed in the wrong order

View file

@ -1 +0,0 @@
refactor: use parse_{string,integer} and assert's from http.servlet for deduplication

View file

1
changelog.d/3548.bugfix Normal file
View file

@ -0,0 +1 @@
Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis.

1
changelog.d/3552.misc Normal file
View file

@ -0,0 +1 @@
Release notes are now in the Markdown format.

1
changelog.d/3553.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

1
changelog.d/3554.feature Normal file
View file

@ -0,0 +1 @@
Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric

1
changelog.d/3555.feature Normal file
View file

@ -0,0 +1 @@
Add support for client_reader to handle more APIs

1
changelog.d/3556.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

1
changelog.d/3559.misc Normal file
View file

@ -0,0 +1 @@
add config for pep8

1
changelog.d/3570.bugfix Normal file
View file

@ -0,0 +1 @@
Fix potential stack overflow and deadlock under heavy load

1
changelog.d/3571.misc Normal file
View file

@ -0,0 +1 @@
Merge Linearizer and Limiter

1
changelog.d/3572.misc Normal file
View file

@ -0,0 +1 @@
Merge Linearizer and Limiter

1
changelog.d/3579.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3581.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3582.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3584.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3586.misc Normal file
View file

@ -0,0 +1 @@
Fixes and optimisations for resolve_state_groups

1
changelog.d/3587.misc Normal file
View file

@ -0,0 +1 @@
Improve logging for exceptions when handling PDUs

1
changelog.d/3590.misc Normal file
View file

@ -0,0 +1 @@
Add some measure blocks to persist_events

1
changelog.d/3591.misc Normal file
View file

@ -0,0 +1 @@
Fix some random logcontext leaks.

1
changelog.d/3592.misc Normal file
View file

@ -0,0 +1 @@
Speed up calculating state deltas in persist_event loop

1
changelog.d/3595.misc Normal file
View file

@ -0,0 +1 @@
Attempt to reduce amount of state pulled out of DB during persist_events

1
changelog.d/3597.feature Normal file
View file

@ -0,0 +1 @@
Add support for client_reader to handle more APIs

1
changelog.d/3601.bugfix Normal file
View file

@ -0,0 +1 @@
Fix failure to persist events over federation under load

1
changelog.d/3604.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

1
changelog.d/3605.bugfix Normal file
View file

@ -0,0 +1 @@
Fix updating of cached remote profiles

1
changelog.d/3606.misc Normal file
View file

@ -0,0 +1 @@
Fix some random logcontext leaks.

1
changelog.d/3607.bugfix Normal file
View file

@ -0,0 +1 @@
Fix 'tuple index out of range' error

1
changelog.d/3609.misc Normal file
View file

@ -0,0 +1 @@
Fix a documentation typo in on_make_leave_request

1
changelog.d/3610.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

1
changelog.d/3613.misc Normal file
View file

@ -0,0 +1 @@
Remove some redundant joins on event_edges.room_id

1
changelog.d/3614.misc Normal file
View file

@ -0,0 +1 @@
Stop populating events.content

1
changelog.d/3616.misc Normal file
View file

@ -0,0 +1 @@
Update the /send_leave path registration to use event_id rather than a transaction ID.

View file

@ -0,0 +1,63 @@
Shared-Secret Registration
==========================
This API allows for the creation of users in an administrative and
non-interactive way. This is generally used for bootstrapping a Synapse
instance with administrator accounts.
To authenticate yourself to the server, you will need both the shared secret
(``registration_shared_secret`` in the homeserver configuration), and a
one-time nonce. If the registration shared secret is not configured, this API
is not enabled.
To fetch the nonce, you need to request one from the API::
> GET /_matrix/client/r0/admin/register
< {"nonce": "thisisanonce"}
Once you have the nonce, you can make a ``POST`` to the same URL with a JSON
body containing the nonce, username, password, whether they are an admin
(optional, False by default), and a HMAC digest of the content.
As an example::
> POST /_matrix/client/r0/admin/register
> {
"nonce": "thisisanonce",
"username": "pepper_roni",
"password": "pizza",
"admin": true,
"mac": "mac_digest_here"
}
< {
"access_token": "token_here",
"user_id": "@pepper_roni@test",
"home_server": "test",
"device_id": "device_id_here"
}
The MAC is the hex digest output of the HMAC-SHA1 algorithm, with the key being
the shared secret and the content being the nonce, user, password, and either
the string "admin" or "notadmin", each separated by NULs. For an example of
generation in Python::
import hmac, hashlib
def generate_mac(nonce, user, password, admin=False):
mac = hmac.new(
key=shared_secret,
digestmod=hashlib.sha1,
)
mac.update(nonce.encode('utf8'))
mac.update(b"\x00")
mac.update(user.encode('utf8'))
mac.update(b"\x00")
mac.update(password.encode('utf8'))
mac.update(b"\x00")
mac.update(b"admin" if admin else b"notadmin")
return mac.hexdigest()

View file

@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
following regular expressions::
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
``synapse.app.user_dir``
~~~~~~~~~~~~~~~~~~~~~~~~

View file

@ -1,5 +1,30 @@
[tool.towncrier]
package = "synapse"
filename = "CHANGES.rst"
filename = "CHANGES.md"
directory = "changelog.d"
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"
issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue}>)"
[[tool.towncrier.type]]
directory = "feature"
name = "Features"
showcontent = true
[[tool.towncrier.type]]
directory = "bugfix"
name = "Bugfixes"
showcontent = true
[[tool.towncrier.type]]
directory = "doc"
name = "Improved Documentation"
showcontent = true
[[tool.towncrier.type]]
directory = "removal"
name = "Deprecations and Removals"
showcontent = true
[[tool.towncrier.type]]
directory = "misc"
name = "Internal Changes"
showcontent = true

View file

@ -26,11 +26,37 @@ import yaml
def request_registration(user, password, server_location, shared_secret, admin=False):
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
headers={'Content-Type': 'application/json'}
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
body = f.read()
f.close()
nonce = json.loads(body)["nonce"]
except urllib2.HTTPError as e:
print "ERROR! Received %d %s" % (e.code, e.reason,)
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
print resp["error"]
sys.exit(1)
mac = hmac.new(
key=shared_secret,
digestmod=hashlib.sha1,
)
mac.update(nonce)
mac.update("\x00")
mac.update(user)
mac.update("\x00")
mac.update(password)
@ -40,10 +66,10 @@ def request_registration(user, password, server_location, shared_secret, admin=F
mac = mac.hexdigest()
data = {
"user": user,
"nonce": nonce,
"username": user,
"password": password,
"mac": mac,
"type": "org.matrix.login.shared_secret",
"admin": admin,
}
@ -52,7 +78,7 @@ def request_registration(user, password, server_location, shared_secret, admin=F
print "Sending registration request..."
req = urllib2.Request(
"%s/_matrix/client/api/v1/register" % (server_location,),
"%s/_matrix/client/r0/admin/register" % (server_location,),
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)

View file

@ -14,12 +14,17 @@ ignore =
pylint.cfg
tox.ini
[flake8]
[pep8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it.
# E203 is contrary to PEP8.
# W503 requires that binary operators be at the end, not start, of lines. Erik
# doesn't like it. E203 is contrary to PEP8.
ignore = W503,E203
[flake8]
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
# pep8 to do those checks), but not the "max-line-length" setting
max-line-length = 90
[isort]
line_length = 89
not_skip = __init__.py
@ -31,3 +36,4 @@ known_compat = mock,six
known_twisted=twisted,OpenSSL
multi_line_output=3
include_trailing_comma=true
combine_as_imports=true

View file

@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.32.2"
__version__ = "0.33.0"

View file

@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
@ -737,3 +739,37 @@ class Auth(object):
)
return query_params[0]
@defer.inlineCallbacks
def check_in_room_or_world_readable(self, room_id, user_id):
"""Checks that the user is or was in the room or the room is world
readable. If it isn't then an exception is raised.
Returns:
Deferred[tuple[str, str|None]]: Resolves to the current membership of
the user in the room and the membership event ID of the user. If
the user is not in the room and never has been, then
`(Membership.JOIN, None)` is returned.
"""
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
except AuthError:
visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)

View file

@ -113,7 +113,13 @@ ROOM_EVENT_FILTER_SCHEMA = {
},
"contains_url": {
"type": "boolean"
}
},
"lazy_load_members": {
"type": "boolean"
},
"include_redundant_members": {
"type": "boolean"
},
}
}
@ -261,6 +267,12 @@ class FilterCollection(object):
def ephemeral_limit(self):
return self._room_ephemeral_filter.limit()
def lazy_load_members(self):
return self._room_state_filter.lazy_load_members()
def include_redundant_members(self):
return self._room_state_filter.include_redundant_members()
def filter_presence(self, events):
return self._presence_filter.filter(events)
@ -417,6 +429,12 @@ class Filter(object):
def limit(self):
return self.filter_json.get("limit", 10)
def lazy_load_members(self):
return self.filter_json.get("lazy_load_members", False)
def include_redundant_members(self):
return self.filter_json.get("include_redundant_members", False)
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):

View file

@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.directory import DirectoryStore
@ -40,7 +41,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.room import PublicRoomListRestServlet
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
PublicRoomListRestServlet,
RoomEventContextServlet,
RoomMemberListRestServlet,
RoomStateRestServlet,
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@ -52,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
PublicRoomListRestServlet(self).register(resource)
RoomMemberListRestServlet(self).register(resource)
JoinedRoomMemberListRestServlet(self).register(resource)
RoomStateRestServlet(self).register(resource)
RoomEventContextServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View file

@ -18,6 +18,8 @@ import logging
import os
import sys
from six import iteritems
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.web.resource import EncodingResourceWrapper, NoResource
@ -47,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.module_api import ModuleApi
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@ -427,6 +430,9 @@ def run(hs):
# currently either 0 or 1
stats_process = []
def start_phone_stats_home():
return run_as_background_process("phone_stats_home", phone_stats_home)
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@ -444,7 +450,7 @@ def run(hs):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.iteritems():
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@ -455,7 +461,7 @@ def run(hs):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in r30_results.iteritems():
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
@ -498,7 +504,10 @@ def run(hs):
)
def generate_user_daily_visit_stats():
hs.get_datastore().generate_user_daily_visits()
return run_as_background_process(
"generate_user_daily_visits",
hs.get_datastore().generate_user_daily_visits,
)
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
@ -507,7 +516,7 @@ def run(hs):
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
@ -515,7 +524,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print (hs.config.pid_file)

View file

@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
RoomStore,
BaseSlavedStore,
):
did_forget = (
RoomMemberStore.__dict__["did_forget"]
)
pass
UPDATE_SYNCING_USERS_MS = 10 * 1000

View file

@ -25,6 +25,8 @@ import subprocess
import sys
import time
from six import iteritems
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@ -173,7 +175,7 @@ def main():
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []

View file

@ -30,10 +30,10 @@ class VoipConfig(Config):
## Turn ##
# The public URIs of the TURN server to give to clients
turn_uris: []
#turn_uris: []
# The shared secret used to compute passwords for the TURN server
turn_shared_secret: "YOUR_SHARED_SECRET"
#turn_shared_secret: "YOUR_SHARED_SECRET"
# The Username and password if the TURN server needs them and
# does not use a token

View file

@ -13,22 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six import iteritems
from frozendict import frozendict
from twisted.internet import defer
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
class EventContext(object):
"""
Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id
prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id
state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
@ -45,38 +41,77 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
_current_state_ids (dict[(str, str), str]|None):
The current state map including the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_prev_state_ids (dict[(str, str), str]|None):
The current state map excluding the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
_event_type (str): The type of the event the context is associated with.
Only set when state has not been fetched yet.
_event_state_key (str|None): The state_key of the event the context is
associated with. Only set when state has not been fetched yet.
_prev_state_id (str|None): If the event associated with the context is
a state event, then `_prev_state_id` is the event_id of the state
that was replaced.
Only set when state has not been fetched yet.
"""
__slots__ = [
"current_state_ids",
"prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
"_event_type",
"_event_state_key",
"_fetching_state_deferred",
]
def __init__(self):
# The current state including the current event
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None
self.state_group = None
self.prev_state_events = []
self.rejected = False
self.app_service = None
@staticmethod
def with_state(state_group, current_state_ids, prev_state_ids,
prev_group=None, delta_ids=None):
context = EventContext()
# The current state including the current event
context._current_state_ids = current_state_ids
# The current state excluding the current event
context._prev_state_ids = prev_state_ids
context.state_group = state_group
context._prev_state_id = None
context._event_type = None
context._event_state_key = None
context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
self.prev_group = None
self.delta_ids = None
context.prev_group = prev_group
context.delta_ids = delta_ids
self.prev_state_events = None
return context
self.app_service = None
def serialize(self, event):
@defer.inlineCallbacks
def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
@ -92,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
return {
defer.returnValue({
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
@ -106,10 +142,9 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}
})
@staticmethod
@defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
@ -122,32 +157,115 @@ class EventContext(object):
EventContext
"""
context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
prev_state_id = input["prev_state_id"]
event_type = input["event_type"]
event_state_key = input["event_state_key"]
context._prev_state_id = input["prev_state_id"]
context._event_type = input["event_type"]
context._event_state_key = input["event_state_key"]
context.current_state_ids = yield store.get_state_ids_for_group(
context.state_group,
)
if prev_state_id and event_state_key:
context.prev_state_ids = dict(context.current_state_ids)
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
else:
context.prev_state_ids = context.current_state_ids
context._current_state_ids = None
context._prev_state_ids = None
context._fetching_state_deferred = None
context.state_group = input["state_group"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)
defer.returnValue(context)
return context
@defer.inlineCallbacks
def get_current_state_ids(self, store):
"""Gets the current state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._current_state_ids)
@defer.inlineCallbacks
def get_prev_state_ids(self, store):
"""Gets the prev state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._prev_state_ids)
def get_cached_current_state_ids(self):
"""Gets the current state IDs if we have them already cached.
Returns:
dict[(str, str), str]|None: Returns None if we haven't cached the
state or if state_group is None, which happens when the associated
event is an outlier.
"""
return self._current_state_ids
@defer.inlineCallbacks
def _fill_out_state(self, store):
"""Called to populate the _current_state_ids and _prev_state_ids
attributes by loading from the database.
"""
if self.state_group is None:
return
self._current_state_ids = yield store.get_state_ids_for_group(
self.state_group,
)
if self._prev_state_id and self._event_state_key is not None:
self._prev_state_ids = dict(self._current_state_ids)
key = (self._event_type, self._event_state_key)
self._prev_state_ids[key] = self._prev_state_id
else:
self._prev_state_ids = self._current_state_ids
@defer.inlineCallbacks
def update_state(self, state_group, prev_state_ids, current_state_ids,
prev_group, delta_ids):
"""Replace the state in the context
"""
# We need to make sure we wait for any ongoing fetching of state
# to complete so that the updated state doesn't get clobbered
if self._fetching_state_deferred:
yield make_deferred_yieldable(self._fetching_state_deferred)
self.state_group = state_group
self._prev_state_ids = prev_state_ids
self.prev_group = prev_group
self._current_state_ids = current_state_ids
self.delta_ids = delta_ids
# We need to ensure that that we've marked as having fetched the state
self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict):
@ -159,7 +277,7 @@ def _encode_state_dict(state_dict):
return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
for (etype, state_key), v in iteritems(state_dict)
]

View file

@ -24,6 +24,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@ -186,8 +187,12 @@ class FederationServer(FederationBase):
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.exception("Failed to handle PDU %s", event_id)
logger.error(
"Failed to handle PDU %s: %s",
event_id, f.getTraceback().rstrip(),
)
yield async.concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@ -203,8 +208,8 @@ class FederationServer(FederationBase):
)
pdu_failures = getattr(transaction, "pdu_failures", [])
for failure in pdu_failures:
logger.info("Got failure %r", failure)
for fail in pdu_failures:
logger.info("Got failure %r", fail)
response = {
"pdus": pdu_results,

View file

@ -30,7 +30,8 @@ from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
)
from synapse.util import PreserveLoggingContext, logcontext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@ -165,10 +166,11 @@ class TransactionQueue(object):
if self._is_processing:
return
# fire off a processing loop in the background. It's likely it will
# outlast the current request, so run it in the sentinel logcontext.
with PreserveLoggingContext():
self._process_event_queue_loop()
# fire off a processing loop in the background
run_as_background_process(
"process_event_queue_for_federation",
self._process_event_queue_loop,
)
@defer.inlineCallbacks
def _process_event_queue_loop(self):
@ -432,14 +434,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Starting transaction loop", destination)
# Drop the logcontext before starting the transaction. It doesn't
# really make sense to log all the outbound transactions against
# whatever path led us to this point: that's pretty arbitrary really.
#
# (this also means we can fire off _perform_transaction without
# yielding)
with logcontext.PreserveLoggingContext():
self._transaction_transmission_loop(destination)
run_as_background_process(
"federation_transaction_transmission_loop",
self._transaction_transmission_loop,
destination,
)
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):

View file

@ -404,10 +404,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id, txid):
def on_PUT(self, origin, content, query, room_id, event_id):
content = yield self.handler.on_send_leave_request(origin, content)
defer.returnValue((200, content))

View file

@ -43,6 +43,7 @@ from signedjson.sign import sign_json
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util.logcontext import run_in_background
@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
self.attestations = hs.get_groups_attestation_signing()
self._renew_attestations_loop = self.clock.looping_call(
self._renew_attestations, 30 * 60 * 1000,
self._start_renew_attestations, 30 * 60 * 1000,
)
@defer.inlineCallbacks
@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):
defer.returnValue({})
def _start_renew_attestations(self):
return run_as_background_process("renew_attestations", self._renew_attestations)
@defer.inlineCallbacks
def _renew_attestations(self):
"""Called periodically to check if we need to update any of our attestations

View file

@ -17,9 +17,7 @@ from .admin import AdminHandler
from .directory import DirectoryHandler
from .federation import FederationHandler
from .identity import IdentityHandler
from .message import MessageHandler
from .register import RegistrationHandler
from .room import RoomContextHandler
from .search import SearchHandler
@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)

View file

@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
list(context.current_state_ids.values())
list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(

View file

@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
self.scheduler.start().addErrback(log_failure)
def start_scheduler():
return self.scheduler.start().addErrback(log_failure)
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services

View file

@ -21,8 +21,8 @@ import logging
import sys
import six
from six import iteritems
from six.moves import http_client
from six import iteritems, itervalues
from six.moves import http_client, zip
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@ -43,7 +43,6 @@ from synapse.crypto.event_signing import (
add_hashes_and_signatures,
compute_event_signature,
)
from synapse.events.utils import prune_event
from synapse.events.validator import EventValidator
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
@ -52,8 +51,8 @@ from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
from ._base import BaseHandler
@ -487,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
prev_state_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@ -501,137 +503,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
@measure_func("_filter_events_for_server")
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
"""Filter the given events for the given server, redacting those the
server can't see.
Assumes the server is currently in the room.
Returns
list[FrozenEvent]
"""
# First lets check to see if all the events have a history visibility
# of "shared" or "world_readable". If thats the case then we don't
# need to check membership (as we know the server is in the room).
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
)
)
visibility_ids = set()
for sids in event_to_state_ids.itervalues():
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
# If we failed to find any history visibility events then the default
# is "shared" visiblity.
if not visibility_ids:
defer.returnValue(events)
event_map = yield self.store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
)
if all_open:
defer.returnValue(events)
# Ok, so we're dealing with events that have non-trivial visibility
# rules, so we need to also get the memberships of the room.
event_to_state_ids = yield self.store.get_state_ids_for_events(
frozenset(e.event_id for e in events),
types=(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, None),
)
)
# We only want to pull out member events that correspond to the
# server's domain.
def check_match(id):
try:
return server_name == get_domain_from_id(id)
except Exception:
return False
# Parses mapping `event_id -> (type, state_key) -> state event_id`
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
for key_to_eid in list(event_to_state_ids.values())
for key, e_id in key_to_eid.items()
if key[0] != EventTypes.Member or check_match(key[1])
])
event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
}
erased_senders = yield self.store.are_users_erased(
e.sender for e in events,
)
def redact_disallowed(event, state):
# if the sender has been gdpr17ed, always return a redacted
# copy of the event.
if erased_senders[event.sender]:
logger.info(
"Sender of %s has been erased, redacting",
event.event_id,
)
return prune_event(event)
if not state:
return event
history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
if history:
visibility = history.content.get("history_visibility", "shared")
if visibility in ["invited", "joined"]:
# We now loop through all state events looking for
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
domain = get_domain_from_id(ev.state_key)
except Exception:
continue
if domain != server_name:
continue
memtype = ev.membership
if memtype == Membership.JOIN:
return event
elif memtype == Membership.INVITE:
if visibility == "invited":
return event
else:
return prune_event(event)
return event
defer.returnValue([
redact_disallowed(e, event_to_state[e.event_id])
for e in events
])
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
@ -863,7 +734,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.iteritems()
for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@ -880,7 +751,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
return sorted(joined_domains.iteritems(), key=lambda d: d[1])
return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@ -943,7 +814,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
event_ids = list(extremities.iterkeys())
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@ -959,15 +830,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
[e_id for ids in states.itervalues() for e_id in ids.itervalues()],
[e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.iteritems()
for k, e_id in iteritems(state_dict)
if e_id in state_map
} for key, state_dict in states.iteritems()
} for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@ -1038,16 +909,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
for event in auth:
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
defer.returnValue([e for e in auth])
@log_function
@ -1248,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = list(context.prev_state_ids.values())
prev_state_ids = yield context.get_prev_state_ids(self.store)
state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(list(context.prev_state_ids.values()))
state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@ -1416,7 +1279,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_leave_request(self, room_id, user_id):
""" We've received a /make_leave/ request, so we create a partial
join event for the room and return that. We do *not* persist or
leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
@ -1503,18 +1366,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
for event in res:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
if self.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
defer.returnValue(res)
else:
defer.returnValue([])
@ -1558,7 +1409,7 @@ class FederationHandler(BaseHandler):
limit
)
events = yield self._filter_events_for_server(origin, room_id, events)
events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@ -1586,18 +1437,6 @@ class FederationHandler(BaseHandler):
)
if event:
if self.is_mine_id(event.event_id):
# FIXME: This is a temporary work around where we occasionally
# return events slightly differently than when they were
# originally signed
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
@ -1605,8 +1444,8 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
events = yield filter_events_for_server(
self.store, origin, [event],
)
event = events[0]
defer.returnValue(event)
@ -1681,7 +1520,7 @@ class FederationHandler(BaseHandler):
yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@ -1801,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -1862,15 +1702,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
for event in ret["auth_chain"]:
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@ -1896,8 +1727,8 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
missing_events = yield self._filter_events_for_server(
origin, room_id, missing_events,
missing_events = yield filter_events_for_server(
self.store, origin, missing_events,
)
defer.returnValue(missing_events)
@ -2051,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids
event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@ -2143,21 +1975,34 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
context.current_state_ids = dict(context.current_state_ids)
context.current_state_ids.update(state_updates)
if context.delta_ids is not None:
context.delta_ids = dict(context.delta_ids)
context.delta_ids.update(state_updates)
context.prev_state_ids = dict(context.prev_state_ids)
context.prev_state_ids.update({
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)
current_state_ids.update(state_updates)
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = dict(prev_state_ids)
prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
context.state_group = yield self.store.store_state_group(
# create a new state group as a delta from the existing one.
prev_group = context.state_group
state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
prev_group=prev_group,
delta_ids=state_updates,
current_state_ids=current_state_ids,
)
yield context.update_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=state_updates,
)
@defer.inlineCallbacks
@ -2397,7 +2242,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
original_invite_id = context.prev_state_ids.get(key)
prev_state_ids = yield context.get_prev_state_ids(self.store)
original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@ -2439,7 +2285,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
invite_event_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)

View file

@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
deferred_room_state = self.state_handler.get_current_state(
event.room_id
deferred_room_state = run_in_background(
self.state_handler.get_current_state,
event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = self.store.get_state_for_events(
[event.event_id], None
deferred_room_state = run_in_background(
self.store.get_state_for_events,
[event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults(
[
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,
)
],
consumeErrors=True,
).addErrback(unwrapFirstError)
presence, receipts, (messages, token) = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,
)
],
consumeErrors=True,
).addErrback(unwrapFirstError),
)
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,

View file

@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
from synapse.util.async import Limiter, ReadWriteLock
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
logger = logging.getLogger(__name__)
class PurgeStatus(object):
"""Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
return by get_purge_status.
Attributes:
status (int): Tracks whether this request has completed. One of
STATUS_{ACTIVE,COMPLETE,FAILED}
class MessageHandler(object):
"""Contains some read only APIs to get state about a room
"""
STATUS_ACTIVE = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_TEXT = {
STATUS_ACTIVE: "active",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
}
def __init__(self):
self.status = PurgeStatus.STATUS_ACTIVE
def asdict(self):
return {
"status": PurgeStatus.STATUS_TEXT[self.status]
}
class MessageHandler(BaseHandler):
def __init__(self, hs):
super(MessageHandler, self).__init__(hs)
self.hs = hs
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
str: unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400,
"History purge already in progress for %s" % (room_id, ),
)
purge_id = random_string(16)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
Args:
purge_id (str): purge_id returned by start_purge_history
Returns:
PurgeStatus|None
"""
return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
requester (Requester): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token_for_room(
room_id=room_id
)
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self._check_in_room_or_world_readable(
room_id, user_id
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
if not events:
defer.returnValue({
"chunk": [],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
})
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
self.store,
user_id,
events,
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
defer.returnValue(chunk)
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
data = yield self.state_handler.get_current_state(
data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@ -303,31 +81,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
def _check_in_room_or_world_readable(self, room_id, user_id):
try:
# check_user_was_in_room will return the most recent membership
# event for the user if:
# * The user is a non-guest user, and was ever in the room
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
defer.returnValue((member_event.membership, member_event.event_id))
return
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if (
visibility and
visibility.content["history_visibility"] == "world_readable"
):
defer.returnValue((Membership.JOIN, None))
return
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)
@defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
membership, membership_event_id = yield self._check_in_room_or_world_readable(
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
room_state = yield self.state_handler.get_current_state(room_id)
room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
membership, _ = yield self._check_in_room_or_world_readable(
membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
@ -427,7 +180,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@ -630,7 +383,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@ -752,8 +506,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
"Created event %s with state: %s",
event.event_id, context.prev_state_ids,
"Created event %s",
event.event_id,
)
defer.returnValue(
@ -806,8 +560,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
clock=self.hs.get_clock(),
store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
@ -884,9 +639,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
current_state_ids = yield context.get_current_state_ids(self.store)
state_to_include_ids = [
e_id
for k, e_id in iteritems(context.current_state_ids)
for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@ -922,8 +679,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -943,11 +701,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
if event.type == EventTypes.Create and context.prev_state_ids:
raise AuthError(
403,
"Changing the room create event is forbidden",
)
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:
raise AuthError(
403,
"Changing the room create event is forbidden",
)
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context

View file

@ -0,0 +1,265 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2017 - 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from twisted.python.failure import Failure
from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.types import RoomStreamToken
from synapse.util.async import ReadWriteLock
from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
class PurgeStatus(object):
"""Object tracking the status of a purge request
This class contains information on the progress of a purge request, for
return by get_purge_status.
Attributes:
status (int): Tracks whether this request has completed. One of
STATUS_{ACTIVE,COMPLETE,FAILED}
"""
STATUS_ACTIVE = 0
STATUS_COMPLETE = 1
STATUS_FAILED = 2
STATUS_TEXT = {
STATUS_ACTIVE: "active",
STATUS_COMPLETE: "complete",
STATUS_FAILED: "failed",
}
def __init__(self):
self.status = PurgeStatus.STATUS_ACTIVE
def asdict(self):
return {
"status": PurgeStatus.STATUS_TEXT[self.status]
}
class PaginationHandler(object):
"""Handles pagination and purge history requests.
These are in the same handler due to the fact we need to block clients
paginating during a purge.
"""
def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock()
self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus
self._purges_by_id = {}
def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
str: unique ID for this purge transaction.
"""
if room_id in self._purges_in_progress_by_room:
raise SynapseError(
400,
"History purge already in progress for %s" % (room_id, ),
)
purge_id = random_string(16)
# we log the purge_id here so that it can be tied back to the
# request id in the log lines.
logger.info("[purge] starting purge_id %s", purge_id)
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
Returns:
Deferred
"""
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
finally:
self._purges_in_progress_by_room.discard(room_id)
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
Args:
purge_id (str): purge_id returned by start_purge_history
Returns:
PurgeStatus|None
"""
return self._purges_by_id.get(purge_id)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
"""Get messages in a room.
Args:
requester (Requester): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
Returns:
dict: Pagination API results
"""
user_id = requester.user.to_string()
if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token_for_room(
room_id=room_id
)
)
room_token = pagin_config.from_token.room_key
room_token = RoomStreamToken.parse(room_token)
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
source_config = pagin_config.get_source_config("room")
with (yield self.pagination_lock.read(room_id)):
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
max_topo = room_token.topological
else:
max_topo = yield self.store.get_max_topological_token(
room_id, room_token.stream
)
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
leave_token = yield self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)
events, next_key = yield self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
event_filter=event_filter,
)
next_token = pagin_config.from_token.copy_and_replace(
"room_key", next_key
)
if not events:
defer.returnValue({
"chunk": [],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
})
if event_filter:
events = event_filter.filter(events)
events = yield filter_events_for_client(
self.store,
user_id,
events,
is_peeking=(member_event_id is None),
)
time_now = self.clock.time_msec()
chunk = {
"chunk": [
serialize_event(e, time_now, as_client_event)
for e in events
],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
defer.returnValue(chunk)

View file

@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import AuthError, CodeMessageException, SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler
@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler):
if hs.config.worker_app is None:
self.clock.looping_call(
self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message)
)
def _start_update_remote_profile_cache(self):
return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache,
)
@defer.inlineCallbacks
def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.

View file

@ -24,7 +24,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
)
class RoomContextHandler(BaseHandler):
class RoomContextHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
@ -414,8 +418,6 @@ class RoomContextHandler(BaseHandler):
before_limit = math.floor(limit / 2.)
after_limit = limit - before_limit
now_token = yield self.hs.get_event_sources().get_current_token()
users = yield self.store.get_users_in_room(room_id)
is_peeking = user.to_string() not in users
@ -458,11 +460,15 @@ class RoomContextHandler(BaseHandler):
)
results["state"] = list(state[last_event_id].values())
results["start"] = now_token.copy_and_replace(
# We use a dummy token here as we only care about the room portion of
# the token, which we replace.
token = StreamToken.START
results["start"] = token.copy_and_replace(
"room_key", results["start"]
).to_string()
results["end"] = now_token.copy_and_replace(
results["end"] = token.copy_and_replace(
"room_key", results["end"]
).to_string()

View file

@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
prev_member_event_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
guest_can_join = yield self._can_guest_join(context.prev_state_ids)
guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
prev_member_event_id = context.prev_state_ids.get(
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 - 2016 OpenMarket Ltd
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
# Remember the last 100 members we sent to a client for the purposes of
# avoiding redundantly sending the same lazy-loaded members to the client
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@ -181,6 +192,12 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache", self.clock,
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
)
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@ -416,29 +433,44 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
def get_state_after_event(self, event):
def get_state_after_event(self, event, types=None, filtered_types=None):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
state_ids = yield self.store.get_state_ids_for_event(event.event_id)
state_ids = yield self.store.get_state_ids_for_event(
event.event_id, types, filtered_types=filtered_types,
)
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position):
def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
@ -453,7 +485,9 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
state = yield self.get_state_after_event(last_event)
state = yield self.get_state_after_event(
last_event, types, filtered_types=filtered_types,
)
else:
# no events in this room - so presumably no state
@ -485,59 +519,129 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
types = None
filtered_types = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
sync_config.filter_collection.include_redundant_members()
)
if lazy_load_members:
# We only request state for the members needed to display the
# timeline:
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
)
]
# only apply the filtering to room members
filtered_types = [EventTypes.Member]
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id
batch.events[-1].event_id, types=types,
filtered_types=filtered_types,
)
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
else:
current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token
room_id, stream_position=now_token, types=types,
filtered_types=filtered_types,
)
state_ids = current_state_ids
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
previous={},
current=current_state_ids,
lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token
room_id, stream_position=since_token, types=types,
filtered_types=filtered_types,
)
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id
batch.events[-1].event_id, types=types,
filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
}
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if types:
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
if lazy_load_members and not include_redundant_members:
cache_key = (sync_config.user.to_string(), sync_config.device_id)
cache = self.lazy_loaded_members_cache.get(cache_key)
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
self.lazy_loaded_members_cache[cache_key] = cache
else:
logger.debug("found LruCache for %r", cache_key)
# if it's a new sync sequence, then assume the client has had
# amnesia and doesn't want any recent lazy-loaded members
# de-duplicated.
if since_token is None:
logger.debug("clearing LruCache for %r", cache_key)
cache.clear()
else:
# only send members which aren't in our LruCache (either
# because they're new to this client or have been pushed out
# of the cache)
logger.debug("filtering state from %r...", state_ids)
state_ids = {
t: event_id
for t, event_id in state_ids.iteritems()
if cache.get(t[1]) != event_id
}
logger.debug("...to %r", state_ids)
# add any member IDs we are about to send into our LruCache
for t, event_id in itertools.chain(
state_ids.items(),
timeline_state.items(),
):
if t[0] == EventTypes.Member:
cache.set(t[1], event_id)
state = {}
if state_ids:
@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
return False
def _calculate_state(timeline_contains, timeline_start, previous, current):
def _calculate_state(
timeline_contains, timeline_start, previous, current, lazy_load_members,
):
"""Works out what state to include in a sync response.
Args:
@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
lazy_load_members (bool): whether to return members from timeline_start
or not. assumes that timeline_start has already been filtered to
include only the members the client needs to know about.
Returns:
dict
@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
tc_ids = set(e for e in timeline_contains.values())
p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
p_ids = set(e for e in previous.values())
tc_ids = set(e for e in timeline_contains.values())
# If we are lazyloading room members, we explicitly add the membership events
# for the senders in the timeline into the state block returned by /sync,
# as we may not have sent them to the client before. We find these membership
# events by filtering them out of timeline_start, which has already been filtered
# to only include membership events for the senders in the timeline.
# In practice, we can do this by removing them from the p_ids list,
# which is the list of relevant state we know we have already sent to the client.
# see https://github.com/matrix-org/synapse/pull/2970
# /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
if lazy_load_members:
p_ids.difference_update(
e for t, e in timeline_start.iteritems()
if t[0] == EventTypes.Member
)
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids

View file

@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.client import (
Agent,
BrowserLikeRedirectAgent,
ContentDecoderAgent,
FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,

View file

@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
"synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
"synapse_http_server_response_time_seconds", "sec",
["method", "servlet", "tag", "code"],
)
response_ru_utime = Counter(
@ -171,11 +172,13 @@ class RequestMetrics(object):
)
return
outgoing_responses_counter.labels(request.method, str(request.code)).inc()
response_code = str(request.code)
outgoing_responses_counter.labels(request.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
response_timer.labels(request.method, self.name, tag).observe(
response_timer.labels(request.method, self.name, tag, response_code).observe(
time_sec - self.start
)

View file

@ -20,7 +20,7 @@ from twisted.web.server import Request, Site
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
from synapse.util.logcontext import LoggingContext, ContextResourceUsage
from synapse.util.logcontext import ContextResourceUsage, LoggingContext
logger = logging.getLogger(__name__)
@ -42,9 +42,10 @@ class SynapseRequest(Request):
which is handling the request, and returns a context manager.
"""
def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw)
def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = site
self._channel = channel
self.authenticated_entity = None
self.start_time = 0

View file

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
from twisted.internet import defer
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
_background_process_start_count = Counter(
"synapse_background_process_start_count",
"Number of background processes started",
["name"],
)
# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the CustomCollector,
# which ensures that we can update them before they are collected.
#
_background_process_ru_utime = Counter(
"synapse_background_process_ru_utime_seconds",
"User CPU time used by background processes, in seconds",
["name"],
registry=None,
)
_background_process_ru_stime = Counter(
"synapse_background_process_ru_stime_seconds",
"System CPU time used by background processes, in seconds",
["name"],
registry=None,
)
_background_process_db_txn_count = Counter(
"synapse_background_process_db_txn_count",
"Number of database transactions done by background processes",
["name"],
registry=None,
)
_background_process_db_txn_duration = Counter(
"synapse_background_process_db_txn_duration_seconds",
("Seconds spent by background processes waiting for database "
"transactions, excluding scheduling time"),
["name"],
registry=None,
)
_background_process_db_sched_duration = Counter(
"synapse_background_process_db_sched_duration_seconds",
"Seconds spent by background processes waiting for database connections",
["name"],
registry=None,
)
# map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts = dict() # type: dict[str, int]
# map from description to the currently running background processes.
#
# it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes.
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
class _Collector(object):
"""A custom metrics collector for the background process metrics.
Ensures that all of the metrics are up-to-date with any in-flight processes
before they are returned.
"""
def collect(self):
background_process_in_flight_count = GaugeMetricFamily(
"synapse_background_process_in_flight_count",
"Number of background processes in flight",
labels=["name"],
)
for desc, processes in six.iteritems(_background_processes):
background_process_in_flight_count.add_metric(
(desc,), len(processes),
)
for process in processes:
process.update_metrics()
yield background_process_in_flight_count
# now we need to run collect() over each of the static Counters, and
# yield each metric they return.
for m in (
_background_process_ru_utime,
_background_process_ru_stime,
_background_process_db_txn_count,
_background_process_db_txn_duration,
_background_process_db_sched_duration,
):
for r in m.collect():
yield r
REGISTRY.register(_Collector())
class _BackgroundProcess(object):
def __init__(self, desc, ctx):
self.desc = desc
self._context = ctx
self._reported_stats = None
def update_metrics(self):
"""Updates the metrics with values from this process."""
new_stats = self._context.get_resource_usage()
if self._reported_stats is None:
diff = new_stats
else:
diff = new_stats - self._reported_stats
self._reported_stats = new_stats
_background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
_background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
_background_process_db_txn_count.labels(self.desc).inc(
diff.db_txn_count,
)
_background_process_db_txn_duration.labels(self.desc).inc(
diff.db_txn_duration_sec,
)
_background_process_db_sched_duration.labels(self.desc).inc(
diff.db_sched_duration_sec,
)
def run_as_background_process(desc, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request.
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse inlineCallbacks function).
Args:
desc (str): a description for this background process type
func: a function, which may return a Deferred
args: positional args for func
kwargs: keyword args for func
Returns: Deferred which returns the result of func, but note that it does not
follow the synapse logcontext rules.
"""
@defer.inlineCallbacks
def run():
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
_background_process_start_count.labels(desc).inc()
with LoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
proc = _BackgroundProcess(desc, context)
_background_processes.setdefault(desc, set()).add(proc)
try:
yield func(*args, **kwargs)
finally:
proc.update_metrics()
_background_processes[desc].remove(proc)
with PreserveLoggingContext():
return run()

View file

@ -274,7 +274,7 @@ class Notifier(object):
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
""" Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""

View file

@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
pl_event_id = context.prev_state_ids.get(POWER_KEY)
prev_state_ids = yield context.get_prev_state_ids(self.store)
pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=False,
event, prev_state_ids, for_verification=False,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
current_state_ids = context.current_state_ids
current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))

View file

@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(clock, client, host, port, requester, event, context,
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id,
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],

View file

@ -192,7 +192,7 @@ class ReplicationClientHandler(object):
"""Returns a deferred that is resolved when we receive a SYNC command
with given data.
Used by tests.
[Not currently] used by tests.
"""
return self.awaiting_syncs.setdefault(data, defer.Deferred())

View file

@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.protocol import Factory
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
@ -117,7 +118,6 @@ class ReplicationStreamer(object):
for conn in self.connections:
conn.send_error("server shutting down")
@defer.inlineCallbacks
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
@ -132,14 +132,16 @@ class ReplicationStreamer(object):
stream.discard_updates_and_advance()
return
# If we're in the process of checking for new updates, mark that fact
# and return
self.pending_updates = True
if self.is_looping:
logger.debug("Noitifier poke loop already running")
self.pending_updates = True
logger.debug("Notifier poke loop already running")
return
self.pending_updates = True
run_as_background_process("replication_notifier", self._run_notifier_loop)
@defer.inlineCallbacks
def _run_notifier_loop(self):
self.is_looping = True
try:

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,13 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six import PY3
from synapse.http.server import JsonResource
from synapse.rest.client import versions
from synapse.rest.client.v1 import admin, directory, events, initial_sync
from synapse.rest.client.v1 import login as v1_login
from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
from synapse.rest.client.v1 import register as v1_register
from synapse.rest.client.v1 import room, voip
from synapse.rest.client.v1 import (
admin,
directory,
events,
initial_sync,
login as v1_login,
logout,
presence,
profile,
push_rule,
pusher,
room,
voip,
)
from synapse.rest.client.v2_alpha import (
account,
account_data,
@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
user_directory,
)
if not PY3:
from synapse.rest.client.v1_only import (
register as v1_register,
)
class ClientRestResource(JsonResource):
"""A resource for version 1 of the matrix client API."""
@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
def register_servlets(client_resource, hs):
versions.register_servlets(client_resource)
# "v1"
room.register_servlets(hs, client_resource)
if not PY3:
# "v1" (Python 2 only)
v1_register.register_servlets(hs, client_resource)
# Deprecated in r0
initial_sync.register_servlets(hs, client_resource)
room.register_deprecated_servlets(hs, client_resource)
# Partially deprecated in r0
events.register_servlets(hs, client_resource)
v1_register.register_servlets(hs, client_resource)
# "v1" + "r0"
room.register_servlets(hs, client_resource)
v1_login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)

View file

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import hmac
import logging
from six.moves import http_client
@ -24,9 +26,9 @@ from synapse.api.constants import Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
assert_params_in_dict,
parse_json_object_from_request,
parse_integer,
parse_string
parse_json_object_from_request,
parse_string,
)
from synapse.types import UserID, create_requester
@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
class UserRegisterServlet(ClientV1RestServlet):
"""
Attributes:
NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
nonces (dict[str, int]): The nonces that we will accept. A dict of
nonce to the time it was generated, in int seconds.
"""
PATTERNS = client_path_patterns("/admin/register")
NONCE_TIMEOUT = 60
def __init__(self, hs):
super(UserRegisterServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.reactor = hs.get_reactor()
self.nonces = {}
self.hs = hs
def _clear_old_nonces(self):
"""
Clear out old nonces that are older than NONCE_TIMEOUT.
"""
now = int(self.reactor.seconds())
for k, v in list(self.nonces.items()):
if now - v > self.NONCE_TIMEOUT:
del self.nonces[k]
def on_GET(self, request):
"""
Generate a new nonce.
"""
self._clear_old_nonces()
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return (200, {"nonce": nonce.encode('ascii')})
@defer.inlineCallbacks
def on_POST(self, request):
self._clear_old_nonces()
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
body = parse_json_object_from_request(request)
if "nonce" not in body:
raise SynapseError(
400, "nonce must be specified", errcode=Codes.BAD_JSON,
)
nonce = body["nonce"]
if nonce not in self.nonces:
raise SynapseError(
400, "unrecognised nonce",
)
# Delete the nonce, so it can't be reused, even if it's invalid
del self.nonces[nonce]
if "username" not in body:
raise SynapseError(
400, "username must be specified", errcode=Codes.BAD_JSON,
)
else:
if (not isinstance(body['username'], str) or len(body['username']) > 512):
raise SynapseError(400, "Invalid username")
username = body["username"].encode("utf-8")
if b"\x00" in username:
raise SynapseError(400, "Invalid username")
if "password" not in body:
raise SynapseError(
400, "password must be specified", errcode=Codes.BAD_JSON,
)
else:
if (not isinstance(body['password'], str) or len(body['password']) > 512):
raise SynapseError(400, "Invalid password")
password = body["password"].encode("utf-8")
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
admin = body.get("admin", None)
got_mac = body["mac"]
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce)
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac = want_mac.hexdigest()
if not hmac.compare_digest(want_mac, got_mac):
raise SynapseError(
403, "HMAC incorrect",
)
# Reuse the parts of RegisterRestServlet to reduce code duplication
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
register = RegisterRestServlet(self.hs)
(user_id, _) = yield register.registration_handler.register(
localpart=username.lower(), password=password, admin=bool(admin),
generate_token=False,
)
result = yield register._create_registration_details(user_id, body)
defer.returnValue((200, result))
class WhoisRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@ -123,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
@ -198,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON,
)
purge_id = yield self.handlers.message_handler.start_purge_history(
purge_id = yield self.pagination_handler.start_purge_history(
room_id, token,
delete_local_events=delete_local_events,
)
@ -220,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, purge_id):
@ -230,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
purge_status = self.handlers.message_handler.get_purge_status(purge_id)
purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)

View file

@ -15,8 +15,8 @@
from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.http.servlet import parse_boolean
from synapse.streams.config import PaginationConfig
from .base import ClientV1RestServlet, client_path_patterns

View file

@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
self.message_handler = hs.get_message_handler()
def register(self, http_server):
# /room/$roomid/state/$eventtype
@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content",
allowed_values=["content", "event"])
msg_handler = self.handlers.message_handler
msg_handler = self.message_handler
data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(),
room_id=room_id,
@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
handler = self.handlers.message_handler
events = yield handler.get_state_events(
events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
)
@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
self.message_handler = hs.get_handlers().message_handler
self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
handler = self.handlers.message_handler
msgs = yield handler.get_messages(
msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
handler = self.handlers.message_handler
# Get all the current state for this room
events = yield handler.get_state_events(
events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
self.handlers = hs.get_handlers()
self.room_context_handler = hs.get_room_context_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
results = yield self.handlers.room_context_handler.get_event_context(
results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
@ -832,10 +830,13 @@ def register_servlets(hs, http_server):
RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
RoomInitialSyncRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
def register_deprecated_servlets(hs, http_server):
RoomInitialSyncRestServlet(hs).register(http_server)

View file

@ -0,0 +1,3 @@
"""
REST APIs that are only used in v1 (the legacy API).
"""

View file

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains base REST classes for constructing client v1 servlets.
"""
import re
from synapse.api.urls import CLIENT_PREFIX
def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
"""Creates a regex compiled client path with the correct client path
prefix.
Args:
path_regex (str): The regex string to match. This should NOT have a ^
as this will be prefixed.
Returns:
list of SRE_Pattern
"""
patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
if include_in_unstable:
unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
patterns.append(re.compile("^" + unstable_prefix + path_regex))
return patterns

View file

@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
from .base import ClientV1RestServlet, client_path_patterns
from .base import v1_only_client_path_patterns
logger = logging.getLogger(__name__)
@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
handler doesn't have a concept of multi-stages or sessions.
"""
PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
def __init__(self, hs):
"""
@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface
"""
PATTERNS = client_path_patterns("/createUser$", releases=())
PATTERNS = v1_only_client_path_patterns("/createUser$")
def __init__(self, hs):
super(CreateUserRestServlet, self).__init__(hs)

Some files were not shown because too many files have changed in this diff Show more