Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
8d69193a42
1
changelog.d/5216.misc
Normal file
1
changelog.d/5216.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Synapse will now serve the experimental "room complexity" API endpoint.
|
1
changelog.d/5233.bugfix
Normal file
1
changelog.d/5233.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix appservice timestamp massaging.
|
1
changelog.d/5249.feature
Normal file
1
changelog.d/5249.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Ability to configure default room version.
|
1
changelog.d/5250.misc
Normal file
1
changelog.d/5250.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Simplification to Keyring.wait_for_previous_lookups.
|
1
changelog.d/5251.bugfix
Normal file
1
changelog.d/5251.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Ensure that server_keys fetched via a notary server are correctly signed.
|
1
changelog.d/5256.bugfix
Normal file
1
changelog.d/5256.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Show the correct error when logging out and access token is missing.
|
1
changelog.d/5257.bugfix
Normal file
1
changelog.d/5257.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix error code when there is an invalid parameter on /_matrix/client/r0/publicRooms
|
1
changelog.d/5258.bugfix
Normal file
1
changelog.d/5258.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix error when downloading thumbnail with missing width/height parameter.
|
1
changelog.d/5260.feature
Normal file
1
changelog.d/5260.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Synapse now more efficiently collates room statistics.
|
1
changelog.d/5268.bugfix
Normal file
1
changelog.d/5268.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix schema update for account validity.
|
1
changelog.d/5274.bugfix
Normal file
1
changelog.d/5274.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
|
1
changelog.d/5275.bugfix
Normal file
1
changelog.d/5275.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix "db txn 'update_presence' from sentinel context" log messages.
|
1
changelog.d/5277.bugfix
Normal file
1
changelog.d/5277.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix dropped logcontexts during high outbound traffic.
|
1
changelog.d/5278.bugfix
Normal file
1
changelog.d/5278.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
|
1
changelog.d/5282.doc
Normal file
1
changelog.d/5282.doc
Normal file
|
@ -0,0 +1 @@
|
|||
Fix docs on resetting the user directory.
|
1
changelog.d/5283.misc
Normal file
1
changelog.d/5283.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Specify the type of reCAPTCHA key to use.
|
1
changelog.d/5286.feature
Normal file
1
changelog.d/5286.feature
Normal file
|
@ -0,0 +1 @@
|
|||
CAS login will now hit the r0 API, not the deprecated v1 one.
|
1
changelog.d/5287.misc
Normal file
1
changelog.d/5287.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Remove spurious debug from MatrixFederationHttpClient.get_json.
|
1
changelog.d/5288.misc
Normal file
1
changelog.d/5288.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve logging for logcontext leaks.
|
|
@ -7,6 +7,7 @@ Requires a public/private key pair from:
|
|||
|
||||
https://developers.google.com/recaptcha/
|
||||
|
||||
Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
|
||||
|
||||
Setting ReCaptcha Keys
|
||||
----------------------
|
||||
|
|
|
@ -90,6 +90,7 @@ pid_file: DATADIR/homeserver.pid
|
|||
#
|
||||
# For example, for room version 1, default_room_version should be set
|
||||
# to "1".
|
||||
#
|
||||
#default_room_version: "1"
|
||||
|
||||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
|
||||
|
@ -1102,9 +1103,9 @@ password_config:
|
|||
#
|
||||
# 'search_all_users' defines whether to search all users visible to your HS
|
||||
# when searching the user directory, rather than limiting to users visible
|
||||
# in public rooms. Defaults to false. If you set it True, you'll have to run
|
||||
# UPDATE user_directory_stream_pos SET stream_id = NULL;
|
||||
# on your database to tell it to rebuild the user_directory search indexes.
|
||||
# in public rooms. Defaults to false. If you set it True, you'll have to
|
||||
# rebuild the user_directory search indexes, see
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
|
||||
#
|
||||
#user_directory:
|
||||
# enabled: true
|
||||
|
|
|
@ -7,11 +7,7 @@ who are present in a publicly viewable room present on the server.
|
|||
|
||||
The directory info is stored in various tables, which can (typically after
|
||||
DB corruption) get stale or out of sync. If this happens, for now the
|
||||
quickest solution to fix it is:
|
||||
|
||||
```
|
||||
UPDATE user_directory_stream_pos SET stream_id = NULL;
|
||||
```
|
||||
|
||||
and restart the synapse, which should then start a background task to
|
||||
solution to fix it is to execute the SQL here
|
||||
https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
|
||||
and then restart synapse. This should then start a background task to
|
||||
flush the current tables and regenerate the directory.
|
||||
|
|
|
@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
|
|||
FEDERATION_PREFIX = "/_matrix/federation"
|
||||
FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
|
||||
FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
|
||||
FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
|
||||
STATIC_PREFIX = "/_matrix/static"
|
||||
WEB_CLIENT_PREFIX = "/_matrix/client"
|
||||
CONTENT_REPO_PREFIX = "/_matrix/content"
|
||||
|
|
|
@ -344,15 +344,21 @@ class _LimitedHostnameResolver(object):
|
|||
|
||||
def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
|
||||
addressTypes=None, transportSemantics='TCP'):
|
||||
# Note this is happening deep within the reactor, so we don't need to
|
||||
# worry about log contexts.
|
||||
|
||||
# We need this function to return `resolutionReceiver` so we do all the
|
||||
# actual logic involving deferreds in a separate function.
|
||||
self._resolve(
|
||||
resolutionReceiver, hostName, portNumber,
|
||||
addressTypes, transportSemantics,
|
||||
)
|
||||
|
||||
# even though this is happening within the depths of twisted, we need to drop
|
||||
# our logcontext before starting _resolve, otherwise: (a) _resolve will drop
|
||||
# the logcontext if it returns an incomplete deferred; (b) _resolve will
|
||||
# call the resolutionReceiver *with* a logcontext, which it won't be expecting.
|
||||
with PreserveLoggingContext():
|
||||
self._resolve(
|
||||
resolutionReceiver,
|
||||
hostName,
|
||||
portNumber,
|
||||
addressTypes,
|
||||
transportSemantics,
|
||||
)
|
||||
|
||||
return resolutionReceiver
|
||||
|
||||
|
|
|
@ -414,6 +414,7 @@ class ServerConfig(Config):
|
|||
#
|
||||
# For example, for room version 1, default_room_version should be set
|
||||
# to "1".
|
||||
#
|
||||
#default_room_version: "%(default_room_version)s"
|
||||
|
||||
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
|
||||
|
|
|
@ -43,9 +43,9 @@ class UserDirectoryConfig(Config):
|
|||
#
|
||||
# 'search_all_users' defines whether to search all users visible to your HS
|
||||
# when searching the user directory, rather than limiting to users visible
|
||||
# in public rooms. Defaults to false. If you set it True, you'll have to run
|
||||
# UPDATE user_directory_stream_pos SET stream_id = NULL;
|
||||
# on your database to tell it to rebuild the user_directory search indexes.
|
||||
# in public rooms. Defaults to false. If you set it True, you'll have to
|
||||
# rebuild the user_directory search indexes, see
|
||||
# https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
|
||||
#
|
||||
#user_directory:
|
||||
# enabled: true
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
import six
|
||||
from six import raise_from
|
||||
from six.moves import urllib
|
||||
|
||||
|
@ -180,9 +181,7 @@ class Keyring(object):
|
|||
|
||||
# We want to wait for any previous lookups to complete before
|
||||
# proceeding.
|
||||
yield self.wait_for_previous_lookups(
|
||||
[rq.server_name for rq in verify_requests], server_to_deferred
|
||||
)
|
||||
yield self.wait_for_previous_lookups(server_to_deferred)
|
||||
|
||||
# Actually start fetching keys.
|
||||
self._get_server_verify_keys(verify_requests)
|
||||
|
@ -215,12 +214,11 @@ class Keyring(object):
|
|||
logger.exception("Error starting key lookups")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_previous_lookups(self, server_names, server_to_deferred):
|
||||
def wait_for_previous_lookups(self, server_to_deferred):
|
||||
"""Waits for any previous key lookups for the given servers to finish.
|
||||
|
||||
Args:
|
||||
server_names (list): list of server_names we want to lookup
|
||||
server_to_deferred (dict): server_name to deferred which gets
|
||||
server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
|
||||
resolved once we've finished looking up keys for that server.
|
||||
The Deferreds should be regular twisted ones which call their
|
||||
callbacks with no logcontext.
|
||||
|
@ -233,7 +231,7 @@ class Keyring(object):
|
|||
while True:
|
||||
wait_on = [
|
||||
(server_name, self.key_downloads[server_name])
|
||||
for server_name in server_names
|
||||
for server_name in server_to_deferred.keys()
|
||||
if server_name in self.key_downloads
|
||||
]
|
||||
if not wait_on:
|
||||
|
@ -349,6 +347,7 @@ class KeyFetcher(object):
|
|||
Args:
|
||||
server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]):
|
||||
list of (server_name, iterable[key_id]) tuples to fetch keys for
|
||||
Note that the iterables may be iterated more than once.
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
|
||||
|
@ -394,8 +393,7 @@ class BaseV2KeyFetcher(object):
|
|||
POST /_matrix/key/v2/query.
|
||||
|
||||
Checks that each signature in the response that claims to come from the origin
|
||||
server is valid. (Does not check that there actually is such a signature, for
|
||||
some reason.)
|
||||
server is valid, and that there is at least one such signature.
|
||||
|
||||
Stores the json in server_keys_json so that it can be used for future responses
|
||||
to /_matrix/key/v2/query.
|
||||
|
@ -430,16 +428,25 @@ class BaseV2KeyFetcher(object):
|
|||
verify_key=verify_key, valid_until_ts=ts_valid_until_ms
|
||||
)
|
||||
|
||||
# TODO: improve this signature checking
|
||||
server_name = response_json["server_name"]
|
||||
verified = False
|
||||
for key_id in response_json["signatures"].get(server_name, {}):
|
||||
if key_id not in verify_keys:
|
||||
# each of the keys used for the signature must be present in the response
|
||||
# json.
|
||||
key = verify_keys.get(key_id)
|
||||
if not key:
|
||||
raise KeyLookupError(
|
||||
"Key response must include verification keys for all signatures"
|
||||
"Key response is signed by key id %s:%s but that key is not "
|
||||
"present in the response" % (server_name, key_id)
|
||||
)
|
||||
|
||||
verify_signed_json(
|
||||
response_json, server_name, verify_keys[key_id].verify_key
|
||||
verify_signed_json(response_json, server_name, key.verify_key)
|
||||
verified = True
|
||||
|
||||
if not verified:
|
||||
raise KeyLookupError(
|
||||
"Key response for %s is not signed by the origin server"
|
||||
% (server_name,)
|
||||
)
|
||||
|
||||
for key_id, key_data in response_json["old_verify_keys"].items():
|
||||
|
@ -549,7 +556,16 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
Returns:
|
||||
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
|
||||
from server_name -> key_id -> FetchKeyResult
|
||||
|
||||
Raises:
|
||||
KeyLookupError if there was an error processing the entire response from
|
||||
the server
|
||||
"""
|
||||
logger.info(
|
||||
"Requesting keys %s from notary server %s",
|
||||
server_names_and_key_ids,
|
||||
perspective_name,
|
||||
)
|
||||
# TODO(mark): Set the minimum_valid_until_ts to that needed by
|
||||
# the events being validated or the current time if validating
|
||||
# an incoming request.
|
||||
|
@ -578,40 +594,31 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
time_now_ms = self.clock.time_msec()
|
||||
|
||||
for response in query_response["server_keys"]:
|
||||
if (
|
||||
u"signatures" not in response
|
||||
or perspective_name not in response[u"signatures"]
|
||||
):
|
||||
# do this first, so that we can give useful errors thereafter
|
||||
server_name = response.get("server_name")
|
||||
if not isinstance(server_name, six.string_types):
|
||||
raise KeyLookupError(
|
||||
"Key response not signed by perspective server"
|
||||
" %r" % (perspective_name,)
|
||||
"Malformed response from key notary server %s: invalid server_name"
|
||||
% (perspective_name,)
|
||||
)
|
||||
|
||||
verified = False
|
||||
for key_id in response[u"signatures"][perspective_name]:
|
||||
if key_id in perspective_keys:
|
||||
verify_signed_json(
|
||||
response, perspective_name, perspective_keys[key_id]
|
||||
)
|
||||
verified = True
|
||||
|
||||
if not verified:
|
||||
logging.info(
|
||||
"Response from perspective server %r not signed with a"
|
||||
" known key, signed with: %r, known keys: %r",
|
||||
try:
|
||||
processed_response = yield self._process_perspectives_response(
|
||||
perspective_name,
|
||||
list(response[u"signatures"][perspective_name]),
|
||||
list(perspective_keys),
|
||||
perspective_keys,
|
||||
response,
|
||||
time_added_ms=time_now_ms,
|
||||
)
|
||||
raise KeyLookupError(
|
||||
"Response not signed with a known key for perspective"
|
||||
" server %r" % (perspective_name,)
|
||||
except KeyLookupError as e:
|
||||
logger.warning(
|
||||
"Error processing response from key notary server %s for origin "
|
||||
"server %s: %s",
|
||||
perspective_name,
|
||||
server_name,
|
||||
e,
|
||||
)
|
||||
|
||||
processed_response = yield self.process_v2_response(
|
||||
perspective_name, response, time_added_ms=time_now_ms
|
||||
)
|
||||
server_name = response["server_name"]
|
||||
# we continue to process the rest of the response
|
||||
continue
|
||||
|
||||
added_keys.extend(
|
||||
(server_name, key_id, key) for key_id, key in processed_response.items()
|
||||
|
@ -624,6 +631,53 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
|
||||
defer.returnValue(keys)
|
||||
|
||||
def _process_perspectives_response(
|
||||
self, perspective_name, perspective_keys, response, time_added_ms
|
||||
):
|
||||
"""Parse a 'Server Keys' structure from the result of a /key/query request
|
||||
|
||||
Checks that the entry is correctly signed by the perspectives server, and then
|
||||
passes over to process_v2_response
|
||||
|
||||
Args:
|
||||
perspective_name (str): the name of the notary server that produced this
|
||||
result
|
||||
|
||||
perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
|
||||
notary server
|
||||
|
||||
response (dict): the json-decoded Server Keys response object
|
||||
|
||||
time_added_ms (int): the timestamp to record in server_keys_json
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
|
||||
"""
|
||||
if (
|
||||
u"signatures" not in response
|
||||
or perspective_name not in response[u"signatures"]
|
||||
):
|
||||
raise KeyLookupError("Response not signed by the notary server")
|
||||
|
||||
verified = False
|
||||
for key_id in response[u"signatures"][perspective_name]:
|
||||
if key_id in perspective_keys:
|
||||
verify_signed_json(response, perspective_name, perspective_keys[key_id])
|
||||
verified = True
|
||||
|
||||
if not verified:
|
||||
raise KeyLookupError(
|
||||
"Response not signed with a known key: signed with: %r, known keys: %r"
|
||||
% (
|
||||
list(response[u"signatures"][perspective_name].keys()),
|
||||
list(perspective_keys.keys()),
|
||||
)
|
||||
)
|
||||
|
||||
return self.process_v2_response(
|
||||
perspective_name, response, time_added_ms=time_added_ms
|
||||
)
|
||||
|
||||
|
||||
class ServerKeyFetcher(BaseV2KeyFetcher):
|
||||
"""KeyFetcher impl which fetches keys from the origin servers"""
|
||||
|
@ -677,12 +731,6 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
except HttpResponseException as e:
|
||||
raise_from(KeyLookupError("Remote server returned an error"), e)
|
||||
|
||||
if (
|
||||
u"signatures" not in response
|
||||
or server_name not in response[u"signatures"]
|
||||
):
|
||||
raise KeyLookupError("Key response not signed by remote server")
|
||||
|
||||
if response["server_name"] != server_name:
|
||||
raise KeyLookupError(
|
||||
"Expected a response for server %r not %r"
|
||||
|
|
|
@ -76,6 +76,7 @@ class EventBuilder(object):
|
|||
# someone tries to get them when they don't exist.
|
||||
_state_key = attr.ib(default=None)
|
||||
_redacts = attr.ib(default=None)
|
||||
_origin_server_ts = attr.ib(default=None)
|
||||
|
||||
internal_metadata = attr.ib(default=attr.Factory(lambda: _EventInternalMetadata({})))
|
||||
|
||||
|
@ -142,6 +143,9 @@ class EventBuilder(object):
|
|||
if self._redacts is not None:
|
||||
event_dict["redacts"] = self._redacts
|
||||
|
||||
if self._origin_server_ts is not None:
|
||||
event_dict["origin_server_ts"] = self._origin_server_ts
|
||||
|
||||
defer.returnValue(
|
||||
create_local_event_from_event_dict(
|
||||
clock=self._clock,
|
||||
|
@ -209,6 +213,7 @@ class EventBuilderFactory(object):
|
|||
content=key_values.get("content", {}),
|
||||
unsigned=key_values.get("unsigned", {}),
|
||||
redacts=key_values.get("redacts", None),
|
||||
origin_server_ts=key_values.get("origin_server_ts", None),
|
||||
)
|
||||
|
||||
|
||||
|
@ -245,7 +250,7 @@ def create_local_event_from_event_dict(clock, hostname, signing_key,
|
|||
event_dict["event_id"] = _create_event_id(clock, hostname)
|
||||
|
||||
event_dict["origin"] = hostname
|
||||
event_dict["origin_server_ts"] = time_now
|
||||
event_dict.setdefault("origin_server_ts", time_now)
|
||||
|
||||
event_dict.setdefault("unsigned", {})
|
||||
age = event_dict["unsigned"].pop("age", 0)
|
||||
|
|
|
@ -23,7 +23,11 @@ from twisted.internet import defer
|
|||
import synapse
|
||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
|
||||
from synapse.api.urls import (
|
||||
FEDERATION_UNSTABLE_PREFIX,
|
||||
FEDERATION_V1_PREFIX,
|
||||
FEDERATION_V2_PREFIX,
|
||||
)
|
||||
from synapse.http.endpoint import parse_and_validate_server_name
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.servlet import (
|
||||
|
@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
|
|||
defer.returnValue((200, new_content))
|
||||
|
||||
|
||||
class RoomComplexityServlet(BaseFederationServlet):
|
||||
"""
|
||||
Indicates to other servers how complex (and therefore likely
|
||||
resource-intensive) a public room this server knows about is.
|
||||
"""
|
||||
PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
|
||||
PREFIX = FEDERATION_UNSTABLE_PREFIX
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, origin, content, query, room_id):
|
||||
|
||||
store = self.handler.hs.get_datastore()
|
||||
|
||||
is_public = yield store.is_room_world_readable_or_publicly_joinable(
|
||||
room_id
|
||||
)
|
||||
|
||||
if not is_public:
|
||||
raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
|
||||
|
||||
complexity = yield store.get_room_complexity(room_id)
|
||||
defer.returnValue((200, complexity))
|
||||
|
||||
|
||||
FEDERATION_SERVLET_CLASSES = (
|
||||
FederationSendServlet,
|
||||
FederationEventServlet,
|
||||
|
@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = (
|
|||
FederationThirdPartyInviteExchangeServlet,
|
||||
On3pidBindServlet,
|
||||
FederationVersionServlet,
|
||||
RoomComplexityServlet,
|
||||
)
|
||||
|
||||
OPENID_SERVLET_CLASSES = (
|
||||
|
|
|
@ -182,17 +182,27 @@ class PresenceHandler(object):
|
|||
# Start a LoopingCall in 30s that fires every 5s.
|
||||
# The initial delay is to allow disconnected clients a chance to
|
||||
# reconnect before we treat them as offline.
|
||||
def run_timeout_handler():
|
||||
return run_as_background_process(
|
||||
"handle_presence_timeouts", self._handle_timeouts
|
||||
)
|
||||
|
||||
self.clock.call_later(
|
||||
30,
|
||||
self.clock.looping_call,
|
||||
self._handle_timeouts,
|
||||
run_timeout_handler,
|
||||
5000,
|
||||
)
|
||||
|
||||
def run_persister():
|
||||
return run_as_background_process(
|
||||
"persist_presence_changes", self._persist_unpersisted_changes
|
||||
)
|
||||
|
||||
self.clock.call_later(
|
||||
60,
|
||||
self.clock.looping_call,
|
||||
self._persist_unpersisted_changes,
|
||||
run_persister,
|
||||
60 * 1000,
|
||||
)
|
||||
|
||||
|
@ -229,6 +239,7 @@ class PresenceHandler(object):
|
|||
)
|
||||
|
||||
if self.unpersisted_users_changes:
|
||||
|
||||
yield self.store.update_presence([
|
||||
self.user_to_current_state[user_id]
|
||||
for user_id in self.unpersisted_users_changes
|
||||
|
@ -240,30 +251,18 @@ class PresenceHandler(object):
|
|||
"""We periodically persist the unpersisted changes, as otherwise they
|
||||
may stack up and slow down shutdown times.
|
||||
"""
|
||||
logger.info(
|
||||
"Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
|
||||
len(self.unpersisted_users_changes)
|
||||
)
|
||||
|
||||
unpersisted = self.unpersisted_users_changes
|
||||
self.unpersisted_users_changes = set()
|
||||
|
||||
if unpersisted:
|
||||
logger.info(
|
||||
"Persisting %d upersisted presence updates", len(unpersisted)
|
||||
)
|
||||
yield self.store.update_presence([
|
||||
self.user_to_current_state[user_id]
|
||||
for user_id in unpersisted
|
||||
])
|
||||
|
||||
logger.info("Finished _persist_unpersisted_changes")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states_and_catch_exception(self, new_states):
|
||||
try:
|
||||
res = yield self._update_states(new_states)
|
||||
defer.returnValue(res)
|
||||
except Exception:
|
||||
logger.exception("Error updating presence")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _update_states(self, new_states):
|
||||
"""Updates presence of users. Sets the appropriate timeouts. Pokes
|
||||
|
@ -338,45 +337,41 @@ class PresenceHandler(object):
|
|||
logger.info("Handling presence timeouts")
|
||||
now = self.clock.time_msec()
|
||||
|
||||
try:
|
||||
with Measure(self.clock, "presence_handle_timeouts"):
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = set(self.wheel_timer.fetch(now))
|
||||
# Fetch the list of users that *may* have timed out. Things may have
|
||||
# changed since the timeout was set, so we won't necessarily have to
|
||||
# take any action.
|
||||
users_to_check = set(self.wheel_timer.fetch(now))
|
||||
|
||||
# Check whether the lists of syncing processes from an external
|
||||
# process have expired.
|
||||
expired_process_ids = [
|
||||
process_id for process_id, last_update
|
||||
in self.external_process_last_updated_ms.items()
|
||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||
]
|
||||
for process_id in expired_process_ids:
|
||||
users_to_check.update(
|
||||
self.external_process_last_updated_ms.pop(process_id, ())
|
||||
)
|
||||
self.external_process_last_update.pop(process_id)
|
||||
# Check whether the lists of syncing processes from an external
|
||||
# process have expired.
|
||||
expired_process_ids = [
|
||||
process_id for process_id, last_update
|
||||
in self.external_process_last_updated_ms.items()
|
||||
if now - last_update > EXTERNAL_PROCESS_EXPIRY
|
||||
]
|
||||
for process_id in expired_process_ids:
|
||||
users_to_check.update(
|
||||
self.external_process_last_updated_ms.pop(process_id, ())
|
||||
)
|
||||
self.external_process_last_update.pop(process_id)
|
||||
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
for user_id in users_to_check
|
||||
]
|
||||
states = [
|
||||
self.user_to_current_state.get(
|
||||
user_id, UserPresenceState.default(user_id)
|
||||
)
|
||||
for user_id in users_to_check
|
||||
]
|
||||
|
||||
timers_fired_counter.inc(len(states))
|
||||
timers_fired_counter.inc(len(states))
|
||||
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.is_mine_id,
|
||||
syncing_user_ids=self.get_currently_syncing_users(),
|
||||
now=now,
|
||||
)
|
||||
changes = handle_timeouts(
|
||||
states,
|
||||
is_mine_fn=self.is_mine_id,
|
||||
syncing_user_ids=self.get_currently_syncing_users(),
|
||||
now=now,
|
||||
)
|
||||
|
||||
run_in_background(self._update_states_and_catch_exception, changes)
|
||||
except Exception:
|
||||
logger.exception("Exception in _handle_timeouts loop")
|
||||
return self._update_states(changes)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bump_presence_active_time(self, user):
|
||||
|
|
|
@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object):
|
|||
RequestSendFailed: If there were problems connecting to the
|
||||
remote, due to e.g. DNS failures, connection timeouts etc.
|
||||
"""
|
||||
logger.debug("get_json args: %s", args)
|
||||
|
||||
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
|
||||
|
||||
request = MatrixFederationRequest(
|
||||
method="GET",
|
||||
destination=destination,
|
||||
|
|
|
@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False):
|
|||
return int(args[name][0])
|
||||
except Exception:
|
||||
message = "Query parameter %r must be an integer" % (name,)
|
||||
raise SynapseError(400, message)
|
||||
raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
|
||||
else:
|
||||
if required:
|
||||
message = "Missing integer query parameter %r" % (name,)
|
||||
|
|
|
@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
|
|||
|
||||
def __init__(self, hs):
|
||||
JsonResource.__init__(self, hs, canonical_json=False)
|
||||
register_servlets(hs, self)
|
||||
|
||||
register_servlets_for_client_rest_resource(hs, self)
|
||||
SendServerNoticeServlet(hs).register(self)
|
||||
VersionServlet(hs).register(self)
|
||||
|
||||
def register_servlets(hs, http_server):
|
||||
"""
|
||||
Register all the admin servlets.
|
||||
"""
|
||||
register_servlets_for_client_rest_resource(hs, http_server)
|
||||
SendServerNoticeServlet(hs).register(http_server)
|
||||
VersionServlet(hs).register(http_server)
|
||||
|
||||
|
||||
def register_servlets_for_client_rest_resource(hs, http_server):
|
||||
|
|
|
@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet):
|
|||
b"redirectUrl": args[b"redirectUrl"][0]
|
||||
}).encode('ascii')
|
||||
hs_redirect_url = (self.cas_service_url +
|
||||
b"/_matrix/client/api/v1/login/cas/ticket")
|
||||
b"/_matrix/client/r0/login/cas/ticket")
|
||||
service_param = urllib.parse.urlencode({
|
||||
b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
|
||||
}).encode('ascii')
|
||||
|
@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet):
|
|||
|
||||
|
||||
class CasTicketServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/login/cas/ticket", releases=())
|
||||
PATTERNS = client_path_patterns("/login/cas/ticket")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(CasTicketServlet, self).__init__(hs)
|
||||
|
|
|
@ -17,8 +17,6 @@ import logging
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import AuthError
|
||||
|
||||
from .base import ClientV1RestServlet, client_path_patterns
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -38,23 +36,16 @@ class LogoutRestServlet(ClientV1RestServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request):
|
||||
try:
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
except AuthError:
|
||||
# this implies the access token has already been deleted.
|
||||
defer.returnValue((401, {
|
||||
"errcode": "M_UNKNOWN_TOKEN",
|
||||
"error": "Access Token unknown or expired"
|
||||
}))
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
|
||||
if requester.device_id is None:
|
||||
# the acccess token wasn't associated with a device.
|
||||
# Just delete the access token
|
||||
access_token = self._auth.get_access_token_from_request(request)
|
||||
yield self._auth_handler.delete_access_token(access_token)
|
||||
else:
|
||||
if requester.device_id is None:
|
||||
# the acccess token wasn't associated with a device.
|
||||
# Just delete the access token
|
||||
access_token = self._auth.get_access_token_from_request(request)
|
||||
yield self._auth_handler.delete_access_token(access_token)
|
||||
else:
|
||||
yield self._device_handler.delete_device(
|
||||
requester.user.to_string(), requester.device_id)
|
||||
yield self._device_handler.delete_device(
|
||||
requester.user.to_string(), requester.device_id)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
|
|
@ -56,8 +56,8 @@ class ThumbnailResource(Resource):
|
|||
def _async_render_GET(self, request):
|
||||
set_cors_headers(request)
|
||||
server_name, media_id, _ = parse_media_id(request)
|
||||
width = parse_integer(request, "width")
|
||||
height = parse_integer(request, "height")
|
||||
width = parse_integer(request, "width", required=True)
|
||||
height = parse_integer(request, "height", required=True)
|
||||
method = parse_string(request, "method", "scale")
|
||||
m_type = parse_string(request, "type", "image/png")
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ from .engines import PostgresEngine
|
|||
from .event_federation import EventFederationStore
|
||||
from .event_push_actions import EventPushActionsStore
|
||||
from .events import EventsStore
|
||||
from .events_bg_updates import EventsBackgroundUpdatesStore
|
||||
from .filtering import FilteringStore
|
||||
from .group_server import GroupServerStore
|
||||
from .keys import KeyStore
|
||||
|
@ -66,6 +67,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class DataStore(
|
||||
EventsBackgroundUpdatesStore,
|
||||
RoomMemberStore,
|
||||
RoomStore,
|
||||
RegistrationStore,
|
||||
|
|
|
@ -1261,7 +1261,8 @@ class SQLBaseStore(object):
|
|||
" AND ".join("%s = ?" % (k,) for k in keyvalues),
|
||||
)
|
||||
|
||||
return txn.execute(sql, list(keyvalues.values()))
|
||||
txn.execute(sql, list(keyvalues.values()))
|
||||
return txn.rowcount
|
||||
|
||||
def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
|
||||
return self.runInteraction(
|
||||
|
@ -1280,9 +1281,12 @@ class SQLBaseStore(object):
|
|||
column : column name to test for inclusion against `iterable`
|
||||
iterable : list
|
||||
keyvalues : dict of column names and values to select the rows with
|
||||
|
||||
Returns:
|
||||
int: Number rows deleted
|
||||
"""
|
||||
if not iterable:
|
||||
return
|
||||
return 0
|
||||
|
||||
sql = "DELETE FROM %s" % table
|
||||
|
||||
|
@ -1297,7 +1301,9 @@ class SQLBaseStore(object):
|
|||
|
||||
if clauses:
|
||||
sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
|
||||
return txn.execute(sql, values)
|
||||
txn.execute(sql, values)
|
||||
|
||||
return txn.rowcount
|
||||
|
||||
def _get_cache_dict(
|
||||
self, db_conn, table, entity_column, stream_column, max_value, limit=100000
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
# Copyright 2018-2019 New Vector Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -219,41 +220,11 @@ class EventsStore(
|
|||
EventsWorkerStore,
|
||||
BackgroundUpdateStore,
|
||||
):
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(EventsStore, self).__init__(db_conn, hs)
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
|
||||
self._background_reindex_fields_sender,
|
||||
)
|
||||
|
||||
self.register_background_index_update(
|
||||
"event_contains_url_index",
|
||||
index_name="event_contains_url_index",
|
||||
table="events",
|
||||
columns=["room_id", "topological_ordering", "stream_ordering"],
|
||||
where_clause="contains_url = true AND outlier = false",
|
||||
)
|
||||
|
||||
# an event_id index on event_search is useful for the purge_history
|
||||
# api. Plus it means we get to enforce some integrity with a UNIQUE
|
||||
# clause
|
||||
self.register_background_index_update(
|
||||
"event_search_event_id_idx",
|
||||
index_name="event_search_event_id_idx",
|
||||
table="event_search",
|
||||
columns=["event_id"],
|
||||
unique=True,
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
self._event_persist_queue = _EventPeristenceQueue()
|
||||
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -554,10 +525,18 @@ class EventsStore(
|
|||
e_id for event in new_events for e_id in event.prev_event_ids()
|
||||
)
|
||||
|
||||
# Finally, remove any events which are prev_events of any existing events.
|
||||
# Remove any events which are prev_events of any existing events.
|
||||
existing_prevs = yield self._get_events_which_are_prevs(result)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
# Finally handle the case where the new events have soft-failed prev
|
||||
# events. If they do we need to remove them and their prev events,
|
||||
# otherwise we end up with dangling extremities.
|
||||
existing_prevs = yield self._get_prevs_before_rejected(
|
||||
e_id for event in new_events for e_id in event.prev_event_ids()
|
||||
)
|
||||
result.difference_update(existing_prevs)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -573,7 +552,7 @@ class EventsStore(
|
|||
"""
|
||||
results = []
|
||||
|
||||
def _get_events(txn, batch):
|
||||
def _get_events_which_are_prevs_txn(txn, batch):
|
||||
sql = """
|
||||
SELECT prev_event_id, internal_metadata
|
||||
FROM event_edges
|
||||
|
@ -596,10 +575,78 @@ class EventsStore(
|
|||
)
|
||||
|
||||
for chunk in batch_iter(event_ids, 100):
|
||||
yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
|
||||
yield self.runInteraction(
|
||||
"_get_events_which_are_prevs",
|
||||
_get_events_which_are_prevs_txn,
|
||||
chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_prevs_before_rejected(self, event_ids):
|
||||
"""Get soft-failed ancestors to remove from the extremities.
|
||||
|
||||
Given a set of events, find all those that have been soft-failed or
|
||||
rejected. Returns those soft failed/rejected events and their prev
|
||||
events (whether soft-failed/rejected or not), and recurses up the
|
||||
prev-event graph until it finds no more soft-failed/rejected events.
|
||||
|
||||
This is used to find extremities that are ancestors of new events, but
|
||||
are separated by soft failed events.
|
||||
|
||||
Args:
|
||||
event_ids (Iterable[str]): Events to find prev events for. Note
|
||||
that these must have already been persisted.
|
||||
|
||||
Returns:
|
||||
Deferred[set[str]]
|
||||
"""
|
||||
|
||||
# The set of event_ids to return. This includes all soft-failed events
|
||||
# and their prev events.
|
||||
existing_prevs = set()
|
||||
|
||||
def _get_prevs_before_rejected_txn(txn, batch):
|
||||
to_recursively_check = batch
|
||||
|
||||
while to_recursively_check:
|
||||
sql = """
|
||||
SELECT
|
||||
event_id, prev_event_id, internal_metadata,
|
||||
rejections.event_id IS NOT NULL
|
||||
FROM event_edges
|
||||
INNER JOIN events USING (event_id)
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
LEFT JOIN event_json USING (event_id)
|
||||
WHERE
|
||||
event_id IN (%s)
|
||||
AND NOT events.outlier
|
||||
""" % (
|
||||
",".join("?" for _ in to_recursively_check),
|
||||
)
|
||||
|
||||
txn.execute(sql, to_recursively_check)
|
||||
to_recursively_check = []
|
||||
|
||||
for event_id, prev_event_id, metadata, rejected in txn:
|
||||
if prev_event_id in existing_prevs:
|
||||
continue
|
||||
|
||||
soft_failed = json.loads(metadata).get("soft_failed")
|
||||
if soft_failed or rejected:
|
||||
to_recursively_check.append(prev_event_id)
|
||||
existing_prevs.add(prev_event_id)
|
||||
|
||||
for chunk in batch_iter(event_ids, 100):
|
||||
yield self.runInteraction(
|
||||
"_get_prevs_before_rejected",
|
||||
_get_prevs_before_rejected_txn,
|
||||
chunk,
|
||||
)
|
||||
|
||||
defer.returnValue(existing_prevs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_state_after_events(
|
||||
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
|
||||
|
@ -1503,153 +1550,6 @@ class EventsStore(
|
|||
ret = yield self.runInteraction("count_daily_active_rooms", _count)
|
||||
defer.returnValue(ret)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_fields_sender(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
max_stream_id = progress["max_stream_id_exclusive"]
|
||||
rows_inserted = progress.get("rows_inserted", 0)
|
||||
|
||||
INSERT_CLUMP_SIZE = 1000
|
||||
|
||||
def reindex_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id, json FROM events"
|
||||
" INNER JOIN event_json USING (event_id)"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
|
||||
update_rows = []
|
||||
for row in rows:
|
||||
try:
|
||||
event_id = row[1]
|
||||
event_json = json.loads(row[2])
|
||||
sender = event_json["sender"]
|
||||
content = event_json["content"]
|
||||
|
||||
contains_url = "url" in content
|
||||
if contains_url:
|
||||
contains_url &= isinstance(content["url"], text_type)
|
||||
except (KeyError, AttributeError):
|
||||
# If the event is missing a necessary field then
|
||||
# skip over it.
|
||||
continue
|
||||
|
||||
update_rows.append((sender, contains_url, event_id))
|
||||
|
||||
sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
|
||||
|
||||
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
|
||||
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
|
||||
txn.executemany(sql, clump)
|
||||
|
||||
progress = {
|
||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||
"max_stream_id_exclusive": min_stream_id,
|
||||
"rows_inserted": rows_inserted + len(rows),
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
|
||||
)
|
||||
|
||||
return len(rows)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_origin_server_ts(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
max_stream_id = progress["max_stream_id_exclusive"]
|
||||
rows_inserted = progress.get("rows_inserted", 0)
|
||||
|
||||
INSERT_CLUMP_SIZE = 1000
|
||||
|
||||
def reindex_search_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id FROM events"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
event_ids = [row[1] for row in rows]
|
||||
|
||||
rows_to_update = []
|
||||
|
||||
chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
|
||||
for chunk in chunks:
|
||||
ev_rows = self._simple_select_many_txn(
|
||||
txn,
|
||||
table="event_json",
|
||||
column="event_id",
|
||||
iterable=chunk,
|
||||
retcols=["event_id", "json"],
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
for row in ev_rows:
|
||||
event_id = row["event_id"]
|
||||
event_json = json.loads(row["json"])
|
||||
try:
|
||||
origin_server_ts = event_json["origin_server_ts"]
|
||||
except (KeyError, AttributeError):
|
||||
# If the event is missing a necessary field then
|
||||
# skip over it.
|
||||
continue
|
||||
|
||||
rows_to_update.append((origin_server_ts, event_id))
|
||||
|
||||
sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
|
||||
|
||||
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
|
||||
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
|
||||
txn.executemany(sql, clump)
|
||||
|
||||
progress = {
|
||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||
"max_stream_id_exclusive": min_stream_id,
|
||||
"rows_inserted": rows_inserted + len(rows_to_update),
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
|
||||
)
|
||||
|
||||
return len(rows_to_update)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
def get_current_backfill_token(self):
|
||||
"""The current minimum token that backfilled events have reached"""
|
||||
return -self._backfill_id_gen.get_current_token()
|
||||
|
|
401
synapse/storage/events_bg_updates.py
Normal file
401
synapse/storage/events_bg_updates.py
Normal file
|
@ -0,0 +1,401 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from six import text_type
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
||||
|
||||
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
|
||||
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
|
||||
DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
|
||||
)
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
|
||||
self._background_reindex_fields_sender,
|
||||
)
|
||||
|
||||
self.register_background_index_update(
|
||||
"event_contains_url_index",
|
||||
index_name="event_contains_url_index",
|
||||
table="events",
|
||||
columns=["room_id", "topological_ordering", "stream_ordering"],
|
||||
where_clause="contains_url = true AND outlier = false",
|
||||
)
|
||||
|
||||
# an event_id index on event_search is useful for the purge_history
|
||||
# api. Plus it means we get to enforce some integrity with a UNIQUE
|
||||
# clause
|
||||
self.register_background_index_update(
|
||||
"event_search_event_id_idx",
|
||||
index_name="event_search_event_id_idx",
|
||||
table="event_search",
|
||||
columns=["event_id"],
|
||||
unique=True,
|
||||
psql_only=True,
|
||||
)
|
||||
|
||||
self.register_background_update_handler(
|
||||
self.DELETE_SOFT_FAILED_EXTREMITIES,
|
||||
self._cleanup_extremities_bg_update,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_fields_sender(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
max_stream_id = progress["max_stream_id_exclusive"]
|
||||
rows_inserted = progress.get("rows_inserted", 0)
|
||||
|
||||
INSERT_CLUMP_SIZE = 1000
|
||||
|
||||
def reindex_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id, json FROM events"
|
||||
" INNER JOIN event_json USING (event_id)"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
|
||||
update_rows = []
|
||||
for row in rows:
|
||||
try:
|
||||
event_id = row[1]
|
||||
event_json = json.loads(row[2])
|
||||
sender = event_json["sender"]
|
||||
content = event_json["content"]
|
||||
|
||||
contains_url = "url" in content
|
||||
if contains_url:
|
||||
contains_url &= isinstance(content["url"], text_type)
|
||||
except (KeyError, AttributeError):
|
||||
# If the event is missing a necessary field then
|
||||
# skip over it.
|
||||
continue
|
||||
|
||||
update_rows.append((sender, contains_url, event_id))
|
||||
|
||||
sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
|
||||
|
||||
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
|
||||
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
|
||||
txn.executemany(sql, clump)
|
||||
|
||||
progress = {
|
||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||
"max_stream_id_exclusive": min_stream_id,
|
||||
"rows_inserted": rows_inserted + len(rows),
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
|
||||
)
|
||||
|
||||
return len(rows)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_origin_server_ts(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
max_stream_id = progress["max_stream_id_exclusive"]
|
||||
rows_inserted = progress.get("rows_inserted", 0)
|
||||
|
||||
INSERT_CLUMP_SIZE = 1000
|
||||
|
||||
def reindex_search_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_ordering, event_id FROM events"
|
||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||
" ORDER BY stream_ordering DESC"
|
||||
" LIMIT ?"
|
||||
)
|
||||
|
||||
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
min_stream_id = rows[-1][0]
|
||||
event_ids = [row[1] for row in rows]
|
||||
|
||||
rows_to_update = []
|
||||
|
||||
chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
|
||||
for chunk in chunks:
|
||||
ev_rows = self._simple_select_many_txn(
|
||||
txn,
|
||||
table="event_json",
|
||||
column="event_id",
|
||||
iterable=chunk,
|
||||
retcols=["event_id", "json"],
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
for row in ev_rows:
|
||||
event_id = row["event_id"]
|
||||
event_json = json.loads(row["json"])
|
||||
try:
|
||||
origin_server_ts = event_json["origin_server_ts"]
|
||||
except (KeyError, AttributeError):
|
||||
# If the event is missing a necessary field then
|
||||
# skip over it.
|
||||
continue
|
||||
|
||||
rows_to_update.append((origin_server_ts, event_id))
|
||||
|
||||
sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
|
||||
|
||||
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
|
||||
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
|
||||
txn.executemany(sql, clump)
|
||||
|
||||
progress = {
|
||||
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||
"max_stream_id_exclusive": min_stream_id,
|
||||
"rows_inserted": rows_inserted + len(rows_to_update),
|
||||
}
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
|
||||
)
|
||||
|
||||
return len(rows_to_update)
|
||||
|
||||
result = yield self.runInteraction(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
|
||||
)
|
||||
|
||||
if not result:
|
||||
yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _cleanup_extremities_bg_update(self, progress, batch_size):
|
||||
"""Background update to clean out extremities that should have been
|
||||
deleted previously.
|
||||
|
||||
Mainly used to deal with the aftermath of #5269.
|
||||
"""
|
||||
|
||||
# This works by first copying all existing forward extremities into the
|
||||
# `_extremities_to_check` table at start up, and then checking each
|
||||
# event in that table whether we have any descendants that are not
|
||||
# soft-failed/rejected. If that is the case then we delete that event
|
||||
# from the forward extremities table.
|
||||
#
|
||||
# For efficiency, we do this in batches by recursively pulling out all
|
||||
# descendants of a batch until we find the non soft-failed/rejected
|
||||
# events, i.e. the set of descendants whose chain of prev events back
|
||||
# to the batch of extremities are all soft-failed or rejected.
|
||||
# Typically, we won't find any such events as extremities will rarely
|
||||
# have any descendants, but if they do then we should delete those
|
||||
# extremities.
|
||||
|
||||
def _cleanup_extremities_bg_update_txn(txn):
|
||||
# The set of extremity event IDs that we're checking this round
|
||||
original_set = set()
|
||||
|
||||
# A dict[str, set[str]] of event ID to their prev events.
|
||||
graph = {}
|
||||
|
||||
# The set of descendants of the original set that are not rejected
|
||||
# nor soft-failed. Ancestors of these events should be removed
|
||||
# from the forward extremities table.
|
||||
non_rejected_leaves = set()
|
||||
|
||||
# Set of event IDs that have been soft failed, and for which we
|
||||
# should check if they have descendants which haven't been soft
|
||||
# failed.
|
||||
soft_failed_events_to_lookup = set()
|
||||
|
||||
# First, we get `batch_size` events from the table, pulling out
|
||||
# their successor events, if any, and the successor events'
|
||||
# rejection status.
|
||||
txn.execute(
|
||||
"""SELECT prev_event_id, event_id, internal_metadata,
|
||||
rejections.event_id IS NOT NULL, events.outlier
|
||||
FROM (
|
||||
SELECT event_id AS prev_event_id
|
||||
FROM _extremities_to_check
|
||||
LIMIT ?
|
||||
) AS f
|
||||
LEFT JOIN event_edges USING (prev_event_id)
|
||||
LEFT JOIN events USING (event_id)
|
||||
LEFT JOIN event_json USING (event_id)
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
""", (batch_size,)
|
||||
)
|
||||
|
||||
for prev_event_id, event_id, metadata, rejected, outlier in txn:
|
||||
original_set.add(prev_event_id)
|
||||
|
||||
if not event_id or outlier:
|
||||
# Common case where the forward extremity doesn't have any
|
||||
# descendants.
|
||||
continue
|
||||
|
||||
graph.setdefault(event_id, set()).add(prev_event_id)
|
||||
|
||||
soft_failed = False
|
||||
if metadata:
|
||||
soft_failed = json.loads(metadata).get("soft_failed")
|
||||
|
||||
if soft_failed or rejected:
|
||||
soft_failed_events_to_lookup.add(event_id)
|
||||
else:
|
||||
non_rejected_leaves.add(event_id)
|
||||
|
||||
# Now we recursively check all the soft-failed descendants we
|
||||
# found above in the same way, until we have nothing left to
|
||||
# check.
|
||||
while soft_failed_events_to_lookup:
|
||||
# We only want to do 100 at a time, so we split given list
|
||||
# into two.
|
||||
batch = list(soft_failed_events_to_lookup)
|
||||
to_check, to_defer = batch[:100], batch[100:]
|
||||
soft_failed_events_to_lookup = set(to_defer)
|
||||
|
||||
sql = """SELECT prev_event_id, event_id, internal_metadata,
|
||||
rejections.event_id IS NOT NULL
|
||||
FROM event_edges
|
||||
INNER JOIN events USING (event_id)
|
||||
INNER JOIN event_json USING (event_id)
|
||||
LEFT JOIN rejections USING (event_id)
|
||||
WHERE
|
||||
prev_event_id IN (%s)
|
||||
AND NOT events.outlier
|
||||
""" % (
|
||||
",".join("?" for _ in to_check),
|
||||
)
|
||||
txn.execute(sql, to_check)
|
||||
|
||||
for prev_event_id, event_id, metadata, rejected in txn:
|
||||
if event_id in graph:
|
||||
# Already handled this event previously, but we still
|
||||
# want to record the edge.
|
||||
graph[event_id].add(prev_event_id)
|
||||
continue
|
||||
|
||||
graph[event_id] = {prev_event_id}
|
||||
|
||||
soft_failed = json.loads(metadata).get("soft_failed")
|
||||
if soft_failed or rejected:
|
||||
soft_failed_events_to_lookup.add(event_id)
|
||||
else:
|
||||
non_rejected_leaves.add(event_id)
|
||||
|
||||
# We have a set of non-soft-failed descendants, so we recurse up
|
||||
# the graph to find all ancestors and add them to the set of event
|
||||
# IDs that we can delete from forward extremities table.
|
||||
to_delete = set()
|
||||
while non_rejected_leaves:
|
||||
event_id = non_rejected_leaves.pop()
|
||||
prev_event_ids = graph.get(event_id, set())
|
||||
non_rejected_leaves.update(prev_event_ids)
|
||||
to_delete.update(prev_event_ids)
|
||||
|
||||
to_delete.intersection_update(original_set)
|
||||
|
||||
deleted = self._simple_delete_many_txn(
|
||||
txn=txn,
|
||||
table="event_forward_extremities",
|
||||
column="event_id",
|
||||
iterable=to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Deleted %d forward extremities of %d checked, to clean up #5269",
|
||||
deleted,
|
||||
len(original_set),
|
||||
)
|
||||
|
||||
if deleted:
|
||||
# We now need to invalidate the caches of these rooms
|
||||
rows = self._simple_select_many_txn(
|
||||
txn,
|
||||
table="events",
|
||||
column="event_id",
|
||||
iterable=to_delete,
|
||||
keyvalues={},
|
||||
retcols=("room_id",)
|
||||
)
|
||||
room_ids = set(row["room_id"] for row in rows)
|
||||
for room_id in room_ids:
|
||||
txn.call_after(
|
||||
self.get_latest_event_ids_in_room.invalidate,
|
||||
(room_id,)
|
||||
)
|
||||
|
||||
self._simple_delete_many_txn(
|
||||
txn=txn,
|
||||
table="_extremities_to_check",
|
||||
column="event_id",
|
||||
iterable=original_set,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
return len(original_set)
|
||||
|
||||
num_handled = yield self.runInteraction(
|
||||
"_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
|
||||
)
|
||||
|
||||
if not num_handled:
|
||||
yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)
|
||||
|
||||
def _drop_table_txn(txn):
|
||||
txn.execute("DROP TABLE _extremities_to_check")
|
||||
|
||||
yield self.runInteraction(
|
||||
"_cleanup_extremities_bg_update_drop_table",
|
||||
_drop_table_txn,
|
||||
)
|
||||
|
||||
defer.returnValue(num_handled)
|
|
@ -13,6 +13,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import division
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
@ -614,7 +616,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
def _get_total_state_event_counts_txn(self, txn, room_id):
|
||||
"""
|
||||
See get_state_event_counts.
|
||||
See get_total_state_event_counts.
|
||||
"""
|
||||
sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
|
||||
txn.execute(sql, (room_id,))
|
||||
|
@ -635,3 +637,49 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
"get_total_state_event_counts",
|
||||
self._get_total_state_event_counts_txn, room_id
|
||||
)
|
||||
|
||||
def _get_current_state_event_counts_txn(self, txn, room_id):
|
||||
"""
|
||||
See get_current_state_event_counts.
|
||||
"""
|
||||
sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
|
||||
txn.execute(sql, (room_id,))
|
||||
row = txn.fetchone()
|
||||
return row[0] if row else 0
|
||||
|
||||
def get_current_state_event_counts(self, room_id):
|
||||
"""
|
||||
Gets the current number of state events in a room.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
|
||||
Returns:
|
||||
Deferred[int]
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_current_state_event_counts",
|
||||
self._get_current_state_event_counts_txn, room_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_complexity(self, room_id):
|
||||
"""
|
||||
Get a rough approximation of the complexity of the room. This is used by
|
||||
remote servers to decide whether they wish to join the room or not.
|
||||
Higher complexity value indicates that being in the room will consume
|
||||
more resources.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str:int]] of complexity version to complexity.
|
||||
"""
|
||||
state_events = yield self.get_current_state_event_counts(room_id)
|
||||
|
||||
# Call this one "v1", so we can introduce new ones as we want to develop
|
||||
# it.
|
||||
complexity_v1 = round(state_events / 500, 2)
|
||||
|
||||
defer.returnValue({"v1": complexity_v1})
|
||||
|
|
|
@ -13,6 +13,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- We previously changed the schema for this table without renaming the file, which means
|
||||
-- that some databases might still be using the old schema. This ensures Synapse uses the
|
||||
-- right schema for the table.
|
||||
DROP TABLE IF EXISTS account_validity;
|
||||
|
||||
-- Track what users are in public rooms.
|
|
@ -0,0 +1,22 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Start a background job to cleanup extremities that were incorrectly added
|
||||
-- by bug #5269.
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('delete_soft_failed_extremities', '{}');
|
||||
|
||||
DROP TABLE IF EXISTS _extremities_to_check; -- To make this delta schema file idempotent.
|
||||
CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
|
|
@ -169,7 +169,7 @@ class StatsStore(StateDeltasStore):
|
|||
|
||||
logger.info(
|
||||
"Processing the next %d rooms of %d remaining",
|
||||
(len(rooms_to_work_on), progress["remaining"]),
|
||||
len(rooms_to_work_on), progress["remaining"],
|
||||
)
|
||||
|
||||
# Number of state events we've processed by going through each room
|
||||
|
|
|
@ -226,6 +226,8 @@ class LoggingContext(object):
|
|||
self.request = request
|
||||
|
||||
def __str__(self):
|
||||
if self.request:
|
||||
return str(self.request)
|
||||
return "%s@%x" % (self.name, id(self))
|
||||
|
||||
@classmethod
|
||||
|
@ -274,12 +276,10 @@ class LoggingContext(object):
|
|||
current = self.set_current_context(self.previous_context)
|
||||
if current is not self:
|
||||
if current is self.sentinel:
|
||||
logger.warn("Expected logging context %s has been lost", self)
|
||||
logger.warning("Expected logging context %s was lost", self)
|
||||
else:
|
||||
logger.warn(
|
||||
"Current logging context %s is not expected context %s",
|
||||
current,
|
||||
self
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s", self, current
|
||||
)
|
||||
self.previous_context = None
|
||||
self.alive = False
|
||||
|
@ -433,10 +433,14 @@ class PreserveLoggingContext(object):
|
|||
context = LoggingContext.set_current_context(self.current_context)
|
||||
|
||||
if context != self.new_context:
|
||||
logger.warn(
|
||||
"Unexpected logging context: %s is not %s",
|
||||
context, self.new_context,
|
||||
)
|
||||
if context is LoggingContext.sentinel:
|
||||
logger.warning("Expected logging context %s was lost", self.new_context)
|
||||
else:
|
||||
logger.warning(
|
||||
"Expected logging context %s but found %s",
|
||||
self.new_context,
|
||||
context,
|
||||
)
|
||||
|
||||
if self.current_context is not LoggingContext.sentinel:
|
||||
if not self.current_context.alive:
|
||||
|
|
|
@ -55,12 +55,12 @@ class MockPerspectiveServer(object):
|
|||
key_id: {"key": signedjson.key.encode_verify_key_base64(verify_key)}
|
||||
},
|
||||
}
|
||||
return self.get_signed_response(res)
|
||||
|
||||
def get_signed_response(self, res):
|
||||
signedjson.sign.sign_json(res, self.server_name, self.key)
|
||||
self.sign_response(res)
|
||||
return res
|
||||
|
||||
def sign_response(self, res):
|
||||
signedjson.sign.sign_json(res, self.server_name, self.key)
|
||||
|
||||
|
||||
class KeyringTestCase(unittest.HomeserverTestCase):
|
||||
def make_homeserver(self, reactor, clock):
|
||||
|
@ -85,7 +85,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
|||
# we run the lookup in a logcontext so that the patched inlineCallbacks can check
|
||||
# it is doing the right thing with logcontexts.
|
||||
wait_1_deferred = run_in_context(
|
||||
kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_1_deferred}
|
||||
kr.wait_for_previous_lookups, {"server1": lookup_1_deferred}
|
||||
)
|
||||
|
||||
# there were no previous lookups, so the deferred should be ready
|
||||
|
@ -94,7 +94,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
|
|||
# set off another wait. It should block because the first lookup
|
||||
# hasn't yet completed.
|
||||
wait_2_deferred = run_in_context(
|
||||
kr.wait_for_previous_lookups, ["server1"], {"server1": lookup_2_deferred}
|
||||
kr.wait_for_previous_lookups, {"server1": lookup_2_deferred}
|
||||
)
|
||||
|
||||
self.assertFalse(wait_2_deferred.called)
|
||||
|
@ -238,7 +238,7 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
|
|||
testkey = signedjson.key.generate_signing_key("ver1")
|
||||
testverifykey = signedjson.key.get_verify_key(testkey)
|
||||
testverifykey_id = "ed25519:ver1"
|
||||
VALID_UNTIL_TS = 1000
|
||||
VALID_UNTIL_TS = 200 * 1000
|
||||
|
||||
# valid response
|
||||
response = {
|
||||
|
@ -326,9 +326,10 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
|||
},
|
||||
}
|
||||
|
||||
persp_resp = {
|
||||
"server_keys": [self.mock_perspective_server.get_signed_response(response)]
|
||||
}
|
||||
# the response must be signed by both the origin server and the perspectives
|
||||
# server.
|
||||
signedjson.sign.sign_json(response, SERVER_NAME, testkey)
|
||||
self.mock_perspective_server.sign_response(response)
|
||||
|
||||
def post_json(destination, path, data, **kwargs):
|
||||
self.assertEqual(destination, self.mock_perspective_server.server_name)
|
||||
|
@ -337,7 +338,7 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
|||
# check that the request is for the expected key
|
||||
q = data["server_keys"]
|
||||
self.assertEqual(list(q[SERVER_NAME].keys()), ["key1"])
|
||||
return persp_resp
|
||||
return {"server_keys": [response]}
|
||||
|
||||
self.http_client.post_json.side_effect = post_json
|
||||
|
||||
|
@ -365,9 +366,74 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
|
|||
|
||||
self.assertEqual(
|
||||
bytes(res["key_json"]),
|
||||
canonicaljson.encode_canonical_json(persp_resp["server_keys"][0]),
|
||||
canonicaljson.encode_canonical_json(response),
|
||||
)
|
||||
|
||||
def test_invalid_perspectives_responses(self):
|
||||
"""Check that invalid responses from the perspectives server are rejected"""
|
||||
# arbitrarily advance the clock a bit
|
||||
self.reactor.advance(100)
|
||||
|
||||
SERVER_NAME = "server2"
|
||||
testkey = signedjson.key.generate_signing_key("ver1")
|
||||
testverifykey = signedjson.key.get_verify_key(testkey)
|
||||
testverifykey_id = "ed25519:ver1"
|
||||
VALID_UNTIL_TS = 200 * 1000
|
||||
|
||||
def build_response():
|
||||
# valid response
|
||||
response = {
|
||||
"server_name": SERVER_NAME,
|
||||
"old_verify_keys": {},
|
||||
"valid_until_ts": VALID_UNTIL_TS,
|
||||
"verify_keys": {
|
||||
testverifykey_id: {
|
||||
"key": signedjson.key.encode_verify_key_base64(testverifykey)
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# the response must be signed by both the origin server and the perspectives
|
||||
# server.
|
||||
signedjson.sign.sign_json(response, SERVER_NAME, testkey)
|
||||
self.mock_perspective_server.sign_response(response)
|
||||
return response
|
||||
|
||||
def get_key_from_perspectives(response):
|
||||
fetcher = PerspectivesKeyFetcher(self.hs)
|
||||
server_name_and_key_ids = [(SERVER_NAME, ("key1",))]
|
||||
|
||||
def post_json(destination, path, data, **kwargs):
|
||||
self.assertEqual(destination, self.mock_perspective_server.server_name)
|
||||
self.assertEqual(path, "/_matrix/key/v2/query")
|
||||
return {"server_keys": [response]}
|
||||
|
||||
self.http_client.post_json.side_effect = post_json
|
||||
|
||||
return self.get_success(
|
||||
fetcher.get_keys(server_name_and_key_ids)
|
||||
)
|
||||
|
||||
# start with a valid response so we can check we are testing the right thing
|
||||
response = build_response()
|
||||
keys = get_key_from_perspectives(response)
|
||||
k = keys[SERVER_NAME][testverifykey_id]
|
||||
self.assertEqual(k.verify_key, testverifykey)
|
||||
|
||||
# remove the perspectives server's signature
|
||||
response = build_response()
|
||||
del response["signatures"][self.mock_perspective_server.server_name]
|
||||
self.http_client.post_json.return_value = {"server_keys": [response]}
|
||||
keys = get_key_from_perspectives(response)
|
||||
self.assertEqual(keys, {}, "Expected empty dict with missing persp server sig")
|
||||
|
||||
# remove the origin server's signature
|
||||
response = build_response()
|
||||
del response["signatures"][SERVER_NAME]
|
||||
self.http_client.post_json.return_value = {"server_keys": [response]}
|
||||
keys = get_key_from_perspectives(response)
|
||||
self.assertEqual(keys, {}, "Expected empty dict with missing origin server sig")
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def run_in_context(f, *args, **kwargs):
|
||||
|
|
90
tests/federation/test_complexity.py
Normal file
90
tests/federation/test_complexity.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 Matrix.org Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.config.ratelimiting import FederationRateLimitConfig
|
||||
from synapse.federation.transport import server
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client.v1 import login, room
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class RoomComplexityTests(unittest.HomeserverTestCase):
|
||||
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def default_config(self, name='test'):
|
||||
config = super(RoomComplexityTests, self).default_config(name=name)
|
||||
config["limit_large_remote_room_joins"] = True
|
||||
config["limit_large_remote_room_complexity"] = 0.05
|
||||
return config
|
||||
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
class Authenticator(object):
|
||||
def authenticate_request(self, request, content):
|
||||
return defer.succeed("otherserver.nottld")
|
||||
|
||||
ratelimiter = FederationRateLimiter(
|
||||
clock,
|
||||
FederationRateLimitConfig(
|
||||
window_size=1,
|
||||
sleep_limit=1,
|
||||
sleep_msec=1,
|
||||
reject_limit=1000,
|
||||
concurrent_requests=1000,
|
||||
),
|
||||
)
|
||||
server.register_servlets(
|
||||
homeserver, self.resource, Authenticator(), ratelimiter
|
||||
)
|
||||
|
||||
def test_complexity_simple(self):
|
||||
|
||||
u1 = self.register_user("u1", "pass")
|
||||
u1_token = self.login("u1", "pass")
|
||||
|
||||
room_1 = self.helper.create_room_as(u1, tok=u1_token)
|
||||
self.helper.send_state(
|
||||
room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
|
||||
)
|
||||
|
||||
# Get the room complexity
|
||||
request, channel = self.make_request(
|
||||
"GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEquals(200, channel.code)
|
||||
complexity = channel.json_body["v1"]
|
||||
self.assertTrue(complexity > 0, complexity)
|
||||
|
||||
# Artificially raise the complexity
|
||||
store = self.hs.get_datastore()
|
||||
store.get_current_state_event_counts = lambda x: defer.succeed(500 * 1.23)
|
||||
|
||||
# Get the room complexity again -- make sure it's our artificial value
|
||||
request, channel = self.make_request(
|
||||
"GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
|
||||
)
|
||||
self.render(request)
|
||||
self.assertEquals(200, channel.code)
|
||||
complexity = channel.json_body["v1"]
|
||||
self.assertEqual(complexity, 1.23)
|
248
tests/storage/test_cleanup_extrems.py
Normal file
248
tests/storage/test_cleanup_extrems.py
Normal file
|
@ -0,0 +1,248 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os.path
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.storage import prepare_database
|
||||
from synapse.types import Requester, UserID
|
||||
|
||||
from tests.unittest import HomeserverTestCase
|
||||
|
||||
|
||||
class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
|
||||
"""Test the background update to clean forward extremities table.
|
||||
"""
|
||||
|
||||
def prepare(self, reactor, clock, homeserver):
|
||||
self.store = homeserver.get_datastore()
|
||||
self.event_creator = homeserver.get_event_creation_handler()
|
||||
self.room_creator = homeserver.get_room_creation_handler()
|
||||
|
||||
# Create a test user and room
|
||||
self.user = UserID("alice", "test")
|
||||
self.requester = Requester(self.user, None, False, None, None)
|
||||
info = self.get_success(self.room_creator.create_room(self.requester, {}))
|
||||
self.room_id = info["room_id"]
|
||||
|
||||
def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
|
||||
"""Create and send an event.
|
||||
|
||||
Args:
|
||||
soft_failed (bool): Whether to create a soft failed event or not
|
||||
prev_event_ids (list[str]|None): Explicitly set the prev events,
|
||||
or if None just use the default
|
||||
|
||||
Returns:
|
||||
str: The new event's ID.
|
||||
"""
|
||||
prev_events_and_hashes = None
|
||||
if prev_event_ids:
|
||||
prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
|
||||
|
||||
event, context = self.get_success(
|
||||
self.event_creator.create_event(
|
||||
self.requester,
|
||||
{
|
||||
"type": EventTypes.Message,
|
||||
"room_id": self.room_id,
|
||||
"sender": self.user.to_string(),
|
||||
"content": {"body": "", "msgtype": "m.text"},
|
||||
},
|
||||
prev_events_and_hashes=prev_events_and_hashes,
|
||||
)
|
||||
)
|
||||
|
||||
if soft_failed:
|
||||
event.internal_metadata.soft_failed = True
|
||||
|
||||
self.get_success(
|
||||
self.event_creator.send_nonmember_event(self.requester, event, context)
|
||||
)
|
||||
|
||||
return event.event_id
|
||||
|
||||
def add_extremity(self, event_id):
|
||||
"""Add the given event as an extremity to the room.
|
||||
"""
|
||||
self.get_success(
|
||||
self.store._simple_insert(
|
||||
table="event_forward_extremities",
|
||||
values={"room_id": self.room_id, "event_id": event_id},
|
||||
desc="test_add_extremity",
|
||||
)
|
||||
)
|
||||
|
||||
self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))
|
||||
|
||||
def run_background_update(self):
|
||||
"""Re run the background update to clean up the extremities.
|
||||
"""
|
||||
# Make sure we don't clash with in progress updates.
|
||||
self.assertTrue(self.store._all_done, "Background updates are still ongoing")
|
||||
|
||||
schema_path = os.path.join(
|
||||
prepare_database.dir_path,
|
||||
"schema",
|
||||
"delta",
|
||||
"54",
|
||||
"delete_forward_extremities.sql",
|
||||
)
|
||||
|
||||
def run_delta_file(txn):
|
||||
prepare_database.executescript(txn, schema_path)
|
||||
|
||||
self.get_success(
|
||||
self.store.runInteraction("test_delete_forward_extremities", run_delta_file)
|
||||
)
|
||||
|
||||
# Ugh, have to reset this flag
|
||||
self.store._all_done = False
|
||||
|
||||
while not self.get_success(self.store.has_completed_background_updates()):
|
||||
self.get_success(self.store.do_next_background_update(100), by=0.1)
|
||||
|
||||
def test_soft_failed_extremities_handled_correctly(self):
|
||||
"""Test that extremities are correctly calculated in the presence of
|
||||
soft failed events.
|
||||
|
||||
Tests a graph like:
|
||||
|
||||
A <- SF1 <- SF2 <- B
|
||||
|
||||
Where SF* are soft failed.
|
||||
"""
|
||||
|
||||
# Create the room graph
|
||||
event_id_1 = self.create_and_send_event()
|
||||
event_id_2 = self.create_and_send_event(True, [event_id_1])
|
||||
event_id_3 = self.create_and_send_event(True, [event_id_2])
|
||||
event_id_4 = self.create_and_send_event(False, [event_id_3])
|
||||
|
||||
# Check the latest events are as expected
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
|
||||
self.assertEqual(latest_event_ids, [event_id_4])
|
||||
|
||||
def test_basic_cleanup(self):
|
||||
"""Test that extremities are correctly calculated in the presence of
|
||||
soft failed events.
|
||||
|
||||
Tests a graph like:
|
||||
|
||||
A <- SF1 <- B
|
||||
|
||||
Where SF* are soft failed, and with extremities of A and B
|
||||
"""
|
||||
# Create the room graph
|
||||
event_id_a = self.create_and_send_event()
|
||||
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
|
||||
event_id_b = self.create_and_send_event(False, [event_id_sf1])
|
||||
|
||||
# Add the new extremity and check the latest events are as expected
|
||||
self.add_extremity(event_id_a)
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
|
||||
|
||||
# Run the background update and check it did the right thing
|
||||
self.run_background_update()
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(latest_event_ids, [event_id_b])
|
||||
|
||||
def test_chain_of_fail_cleanup(self):
|
||||
"""Test that extremities are correctly calculated in the presence of
|
||||
soft failed events.
|
||||
|
||||
Tests a graph like:
|
||||
|
||||
A <- SF1 <- SF2 <- B
|
||||
|
||||
Where SF* are soft failed, and with extremities of A and B
|
||||
"""
|
||||
# Create the room graph
|
||||
event_id_a = self.create_and_send_event()
|
||||
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
|
||||
event_id_sf2 = self.create_and_send_event(True, [event_id_sf1])
|
||||
event_id_b = self.create_and_send_event(False, [event_id_sf2])
|
||||
|
||||
# Add the new extremity and check the latest events are as expected
|
||||
self.add_extremity(event_id_a)
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
|
||||
|
||||
# Run the background update and check it did the right thing
|
||||
self.run_background_update()
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(latest_event_ids, [event_id_b])
|
||||
|
||||
def test_forked_graph_cleanup(self):
|
||||
r"""Test that extremities are correctly calculated in the presence of
|
||||
soft failed events.
|
||||
|
||||
Tests a graph like, where time flows down the page:
|
||||
|
||||
A B
|
||||
/ \ /
|
||||
/ \ /
|
||||
SF1 SF2
|
||||
| |
|
||||
SF3 |
|
||||
/ \ |
|
||||
| \ |
|
||||
C SF4
|
||||
|
||||
Where SF* are soft failed, and with them A, B and C marked as
|
||||
extremities. This should resolve to B and C being marked as extremity.
|
||||
"""
|
||||
# Create the room graph
|
||||
event_id_a = self.create_and_send_event()
|
||||
event_id_b = self.create_and_send_event()
|
||||
event_id_sf1 = self.create_and_send_event(True, [event_id_a])
|
||||
event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b])
|
||||
event_id_sf3 = self.create_and_send_event(True, [event_id_sf1])
|
||||
self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4
|
||||
event_id_c = self.create_and_send_event(False, [event_id_sf3])
|
||||
|
||||
# Add the new extremity and check the latest events are as expected
|
||||
self.add_extremity(event_id_a)
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(
|
||||
set(latest_event_ids), set((event_id_a, event_id_b, event_id_c))
|
||||
)
|
||||
|
||||
# Run the background update and check it did the right thing
|
||||
self.run_background_update()
|
||||
|
||||
latest_event_ids = self.get_success(
|
||||
self.store.get_latest_event_ids_in_room(self.room_id)
|
||||
)
|
||||
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))
|
Loading…
Reference in a new issue