0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 14:03:54 +01:00

Merge branch 'develop' into matthew/filter_members

This commit is contained in:
Matthew Hodgson 2018-07-23 19:21:37 +01:00
commit adfe29ec0b
53 changed files with 3683 additions and 3243 deletions

2470
CHANGES.md Normal file

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ include synctl
include LICENSE
include VERSION
include *.rst
include *.md
include demo/README
include demo/demo.tls.dh
include demo/*.py

View file

@ -71,7 +71,7 @@ We'd like to invite you to join #matrix:matrix.org (via
https://matrix.org/docs/projects/try-matrix-now.html), run a homeserver, take a look
at the `Matrix spec <https://matrix.org/docs/spec>`_, and experiment with the
`APIs <https://matrix.org/docs/api>`_ and `Client SDKs
<http://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
<https://matrix.org/docs/projects/try-matrix-now.html#client-sdks>`_.
Thanks for using Matrix!
@ -283,7 +283,7 @@ Connecting to Synapse from a client
The easiest way to try out your new Synapse installation is by connecting to it
from a web client. The easiest option is probably the one at
http://riot.im/app. You will need to specify a "Custom server" when you log on
https://riot.im/app. You will need to specify a "Custom server" when you log on
or register: set this to ``https://domain.tld`` if you setup a reverse proxy
following the recommended setup, or ``https://localhost:8448`` - remember to specify the
port (``:8448``) if not ``:443`` unless you changed the configuration. (Leave the identity
@ -329,7 +329,7 @@ Security Note
=============
Matrix serves raw user generated data in some APIs - specifically the `content
repository endpoints <http://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
Whilst we have tried to mitigate against possible XSS attacks (e.g.
https://github.com/matrix-org/synapse/pull/1021) we recommend running
@ -348,7 +348,7 @@ Platform-Specific Instructions
Debian
------
Matrix provides official Debian packages via apt from http://matrix.org/packages/debian/.
Matrix provides official Debian packages via apt from https://matrix.org/packages/debian/.
Note that these packages do not include a client - choose one from
https://matrix.org/docs/projects/try-matrix-now.html (or build your own with one of our SDKs :)
@ -524,7 +524,7 @@ Troubleshooting Running
-----------------------
If synapse fails with ``missing "sodium.h"`` crypto errors, you may need
to manually upgrade PyNaCL, as synapse uses NaCl (http://nacl.cr.yp.to/) for
to manually upgrade PyNaCL, as synapse uses NaCl (https://nacl.cr.yp.to/) for
encryption and digital signatures.
Unfortunately PyNACL currently has a few issues
(https://github.com/pyca/pynacl/issues/53) and
@ -672,8 +672,8 @@ useful just for development purposes. See `<demo/README>`_.
Using PostgreSQL
================
As of Synapse 0.9, `PostgreSQL <http://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <http://sqlite.org/>`_ database that Synapse has
As of Synapse 0.9, `PostgreSQL <https://www.postgresql.org>`_ is supported as an
alternative to the `SQLite <https://sqlite.org/>`_ database that Synapse has
traditionally used for convenience and simplicity.
The advantages of Postgres include:
@ -697,7 +697,7 @@ Using a reverse proxy with Synapse
It is recommended to put a reverse proxy such as
`nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_,
`Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_ or
`HAProxy <http://www.haproxy.org/>`_ in front of Synapse. One advantage of
`HAProxy <https://www.haproxy.org/>`_ in front of Synapse. One advantage of
doing so is that it means that you can expose the default https port (443) to
Matrix clients without needing to run Synapse with root privileges.

0
changelog.d/3415.misc Normal file
View file

1
changelog.d/3520.bugfix Normal file
View file

@ -0,0 +1 @@
Correctly announce deleted devices over federation

1
changelog.d/3548.bugfix Normal file
View file

@ -0,0 +1 @@
Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis.

1
changelog.d/3552.misc Normal file
View file

@ -0,0 +1 @@
Release notes are now in the Markdown format.

View file

@ -1 +0,0 @@
Disable a noisy warning about logcontexts

0
changelog.d/3562.misc Normal file
View file

1
changelog.d/3570.bugfix Normal file
View file

@ -0,0 +1 @@
Fix potential stack overflow and deadlock under heavy load

1
changelog.d/3571.misc Normal file
View file

@ -0,0 +1 @@
Merge Linearizer and Limiter

1
changelog.d/3572.misc Normal file
View file

@ -0,0 +1 @@
Merge Linearizer and Limiter

0
changelog.d/3577.misc Normal file
View file

1
changelog.d/3579.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3581.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3582.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

1
changelog.d/3584.misc Normal file
View file

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

View file

@ -0,0 +1,63 @@
Shared-Secret Registration
==========================
This API allows for the creation of users in an administrative and
non-interactive way. This is generally used for bootstrapping a Synapse
instance with administrator accounts.
To authenticate yourself to the server, you will need both the shared secret
(``registration_shared_secret`` in the homeserver configuration), and a
one-time nonce. If the registration shared secret is not configured, this API
is not enabled.
To fetch the nonce, you need to request one from the API::
> GET /_matrix/client/r0/admin/register
< {"nonce": "thisisanonce"}
Once you have the nonce, you can make a ``POST`` to the same URL with a JSON
body containing the nonce, username, password, whether they are an admin
(optional, False by default), and a HMAC digest of the content.
As an example::
> POST /_matrix/client/r0/admin/register
> {
"nonce": "thisisanonce",
"username": "pepper_roni",
"password": "pizza",
"admin": true,
"mac": "mac_digest_here"
}
< {
"access_token": "token_here",
"user_id": "@pepper_roni@test",
"home_server": "test",
"device_id": "device_id_here"
}
The MAC is the hex digest output of the HMAC-SHA1 algorithm, with the key being
the shared secret and the content being the nonce, user, password, and either
the string "admin" or "notadmin", each separated by NULs. For an example of
generation in Python::
import hmac, hashlib
def generate_mac(nonce, user, password, admin=False):
mac = hmac.new(
key=shared_secret,
digestmod=hashlib.sha1,
)
mac.update(nonce.encode('utf8'))
mac.update(b"\x00")
mac.update(user.encode('utf8'))
mac.update(b"\x00")
mac.update(password.encode('utf8'))
mac.update(b"\x00")
mac.update(b"admin" if admin else b"notadmin")
return mac.hexdigest()

View file

@ -1,5 +1,30 @@
[tool.towncrier]
package = "synapse"
filename = "CHANGES.rst"
filename = "CHANGES.md"
directory = "changelog.d"
issue_format = "`#{issue} <https://github.com/matrix-org/synapse/issues/{issue}>`_"
issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue}>)"
[[tool.towncrier.type]]
directory = "feature"
name = "Features"
showcontent = true
[[tool.towncrier.type]]
directory = "bugfix"
name = "Bugfixes"
showcontent = true
[[tool.towncrier.type]]
directory = "doc"
name = "Improved Documentation"
showcontent = true
[[tool.towncrier.type]]
directory = "removal"
name = "Deprecations and Removals"
showcontent = true
[[tool.towncrier.type]]
directory = "misc"
name = "Internal Changes"
showcontent = true

View file

@ -26,11 +26,37 @@ import yaml
def request_registration(user, password, server_location, shared_secret, admin=False):
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
headers={'Content-Type': 'application/json'}
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
body = f.read()
f.close()
nonce = json.loads(body)["nonce"]
except urllib2.HTTPError as e:
print "ERROR! Received %d %s" % (e.code, e.reason,)
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
print resp["error"]
sys.exit(1)
mac = hmac.new(
key=shared_secret,
digestmod=hashlib.sha1,
)
mac.update(nonce)
mac.update("\x00")
mac.update(user)
mac.update("\x00")
mac.update(password)
@ -40,10 +66,10 @@ def request_registration(user, password, server_location, shared_secret, admin=F
mac = mac.hexdigest()
data = {
"user": user,
"nonce": nonce,
"username": user,
"password": password,
"mac": mac,
"type": "org.matrix.login.shared_secret",
"admin": admin,
}
@ -52,7 +78,7 @@ def request_registration(user, password, server_location, shared_secret, admin=F
print "Sending registration request..."
req = urllib2.Request(
"%s/_matrix/client/api/v1/register" % (server_location,),
"%s/_matrix/client/r0/admin/register" % (server_location,),
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)

View file

@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.33.0rc1"
__version__ = "0.33.0"

View file

@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids

View file

@ -18,6 +18,8 @@ import logging
import os
import sys
from six import iteritems
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.web.resource import EncodingResourceWrapper, NoResource
@ -442,7 +444,7 @@ def run(hs):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.iteritems():
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@ -453,7 +455,7 @@ def run(hs):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in r30_results.iteritems():
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()

View file

@ -25,6 +25,8 @@ import subprocess
import sys
import time
from six import iteritems
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@ -173,7 +175,7 @@ def main():
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []

View file

@ -13,22 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six import iteritems
from frozendict import frozendict
from twisted.internet import defer
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
class EventContext(object):
"""
Attributes:
current_state_ids (dict[(str, str), str]):
The current state map including the current event.
(type, state_key) -> event_id
prev_state_ids (dict[(str, str), str]):
The current state map excluding the current event.
(type, state_key) -> event_id
state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
@ -45,38 +41,77 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
_current_state_ids (dict[(str, str), str]|None):
The current state map including the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_prev_state_ids (dict[(str, str), str]|None):
The current state map excluding the current event. None if outlier
or we haven't fetched the state from DB yet.
(type, state_key) -> event_id
_fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
been calculated. None if we haven't started calculating yet
_event_type (str): The type of the event the context is associated with.
Only set when state has not been fetched yet.
_event_state_key (str|None): The state_key of the event the context is
associated with. Only set when state has not been fetched yet.
_prev_state_id (str|None): If the event associated with the context is
a state event, then `_prev_state_id` is the event_id of the state
that was replaced.
Only set when state has not been fetched yet.
"""
__slots__ = [
"current_state_ids",
"prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
"_current_state_ids",
"_prev_state_ids",
"_prev_state_id",
"_event_type",
"_event_state_key",
"_fetching_state_deferred",
]
def __init__(self):
# The current state including the current event
self.current_state_ids = None
# The current state excluding the current event
self.prev_state_ids = None
self.state_group = None
self.prev_state_events = []
self.rejected = False
self.app_service = None
@staticmethod
def with_state(state_group, current_state_ids, prev_state_ids,
prev_group=None, delta_ids=None):
context = EventContext()
# The current state including the current event
context._current_state_ids = current_state_ids
# The current state excluding the current event
context._prev_state_ids = prev_state_ids
context.state_group = state_group
context._prev_state_id = None
context._event_type = None
context._event_state_key = None
context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
self.prev_group = None
self.delta_ids = None
context.prev_group = prev_group
context.delta_ids = delta_ids
self.prev_state_events = None
return context
self.app_service = None
def serialize(self, event):
@defer.inlineCallbacks
def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
@ -92,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
return {
defer.returnValue({
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
@ -106,10 +142,9 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
}
})
@staticmethod
@defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
@ -122,32 +157,114 @@ class EventContext(object):
EventContext
"""
context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
prev_state_id = input["prev_state_id"]
event_type = input["event_type"]
event_state_key = input["event_state_key"]
context._prev_state_id = input["prev_state_id"]
context._event_type = input["event_type"]
context._event_state_key = input["event_state_key"]
context.current_state_ids = yield store.get_state_ids_for_group(
context.state_group,
)
if prev_state_id and event_state_key:
context.prev_state_ids = dict(context.current_state_ids)
context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
else:
context.prev_state_ids = context.current_state_ids
context._current_state_ids = None
context._prev_state_ids = None
context._fetching_state_deferred = None
context.state_group = input["state_group"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
app_service_id = input["app_service_id"]
if app_service_id:
context.app_service = store.get_app_service_by_id(app_service_id)
defer.returnValue(context)
return context
@defer.inlineCallbacks
def get_current_state_ids(self, store):
"""Gets the current state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._current_state_ids)
@defer.inlineCallbacks
def get_prev_state_ids(self, store):
"""Gets the prev state IDs
Returns:
Deferred[dict[(str, str), str]|None]: Returns None if state_group
is None, which happens when the associated event is an outlier.
"""
if not self._fetching_state_deferred:
self._fetching_state_deferred = run_in_background(
self._fill_out_state, store,
)
yield make_deferred_yieldable(self._fetching_state_deferred)
defer.returnValue(self._prev_state_ids)
def get_cached_current_state_ids(self):
"""Gets the current state IDs if we have them already cached.
Returns:
dict[(str, str), str]|None: Returns None if we haven't cached the
state or if state_group is None, which happens when the associated
event is an outlier.
"""
return self._current_state_ids
@defer.inlineCallbacks
def _fill_out_state(self, store):
"""Called to populate the _current_state_ids and _prev_state_ids
attributes by loading from the database.
"""
if self.state_group is None:
return
self._current_state_ids = yield store.get_state_ids_for_group(
self.state_group,
)
if self._prev_state_id and self._event_state_key is not None:
self._prev_state_ids = dict(self._current_state_ids)
key = (self._event_type, self._event_state_key)
self._prev_state_ids[key] = self._prev_state_id
else:
self._prev_state_ids = self._current_state_ids
@defer.inlineCallbacks
def update_state(self, state_group, prev_state_ids, current_state_ids,
delta_ids):
"""Replace the state in the context
"""
# We need to make sure we wait for any ongoing fetching of state
# to complete so that the updated state doesn't get clobbered
if self._fetching_state_deferred:
yield make_deferred_yieldable(self._fetching_state_deferred)
self.state_group = state_group
self._prev_state_ids = prev_state_ids
self._current_state_ids = current_state_ids
self.delta_ids = delta_ids
# We need to ensure that that we've marked as having fetched the state
self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict):
@ -159,7 +276,7 @@ def _encode_state_dict(state_dict):
return [
(etype, state_key, v)
for (etype, state_key), v in state_dict.iteritems()
for (etype, state_key), v in iteritems(state_dict)
]

View file

@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
list(context.current_state_ids.values())
list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(

View file

@ -21,8 +21,8 @@ import logging
import sys
import six
from six import iteritems
from six.moves import http_client
from six import iteritems, itervalues
from six.moves import http_client, zip
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
prev_state_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@ -731,7 +734,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.iteritems()
for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@ -748,7 +751,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
return sorted(joined_domains.iteritems(), key=lambda d: d[1])
return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@ -811,7 +814,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
event_ids = list(extremities.iterkeys())
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@ -827,15 +830,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
[e_id for ids in states.itervalues() for e_id in ids.itervalues()],
[e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.iteritems()
for k, e_id in iteritems(state_dict)
if e_id in state_map
} for key, state_dict in states.iteritems()
} for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = list(context.prev_state_ids.values())
prev_state_ids = yield context.get_prev_state_ids(self.store)
state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(list(context.prev_state_ids.values()))
state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@ -1515,7 +1520,7 @@ class FederationHandler(BaseHandler):
yield self.store.persist_events(
[
(ev_info["event"], context)
for ev_info, context in itertools.izip(event_infos, contexts)
for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids
event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@ -1968,21 +1975,35 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
context.current_state_ids = dict(context.current_state_ids)
context.current_state_ids.update(state_updates)
current_state_ids = yield context.get_current_state_ids(self.store)
current_state_ids = dict(current_state_ids)
current_state_ids.update(state_updates)
if context.delta_ids is not None:
context.delta_ids = dict(context.delta_ids)
context.delta_ids.update(state_updates)
context.prev_state_ids = dict(context.prev_state_ids)
context.prev_state_ids.update({
delta_ids = dict(context.delta_ids)
delta_ids.update(state_updates)
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_ids = dict(prev_state_ids)
prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
context.state_group = yield self.store.store_state_group(
state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
delta_ids=delta_ids,
current_state_ids=current_state_ids,
)
yield context.update_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
delta_ids=delta_ids,
)
@defer.inlineCallbacks
@ -2222,7 +2243,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
original_invite_id = context.prev_state_ids.get(key)
prev_state_ids = yield context.get_prev_state_ids(self.store)
original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@ -2264,7 +2286,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
invite_event_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)

View file

@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
from synapse.util.async import Limiter, ReadWriteLock
from synapse.util.async import Linearizer, ReadWriteLock
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@ -427,7 +427,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@ -630,7 +630,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@ -752,8 +753,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
"Created event %s with state: %s",
event.event_id, context.prev_state_ids,
"Created event %s",
event.event_id,
)
defer.returnValue(
@ -806,8 +807,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.hs.get_clock(),
self.http_client,
clock=self.hs.get_clock(),
store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
@ -884,9 +886,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
current_state_ids = yield context.get_current_state_ids(self.store)
state_to_include_ids = [
e_id
for k, e_id in iteritems(context.current_state_ids)
for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@ -922,8 +926,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=True,
event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -943,11 +948,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
if event.type == EventTypes.Create and context.prev_state_ids:
raise AuthError(
403,
"Changing the room create event is forbidden",
)
if event.type == EventTypes.Create:
prev_state_ids = yield context.get_prev_state_ids(self.store)
if prev_state_ids:
raise AuthError(
403,
"Changing the room create event is forbidden",
)
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context

View file

@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
prev_member_event_id = context.prev_state_ids.get(
prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
guest_can_join = yield self._can_guest_join(context.prev_state_ids)
guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
prev_member_event_id = context.prev_state_ids.get(
prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)

View file

@ -274,7 +274,7 @@ class Notifier(object):
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
""" Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""

View file

@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
pl_event_id = context.prev_state_ids.get(POWER_KEY)
prev_state_ids = yield context.get_prev_state_ids(self.store)
pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids, for_verification=False,
event, prev_state_ids, for_verification=False,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
current_state_ids = context.current_state_ids
current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))

View file

@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def send_event_to_master(clock, client, host, port, requester, event, context,
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id,
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": context.serialize(event),
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],

View file

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import hmac
import logging
from six.moves import http_client
@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
class UserRegisterServlet(ClientV1RestServlet):
"""
Attributes:
NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
nonces (dict[str, int]): The nonces that we will accept. A dict of
nonce to the time it was generated, in int seconds.
"""
PATTERNS = client_path_patterns("/admin/register")
NONCE_TIMEOUT = 60
def __init__(self, hs):
super(UserRegisterServlet, self).__init__(hs)
self.handlers = hs.get_handlers()
self.reactor = hs.get_reactor()
self.nonces = {}
self.hs = hs
def _clear_old_nonces(self):
"""
Clear out old nonces that are older than NONCE_TIMEOUT.
"""
now = int(self.reactor.seconds())
for k, v in list(self.nonces.items()):
if now - v > self.NONCE_TIMEOUT:
del self.nonces[k]
def on_GET(self, request):
"""
Generate a new nonce.
"""
self._clear_old_nonces()
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return (200, {"nonce": nonce.encode('ascii')})
@defer.inlineCallbacks
def on_POST(self, request):
self._clear_old_nonces()
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
body = parse_json_object_from_request(request)
if "nonce" not in body:
raise SynapseError(
400, "nonce must be specified", errcode=Codes.BAD_JSON,
)
nonce = body["nonce"]
if nonce not in self.nonces:
raise SynapseError(
400, "unrecognised nonce",
)
# Delete the nonce, so it can't be reused, even if it's invalid
del self.nonces[nonce]
if "username" not in body:
raise SynapseError(
400, "username must be specified", errcode=Codes.BAD_JSON,
)
else:
if (not isinstance(body['username'], str) or len(body['username']) > 512):
raise SynapseError(400, "Invalid username")
username = body["username"].encode("utf-8")
if b"\x00" in username:
raise SynapseError(400, "Invalid username")
if "password" not in body:
raise SynapseError(
400, "password must be specified", errcode=Codes.BAD_JSON,
)
else:
if (not isinstance(body['password'], str) or len(body['password']) > 512):
raise SynapseError(400, "Invalid password")
password = body["password"].encode("utf-8")
if b"\x00" in password:
raise SynapseError(400, "Invalid password")
admin = body.get("admin", None)
got_mac = body["mac"]
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce)
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac = want_mac.hexdigest()
if not hmac.compare_digest(want_mac, got_mac):
raise SynapseError(
403, "HMAC incorrect",
)
# Reuse the parts of RegisterRestServlet to reduce code duplication
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
register = RegisterRestServlet(self.hs)
(user_id, _) = yield register.registration_handler.register(
localpart=username.lower(), password=password, admin=bool(admin),
generate_token=False,
)
result = yield register._create_registration_details(user_id, body)
defer.returnValue((200, result))
class WhoisRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)

42
synapse/secrets.py Normal file
View file

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Injectable secrets module for Synapse.
See https://docs.python.org/3/library/secrets.html#module-secrets for the API
used in Python 3.6, and the API emulated in Python 2.7.
"""
import six
if six.PY3:
import secrets
def Secrets():
return secrets
else:
import os
import binascii
class Secrets(object):
def token_bytes(self, nbytes=32):
return os.urandom(nbytes)
def token_hex(self, nbytes=32):
return binascii.hexlify(self.token_bytes(nbytes))

View file

@ -74,6 +74,7 @@ from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
from synapse.secrets import Secrets
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
@ -158,6 +159,7 @@ class HomeServer(object):
'groups_server_handler',
'groups_attestation_signing',
'groups_attestation_renewer',
'secrets',
'spam_checker',
'room_member_handler',
'federation_registry',
@ -405,6 +407,9 @@ class HomeServer(object):
def build_groups_attestation_renewer(self):
return GroupAttestionRenewer(self)
def build_secrets(self):
return Secrets()
def build_spam_checker(self):
return SpamChecker(self)

View file

@ -18,7 +18,7 @@ import hashlib
import logging
from collections import namedtuple
from six import iteritems, itervalues
from six import iteritems, iterkeys, itervalues
from frozendict import frozendict
@ -203,25 +203,27 @@ class StateHandler(object):
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group.
context = EventContext()
if old_state:
context.prev_state_ids = {
prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
context.current_state_ids = dict(context.prev_state_ids)
current_state_ids = dict(prev_state_ids)
key = (event.type, event.state_key)
context.current_state_ids[key] = event.event_id
current_state_ids[key] = event.event_id
else:
context.current_state_ids = context.prev_state_ids
current_state_ids = prev_state_ids
else:
context.current_state_ids = {}
context.prev_state_ids = {}
context.prev_state_events = []
current_state_ids = {}
prev_state_ids = {}
# We don't store state for outliers, so we don't generate a state
# froup for it.
context.state_group = None
# group for it.
context = EventContext.with_state(
state_group=None,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
)
defer.returnValue(context)
@ -230,31 +232,35 @@ class StateHandler(object):
# Let's just correctly fill out the context and create a
# new state group for it.
context = EventContext()
context.prev_state_ids = {
prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
key = (event.type, event.state_key)
if key in context.prev_state_ids:
replaces = context.prev_state_ids[key]
if key in prev_state_ids:
replaces = prev_state_ids[key]
if replaces != event.event_id: # Paranoia check
event.unsigned["replaces_state"] = replaces
context.current_state_ids = dict(context.prev_state_ids)
context.current_state_ids[key] = event.event_id
current_state_ids = dict(prev_state_ids)
current_state_ids[key] = event.event_id
else:
context.current_state_ids = context.prev_state_ids
current_state_ids = prev_state_ids
context.state_group = yield self.store.store_state_group(
state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
current_state_ids=context.current_state_ids,
current_state_ids=current_state_ids,
)
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
)
context.prev_state_events = []
defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context")
@ -262,47 +268,47 @@ class StateHandler(object):
event.room_id, [e for e, _ in event.prev_events],
)
curr_state = entry.state
prev_state_ids = entry.state
prev_group = None
delta_ids = None
context = EventContext()
context.prev_state_ids = curr_state
if event.is_state():
# If this is a state event then we need to create a new state
# group for the state after this event.
key = (event.type, event.state_key)
if key in context.prev_state_ids:
replaces = context.prev_state_ids[key]
if key in prev_state_ids:
replaces = prev_state_ids[key]
event.unsigned["replaces_state"] = replaces
context.current_state_ids = dict(context.prev_state_ids)
context.current_state_ids[key] = event.event_id
current_state_ids = dict(prev_state_ids)
current_state_ids[key] = event.event_id
if entry.state_group:
# If the state at the event has a state group assigned then
# we can use that as the prev group
context.prev_group = entry.state_group
context.delta_ids = {
prev_group = entry.state_group
delta_ids = {
key: event.event_id
}
elif entry.prev_group:
# If the state at the event only has a prev group, then we can
# use that as a prev group too.
context.prev_group = entry.prev_group
context.delta_ids = dict(entry.delta_ids)
context.delta_ids[key] = event.event_id
prev_group = entry.prev_group
delta_ids = dict(entry.delta_ids)
delta_ids[key] = event.event_id
context.state_group = yield self.store.store_state_group(
state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
delta_ids=context.delta_ids,
current_state_ids=context.current_state_ids,
prev_group=prev_group,
delta_ids=delta_ids,
current_state_ids=current_state_ids,
)
else:
context.current_state_ids = context.prev_state_ids
context.prev_group = entry.prev_group
context.delta_ids = entry.delta_ids
current_state_ids = prev_state_ids
prev_group = entry.prev_group
delta_ids = entry.delta_ids
if entry.state_group is None:
entry.state_group = yield self.store.store_state_group(
@ -310,13 +316,20 @@ class StateHandler(object):
event.room_id,
prev_group=entry.prev_group,
delta_ids=entry.delta_ids,
current_state_ids=context.current_state_ids,
current_state_ids=current_state_ids,
)
entry.state_id = entry.state_group
context.state_group = entry.state_group
state_group = entry.state_group
context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
prev_group=prev_group,
delta_ids=delta_ids,
)
context.prev_state_events = []
defer.returnValue(context)
@defer.inlineCallbacks
@ -647,7 +660,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_id in event_ids
)
if event_map is not None:
needed_events -= set(event_map.iterkeys())
needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
@ -668,7 +681,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
new_needed_events = set(itervalues(auth_events))
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(event_map.iterkeys())
new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))

View file

@ -248,17 +248,31 @@ class DeviceStore(SQLBaseStore):
def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
content, stream_id):
self._simple_upsert_txn(
txn,
table="device_lists_remote_cache",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={
"content": json.dumps(content),
}
)
if content.get("deleted"):
self._simple_delete_txn(
txn,
table="device_lists_remote_cache",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
)
txn.call_after(
self.device_id_exists_cache.invalidate, (user_id, device_id,)
)
else:
self._simple_upsert_txn(
txn,
table="device_lists_remote_cache",
keyvalues={
"user_id": user_id,
"device_id": device_id,
},
values={
"content": json.dumps(content),
}
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
@ -366,7 +380,7 @@ class DeviceStore(SQLBaseStore):
now_stream_id = max(stream_id for stream_id in itervalues(query_map))
devices = self._get_e2e_device_keys_txn(
txn, query_map.keys(), include_all_devices=True
txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True
)
prev_sent_id_sql = """
@ -393,12 +407,15 @@ class DeviceStore(SQLBaseStore):
prev_id = stream_id
key_json = device.get("key_json", None)
if key_json:
result["keys"] = json.loads(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
if device is not None:
key_json = device.get("key_json", None)
if key_json:
result["keys"] = json.loads(key_json)
device_display_name = device.get("device_display_name", None)
if device_display_name:
result["device_display_name"] = device_display_name
else:
result["deleted"] = True
results.append(result)

View file

@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore):
)
@defer.inlineCallbacks
def get_e2e_device_keys(self, query_list, include_all_devices=False):
def get_e2e_device_keys(
self, query_list, include_all_devices=False,
include_deleted_devices=False,
):
"""Fetch a list of device keys.
Args:
query_list(list): List of pairs of user_ids and device_ids.
include_all_devices (bool): whether to include entries for devices
that don't have device keys
include_deleted_devices (bool): whether to include null entries for
devices which no longer exist (but were in the query_list).
This option only takes effect if include_all_devices is true.
Returns:
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore):
results = yield self.runInteraction(
"get_e2e_device_keys", self._get_e2e_device_keys_txn,
query_list, include_all_devices,
query_list, include_all_devices, include_deleted_devices,
)
for user_id, device_keys in iteritems(results):
@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore):
defer.returnValue(results)
def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices):
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False,
include_deleted_devices=False,
):
query_clauses = []
query_params = []
if include_all_devices is False:
include_deleted_devices = False
if include_deleted_devices:
deleted_devices = set(query_list)
for (user_id, device_id) in query_list:
query_clause = "user_id = ?"
query_params.append(user_id)
@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore):
result = {}
for row in rows:
if include_deleted_devices:
deleted_devices.remove((row["user_id"], row["device_id"]))
result.setdefault(row["user_id"], {})[row["device_id"]] = row
if include_deleted_devices:
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
return result
@defer.inlineCallbacks

View file

@ -549,7 +549,12 @@ class EventsStore(EventsWorkerStore):
if ctx.state_group in state_groups_map:
continue
state_groups_map[ctx.state_group] = ctx.current_state_ids
# We're only interested in pulling out state that has already
# been cached in the context. We'll pull stuff out of the DB later
# if necessary.
current_state_ids = ctx.get_cached_current_state_ids()
if current_state_ids is not None:
state_groups_map[ctx.state_group] = current_state_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can

View file

@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
defer.returnValue(results)
@defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context):
state_group = context.state_group
if not state_group:
@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
# To do this we set the state_group to a new object as object() != object()
state_group = object()
return self._bulk_get_push_rules_for_room(
event.room_id, state_group, context.current_state_ids, event=event
current_state_ids = yield context.get_current_state_ids(self)
result = yield self._bulk_get_push_rules_for_room(
event.room_id, state_group, current_state_ids, event=event
)
defer.returnValue(result)
@cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,

View file

@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(user_who_share_room)
@defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
state_group = context.state_group
if not state_group:
@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
return self._get_joined_users_from_context(
event.room_id, state_group, context.current_state_ids,
current_state_ids = yield context.get_current_state_ids(self)
result = yield self._get_joined_users_from_context(
event.room_id, state_group, current_state_ids,
event=event,
context=context,
)
defer.returnValue(result)
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
from contextlib import contextmanager
@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit):
class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource.
Example:
with (yield linearizer.queue("test_key")):
with (yield limiter.queue("test_key")):
# do some work.
"""
def __init__(self, name=None, clock=None):
def __init__(self, name=None, max_count=1, clock=None):
"""
Args:
max_count(int): The maximum number of concurrent accesses
"""
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {}
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
self.max_count = max_count
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is an OrderedDict, where the keys are deferreds for the
# things blocked from executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
# we can wait on it later.
# Then we replace it with a deferred that we resolve *after* the
# context manager has exited.
# We only return the context manager after the previous deferred has
# resolved.
# This all has the net effect of creating a chain of deferreds that
# wait for the previous deferred before starting their work.
current_defer = self.key_to_defer.get(key)
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1][new_defer] = 1
if current_defer:
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
with PreserveLoggingContext():
yield current_defer
except Exception:
logger.exception("Unexpected exception in Linearizer")
yield make_deferred_yieldable(new_defer)
except Exception as e:
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)
logger.info("Acquired linearizer lock %r for key %r", self.name,
key)
# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
raise
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
@ -213,15 +232,15 @@ class Linearizer(object):
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (There's no particular need for it to happen before we return
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)
else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
self.name, key)
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
@contextmanager
def _ctx_manager():
@ -229,73 +248,15 @@ class Linearizer(object):
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
with PreserveLoggingContext():
new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
defer.returnValue(_ctx_manager())
class Limiter(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few thing happen at a time on a given resource.
Example:
with (yield limiter.queue("test_key")):
# do some work.
"""
def __init__(self, max_count):
"""
Args:
max_count(int): The maximum number of concurrent access
"""
self.max_count = max_count
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing
# the second element is a list of deferreds for the things blocked from
# executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, []])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When on of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)
logger.info("Waiting to acquire limiter lock for key %r", key)
with PreserveLoggingContext():
yield new_defer
logger.info("Acquired limiter lock for key %r", key)
else:
logger.info("Acquired uncontended limiter lock for key %r", key)
entry[0] += 1
@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing limiter lock for key %r", key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
if entry[1]:
next_def = entry[1].pop(0)
(next_def, _) = entry[1].popitem(last=False)
# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:

View file

@ -99,6 +99,17 @@ class ContextResourceUsage(object):
self.db_sched_duration_sec = 0
self.evt_db_fetch_count = 0
def __repr__(self):
return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
"db_txn_count='%r', db_txn_duration_sec='%r', "
"db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
self.ru_stime,
self.ru_utime,
self.db_txn_count,
self.db_txn_duration_sec,
self.db_sched_duration_sec,
self.evt_db_fetch_count,)
def __iadd__(self, other):
"""Add another ContextResourceUsage's stats to this one's.

View file

@ -104,12 +104,19 @@ class Measure(object):
logger.warn("Expected context. (%r)", self.name)
return
usage = context.get_resource_usage() - self.start_usage
block_ru_utime.labels(self.name).inc(usage.ru_utime)
block_ru_stime.labels(self.name).inc(usage.ru_stime)
block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
current = context.get_resource_usage()
usage = current - self.start_usage
try:
block_ru_utime.labels(self.name).inc(usage.ru_utime)
block_ru_stime.labels(self.name).inc(usage.ru_stime)
block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
except ValueError:
logger.warn(
"Failed to save metrics! OLD: %r, NEW: %r",
self.start_usage, current
)
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)

View file

@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import operator
import six
from six import iteritems, itervalues
from six.moves import map
from twisted.internet import defer
@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
return event
# check each event: gives an iterable[None|EventBase]
filtered_events = itertools.imap(allowed, events)
filtered_events = map(allowed, events)
# remove the None entries
filtered_events = filter(operator.truth, filtered_events)
@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events):
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
for ev in state.itervalues():
for ev in itervalues(state):
if ev.type != EventTypes.Member:
continue
try:
@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events):
)
visibility_ids = set()
for sids in event_to_state_ids.itervalues():
for sids in itervalues(event_to_state_ids):
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events):
event_map = yield store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
for e in event_map.itervalues()
for e in itervalues(event_map)
)
if all_open:
@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events):
#
state_key_to_event_id_set = {
e
for key_to_eid in six.itervalues(event_to_state_ids)
for key_to_eid in itervalues(event_to_state_ids)
for e in key_to_eid.items()
}
@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events):
event_to_state = {
e_id: {
key: event_map[inner_e_id]
for key, inner_e_id in key_to_eid.iteritems()
for key, inner_e_id in iteritems(key_to_eid)
if inner_e_id in event_map
}
for e_id, key_to_eid in event_to_state_ids.iteritems()
for e_id, key_to_eid in iteritems(event_to_state_ids)
}
defer.returnValue([

View file

@ -222,9 +222,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_ids = {
key: e.event_id for key, e in state.items()
}
context = EventContext()
context.current_state_ids = state_ids
context.prev_state_ids = state_ids
context = EventContext.with_state(
state_group=None,
current_state_ids=state_ids,
prev_state_ids=state_ids
)
else:
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)

View file

@ -0,0 +1,305 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import hmac
import json
from mock import Mock
from synapse.http.server import JsonResource
from synapse.rest.client.v1.admin import register_servlets
from synapse.util import Clock
from tests import unittest
from tests.server import (
ThreadedMemoryReactorClock,
make_request,
render,
setup_test_homeserver,
)
class UserRegisterTestCase(unittest.TestCase):
def setUp(self):
self.clock = ThreadedMemoryReactorClock()
self.hs_clock = Clock(self.clock)
self.url = "/_matrix/client/r0/admin/register"
self.registration_handler = Mock()
self.identity_handler = Mock()
self.login_handler = Mock()
self.device_handler = Mock()
self.device_handler.check_device_registered = Mock(return_value="FAKE")
self.datastore = Mock(return_value=Mock())
self.datastore.get_current_state_deltas = Mock(return_value=[])
self.secrets = Mock()
self.hs = setup_test_homeserver(
http_client=None, clock=self.hs_clock, reactor=self.clock
)
self.hs.config.registration_shared_secret = u"shared"
self.hs.get_media_repository = Mock()
self.hs.get_deactivate_account_handler = Mock()
self.resource = JsonResource(self.hs)
register_servlets(self.hs, self.resource)
def test_disabled(self):
"""
If there is no shared secret, registration through this method will be
prevented.
"""
self.hs.config.registration_shared_secret = None
request, channel = make_request("POST", self.url, b'{}')
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual(
'Shared secret registration is not enabled', channel.json_body["error"]
)
def test_get_nonce(self):
"""
Calling GET on the endpoint will return a randomised nonce, using the
homeserver's secrets provider.
"""
secrets = Mock()
secrets.token_hex = Mock(return_value="abcd")
self.hs.get_secrets = Mock(return_value=secrets)
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
self.assertEqual(channel.json_body, {"nonce": "abcd"})
def test_expired_nonce(self):
"""
Calling GET on the endpoint will return a randomised nonce, which will
only last for SALT_TIMEOUT (60s).
"""
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
nonce = channel.json_body["nonce"]
# 59 seconds
self.clock.advance(59)
body = json.dumps({"nonce": nonce})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('username must be specified', channel.json_body["error"])
# 61 seconds
self.clock.advance(2)
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('unrecognised nonce', channel.json_body["error"])
def test_register_incorrect_nonce(self):
"""
Only the provided nonce can be used, as it's checked in the MAC.
"""
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(b"notthenonce\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest()
body = json.dumps(
{
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
"mac": want_mac,
}
).encode('utf8')
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("HMAC incorrect", channel.json_body["error"])
def test_register_correct_nonce(self):
"""
When the correct nonce is provided, and the right key is provided, the
user is registered.
"""
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode('ascii') + b"\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest()
body = json.dumps(
{
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
"mac": want_mac,
}
).encode('utf8')
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["user_id"])
def test_nonce_reuse(self):
"""
A valid unrecognised nonce.
"""
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
nonce = channel.json_body["nonce"]
want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
want_mac.update(nonce.encode('ascii') + b"\x00bob\x00abc123\x00admin")
want_mac = want_mac.hexdigest()
body = json.dumps(
{
"nonce": nonce,
"username": "bob",
"password": "abc123",
"admin": True,
"mac": want_mac,
}
).encode('utf8')
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual("@bob:test", channel.json_body["user_id"])
# Now, try and reuse it
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('unrecognised nonce', channel.json_body["error"])
def test_missing_parts(self):
"""
Synapse will complain if you don't give nonce, username, password, and
mac. Admin is optional. Additional checks are done for length and
type.
"""
def nonce():
request, channel = make_request("GET", self.url)
render(request, self.resource, self.clock)
return channel.json_body["nonce"]
#
# Nonce check
#
# Must be present
body = json.dumps({})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('nonce must be specified', channel.json_body["error"])
#
# Username checks
#
# Must be present
body = json.dumps({"nonce": nonce()})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('username must be specified', channel.json_body["error"])
# Must be a string
body = json.dumps({"nonce": nonce(), "username": 1234})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid username', channel.json_body["error"])
# Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": b"abcd\x00"})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid username', channel.json_body["error"])
# Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "a" * 1000})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid username', channel.json_body["error"])
#
# Username checks
#
# Must be present
body = json.dumps({"nonce": nonce(), "username": "a"})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('password must be specified', channel.json_body["error"])
# Must be a string
body = json.dumps({"nonce": nonce(), "username": "a", "password": 1234})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid password', channel.json_body["error"])
# Must not have null bytes
body = json.dumps({"nonce": nonce(), "username": "a", "password": b"abcd\x00"})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid password', channel.json_body["error"])
# Super long
body = json.dumps({"nonce": nonce(), "username": "a", "password": "A" * 1000})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
self.assertEqual('Invalid password', channel.json_body["error"])

View file

@ -137,7 +137,6 @@ class MessageAcceptTests(unittest.TestCase):
)
self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
@unittest.DEBUG
def test_cant_hide_past_history(self):
"""
If you send a message, you must be able to provide the direct
@ -178,7 +177,7 @@ class MessageAcceptTests(unittest.TestCase):
for x, y in d.items()
if x == ("m.room.member", "@us:test")
],
"auth_chain_ids": d.values(),
"auth_chain_ids": list(d.values()),
}
)

View file

@ -204,7 +204,8 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
self.assertEqual(2, len(context_store["D"].prev_state_ids))
prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertEqual(2, len(prev_state_ids))
@defer.inlineCallbacks
def test_branch_basic_conflict(self):
@ -255,9 +256,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertSetEqual(
{"START", "A", "C"},
{e_id for e_id in context_store["D"].prev_state_ids.values()}
{e_id for e_id in prev_state_ids.values()}
)
@defer.inlineCallbacks
@ -318,9 +321,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
prev_state_ids = yield context_store["E"].get_prev_state_ids(self.store)
self.assertSetEqual(
{"START", "A", "B", "C"},
{e for e in context_store["E"].prev_state_ids.values()}
{e for e in prev_state_ids.values()}
)
@defer.inlineCallbacks
@ -398,9 +403,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
self.assertSetEqual(
{"A1", "A2", "A3", "A5", "B"},
{e for e in context_store["D"].prev_state_ids.values()}
{e for e in prev_state_ids.values()}
)
def _add_depths(self, nodes, edges):
@ -429,8 +436,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state
)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(
set(e.event_id for e in old_state), set(context.current_state_ids.values())
set(e.event_id for e in old_state), set(current_state_ids.values())
)
self.assertIsNotNone(context.state_group)
@ -449,8 +458,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state
)
prev_state_ids = yield context.get_prev_state_ids(self.store)
self.assertEqual(
set(e.event_id for e in old_state), set(context.prev_state_ids.values())
set(e.event_id for e in old_state), set(prev_state_ids.values())
)
@defer.inlineCallbacks
@ -475,9 +486,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(
set([e.event_id for e in old_state]),
set(context.current_state_ids.values())
set(current_state_ids.values())
)
self.assertEqual(group_name, context.state_group)
@ -504,9 +517,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event)
prev_state_ids = yield context.get_prev_state_ids(self.store)
self.assertEqual(
set([e.event_id for e in old_state]),
set(context.prev_state_ids.values())
set(prev_state_ids.values())
)
self.assertIsNotNone(context.state_group)
@ -545,7 +560,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
self.assertEqual(len(context.current_state_ids), 6)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group)
@ -585,7 +602,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
self.assertEqual(len(context.current_state_ids), 6)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group)
@ -642,8 +661,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(
old_state_2[3].event_id, context.current_state_ids[("test1", "1")]
old_state_2[3].event_id, current_state_ids[("test1", "1")]
)
# Reverse the depth to make sure we are actually using the depths
@ -670,8 +691,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
current_state_ids = yield context.get_current_state_ids(self.store)
self.assertEqual(
old_state_1[3].event_id, context.current_state_ids[("test1", "1")]
old_state_1[3].event_id, current_state_ids[("test1", "1")]
)
def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2,

View file

@ -1,70 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.util.async import Limiter
from tests import unittest
class LimiterTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_limiter(self):
limiter = Limiter(3)
key = object()
d1 = limiter.queue(key)
cm1 = yield d1
d2 = limiter.queue(key)
cm2 = yield d2
d3 = limiter.queue(key)
cm3 = yield d3
d4 = limiter.queue(key)
self.assertFalse(d4.called)
d5 = limiter.queue(key)
self.assertFalse(d5.called)
with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)
self.assertTrue(d4.called)
self.assertFalse(d5.called)
with cm3:
self.assertFalse(d5.called)
self.assertTrue(d5.called)
with cm2:
pass
with (yield d4):
pass
with (yield d5):
pass
d6 = limiter.queue(key)
with (yield d6):
pass

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,6 +17,7 @@
from six.moves import range
from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from synapse.util import Clock, logcontext
from synapse.util.async import Linearizer
@ -65,3 +67,79 @@ class LinearizerTestCase(unittest.TestCase):
func(i)
return func(1000)
@defer.inlineCallbacks
def test_multiple_entries(self):
limiter = Linearizer(max_count=3)
key = object()
d1 = limiter.queue(key)
cm1 = yield d1
d2 = limiter.queue(key)
cm2 = yield d2
d3 = limiter.queue(key)
cm3 = yield d3
d4 = limiter.queue(key)
self.assertFalse(d4.called)
d5 = limiter.queue(key)
self.assertFalse(d5.called)
with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)
cm4 = yield d4
self.assertFalse(d5.called)
with cm3:
self.assertFalse(d5.called)
cm5 = yield d5
with cm2:
pass
with cm4:
pass
with cm5:
pass
d6 = limiter.queue(key)
with (yield d6):
pass
@defer.inlineCallbacks
def test_cancellation(self):
linearizer = Linearizer()
key = object()
d1 = linearizer.queue(key)
cm1 = yield d1
d2 = linearizer.queue(key)
self.assertFalse(d2.called)
d3 = linearizer.queue(key)
self.assertFalse(d3.called)
d2.cancel()
with cm1:
pass
self.assertTrue(d2.called)
try:
yield d2
self.fail("Expected d2 to raise CancelledError")
except CancelledError:
pass
with (yield d3):
pass

View file

@ -71,6 +71,8 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
config.user_directory_search_all_users = False
config.user_consent_server_notice_content = None
config.block_events_without_consent_error = None
config.media_storage_providers = []
config.auto_join_rooms = []
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
@ -136,6 +138,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
database_engine=db_engine,
room_list_handler=object(),
tls_server_context_factory=Mock(),
reactor=reactor,
**kargs
)