Merge branch 'develop' of github.com:matrix-org/synapse into matthew/autocreate_autojoin

This commit is contained in:
Neil Johnson 2018-10-25 14:40:06 +01:00
commit f7f487e14c
142 changed files with 4204 additions and 1202 deletions

View file

@ -23,99 +23,106 @@ jobs:
- run: docker push matrixdotorg/synapse:latest
- run: docker push matrixdotorg/synapse:latest-py3
sytestpy2:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy2
working_directory: /src
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- run: /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy2postgres:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy2
working_directory: /src
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- run: POSTGRES=1 /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy2merged:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy2
working_directory: /src
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- run: /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy2postgresmerged:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy2
working_directory: /src
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- run: POSTGRES=1 /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy3:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy3
working_directory: /src
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- run: /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy3postgres:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy3
working_directory: /src
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- run: POSTGRES=1 /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy3merged:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy3
working_directory: /src
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- run: /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
sytestpy3postgresmerged:
machine: true
docker:
- image: matrixdotorg/sytest-synapsepy3
working_directory: /src
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- run: POSTGRES=1 /synapse_sytest.sh
- store_artifacts:
path: ~/project/logs
path: /logs
destination: logs
- store_test_results:
path: logs
path: /logs
workflows:
version: 2

View file

@ -16,7 +16,7 @@ then
GITBASE="develop"
else
# Get the reference, using the GitHub API
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
GITBASE=`wget -O- https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
fi
# Show what we are before
@ -31,4 +31,4 @@ git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE
# Show what we are after.
git show -s
git show -s

View file

@ -1,12 +1,27 @@
sudo: false
language: python
# tell travis to cache ~/.cache/pip
cache: pip
cache:
directories:
# we only bother to cache the wheels; parts of the http cache get
# invalidated every build (because they get served with a max-age of 600
# seconds), which means that we end up re-uploading the whole cache for
# every build, which is time-consuming In any case, it's not obvious that
# downloading the cache from S3 would be much faster than downloading the
# originals from pypi.
#
- $HOME/.cache/pip/wheels
before_script:
- git remote set-branches --add origin develop
- git fetch origin develop
# don't clone the whole repo history, one commit will do
git:
depth: 1
# only build branches we care about (PRs are built seperately)
branches:
only:
- master
- develop
- /^release-v/
matrix:
fast_finish: true
@ -14,8 +29,8 @@ matrix:
- python: 2.7
env: TOX_ENV=packaging
- python: 2.7
env: TOX_ENV=pep8
- python: 3.6
env: TOX_ENV="pep8,check_isort"
- python: 2.7
env: TOX_ENV=py27
@ -39,11 +54,14 @@ matrix:
services:
- postgresql
- python: 3.6
env: TOX_ENV=check_isort
- python: 3.6
- # we only need to check for the newsfragment if it's a PR build
if: type = pull_request
python: 3.6
env: TOX_ENV=check-newsfragment
script:
- git remote set-branches --add origin develop
- git fetch origin develop
- tox -e $TOX_ENV
install:
- pip install tox

View file

@ -1,3 +1,58 @@
Synapse 0.33.7 (2018-10-18)
===========================
**Warning**: This release removes the example email notification templates from
`res/templates` (they are now internal to the python package). This should only
affect you if you (a) deploy your Synapse instance from a git checkout or a
github snapshot URL, and (b) have email notifications enabled.
If you have email notifications enabled, you should ensure that
`email.template_dir` is either configured to point at a directory where you
have installed customised templates, or leave it unset to use the default
templates.
Synapse 0.33.7rc2 (2018-10-17)
==============================
Features
--------
- Ship the example email templates as part of the package ([\#4052](https://github.com/matrix-org/synapse/issues/4052))
Bugfixes
--------
- Fix bug which made get_missing_events return too few events ([\#4045](https://github.com/matrix-org/synapse/issues/4045))
Synapse 0.33.7rc1 (2018-10-15)
==============================
Features
--------
- Add support for end-to-end key backup (MSC1687) ([\#4019](https://github.com/matrix-org/synapse/issues/4019))
Bugfixes
--------
- Fix bug in event persistence logic which caused 'NoneType is not iterable' ([\#3995](https://github.com/matrix-org/synapse/issues/3995))
- Fix exception in background metrics collection ([\#3996](https://github.com/matrix-org/synapse/issues/3996))
- Fix exception handling in fetching remote profiles ([\#3997](https://github.com/matrix-org/synapse/issues/3997))
- Fix handling of rejected threepid invites ([\#3999](https://github.com/matrix-org/synapse/issues/3999))
- Workers now start on Python 3. ([\#4027](https://github.com/matrix-org/synapse/issues/4027))
- Synapse now starts on Python 3.7. ([\#4033](https://github.com/matrix-org/synapse/issues/4033))
Internal Changes
----------------
- Log exceptions in looping calls ([\#4008](https://github.com/matrix-org/synapse/issues/4008))
- Optimisation for serving federation requests ([\#4017](https://github.com/matrix-org/synapse/issues/4017))
- Add metric to count number of non-empty sync responses ([\#4022](https://github.com/matrix-org/synapse/issues/4022))
Synapse 0.33.6 (2018-10-04)
===========================

View file

@ -12,12 +12,12 @@ recursive-include synapse/storage/schema *.sql
recursive-include synapse/storage/schema *.py
recursive-include docs *
recursive-include res *
recursive-include scripts *
recursive-include scripts-dev *
recursive-include synapse *.pyi
recursive-include tests *.py
recursive-include synapse/res *
recursive-include synapse/static *.css
recursive-include synapse/static *.gif
recursive-include synapse/static *.html

View file

@ -174,6 +174,12 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
Slavi Pantaleev has created an Ansible playbook,
which installs the offical Docker image of Matrix Synapse
along with many other Matrix-related services (Postgres database, riot-web, coturn, mxisd, SSL support, etc.).
For more details, see
https://github.com/spantaleev/matrix-docker-ansible-deploy
Configuring Synapse
-------------------

View file

@ -48,6 +48,19 @@ returned by the Client-Server API:
# configured on port 443.
curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
Upgrading to v0.33.7
====================
This release removes the example email notification templates from
``res/templates`` (they are now internal to the python package). This should
only affect you if you (a) deploy your Synapse instance from a git checkout or
a github snapshot URL, and (b) have email notifications enabled.
If you have email notifications enabled, you should ensure that
``email.template_dir`` is either configured to point at a directory where you
have installed customised templates, or leave it unset to use the default
templates.
Upgrading to v0.27.3
====================

1
changelog.d/3698.misc Normal file
View file

@ -0,0 +1 @@
Add information about the [matrix-docker-ansible-deploy](https://github.com/spantaleev/matrix-docker-ansible-deploy) playbook

1
changelog.d/3786.misc Normal file
View file

@ -0,0 +1 @@
Add initial implementation of new state resolution algorithm

1
changelog.d/3969.bugfix Normal file
View file

@ -0,0 +1 @@
Fix HTTP error response codes for federated group requests.

View file

@ -1 +0,0 @@
Fix bug in event persistence logic which caused 'NoneType is not iterable'

View file

@ -1 +0,0 @@
Fix exception in background metrics collection

View file

@ -1 +0,0 @@
Fix exception handling in fetching remote profiles

View file

@ -1 +0,0 @@
Fix handling of rejected threepid invites

1
changelog.d/4031.misc Normal file
View file

@ -0,0 +1 @@
Various cleanups in the federation client code

1
changelog.d/4041.misc Normal file
View file

@ -0,0 +1 @@
Run the CircleCI builds in docker containers

1
changelog.d/4046.bugfix Normal file
View file

@ -0,0 +1 @@
Fix issue where Python 3 users couldn't paginate /publicRooms

1
changelog.d/4049.misc Normal file
View file

@ -0,0 +1 @@
Only colourise synctl output when attached to tty

1
changelog.d/4050.bugfix Normal file
View file

@ -0,0 +1 @@
Fix URL priewing to work in Python 3.7

1
changelog.d/4057.bugfix Normal file
View file

@ -0,0 +1 @@
synctl will use the right python executable to run worker processes

1
changelog.d/4060.bugfix Normal file
View file

@ -0,0 +1 @@
Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting.

1
changelog.d/4061.bugfix Normal file
View file

@ -0,0 +1 @@
Fix some metrics being racy and causing exceptions when polled by Prometheus.

1
changelog.d/4063.misc Normal file
View file

@ -0,0 +1 @@
Refactor room alias creation code

1
changelog.d/4067.bugfix Normal file
View file

@ -0,0 +1 @@
Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting.

1
changelog.d/4068.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug which prevented email notifications from being sent unless an absolute path was given for `email_templates`.

1
changelog.d/4068.misc Normal file
View file

@ -0,0 +1 @@
Make the Python scripts in the top-level scripts folders meet pep8 and pass flake8.

1
changelog.d/4073.misc Normal file
View file

@ -0,0 +1 @@
Add psutil as an explicit dependency

1
changelog.d/4074.bugfix Normal file
View file

@ -0,0 +1 @@
Correctly account for cpu usage by background threads

1
changelog.d/4075.misc Normal file
View file

@ -0,0 +1 @@
Clean up threading and logcontexts in pushers

1
changelog.d/4076.misc Normal file
View file

@ -0,0 +1 @@
Correctly manage logcontexts during startup to fix some "Unexpected logging context" warnings

1
changelog.d/4077.misc Normal file
View file

@ -0,0 +1 @@
Give some more things logcontexts

1
changelog.d/4082.misc Normal file
View file

@ -0,0 +1 @@
Clean up some bits of code which were flagged by the linter

1
changelog.d/4083.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug which prevented backslashes being used in event field filters

View file

@ -211,7 +211,9 @@ email:
require_transport_security: False
notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}"
app_name: Matrix
template_dir: res/templates
# if template_dir is unset, uses the example templates that are part of
# the Synapse distribution.
#template_dir: res/templates
notif_template_html: notif_mail.html
notif_template_text: notif_mail.txt
notif_for_new_users: True

View file

@ -1,21 +1,20 @@
from synapse.events import FrozenEvent
from synapse.api.auth import Auth
from mock import Mock
from __future__ import print_function
import argparse
import itertools
import json
import sys
from mock import Mock
from synapse.api.auth import Auth
from synapse.events import FrozenEvent
def check_auth(auth, auth_chain, events):
auth_chain.sort(key=lambda e: e.depth)
auth_map = {
e.event_id: e
for e in auth_chain
}
auth_map = {e.event_id: e for e in auth_chain}
create_events = {}
for e in auth_chain:
@ -25,31 +24,26 @@ def check_auth(auth, auth_chain, events):
for e in itertools.chain(auth_chain, events):
auth_events_list = [auth_map[i] for i, _ in e.auth_events]
auth_events = {
(e.type, e.state_key): e
for e in auth_events_list
}
auth_events = {(e.type, e.state_key): e for e in auth_events_list}
auth_events[("m.room.create", "")] = create_events[e.room_id]
try:
auth.check(e, auth_events=auth_events)
except Exception as ex:
print "Failed:", e.event_id, e.type, e.state_key
print "Auth_events:", auth_events
print ex
print json.dumps(e.get_dict(), sort_keys=True, indent=4)
print("Failed:", e.event_id, e.type, e.state_key)
print("Auth_events:", auth_events)
print(ex)
print(json.dumps(e.get_dict(), sort_keys=True, indent=4))
# raise
print "Success:", e.event_id, e.type, e.state_key
print("Success:", e.event_id, e.type, e.state_key)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'json',
nargs='?',
type=argparse.FileType('r'),
default=sys.stdin,
'json', nargs='?', type=argparse.FileType('r'), default=sys.stdin
)
args = parser.parse_args()

View file

@ -1,10 +1,15 @@
from synapse.crypto.event_signing import *
from unpaddedbase64 import encode_base64
import argparse
import hashlib
import sys
import json
import logging
import sys
from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import (
check_event_content_hash,
compute_event_reference_hash,
)
class dictobj(dict):
@ -24,27 +29,26 @@ class dictobj(dict):
def main():
parser = argparse.ArgumentParser()
parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
default=sys.stdin)
parser.add_argument(
"input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
)
args = parser.parse_args()
logging.basicConfig()
event_json = dictobj(json.load(args.input_json))
algorithms = {
"sha256": hashlib.sha256,
}
algorithms = {"sha256": hashlib.sha256}
for alg_name in event_json.hashes:
if check_event_content_hash(event_json, algorithms[alg_name]):
print "PASS content hash %s" % (alg_name,)
print("PASS content hash %s" % (alg_name,))
else:
print "FAIL content hash %s" % (alg_name,)
print("FAIL content hash %s" % (alg_name,))
for algorithm in algorithms.values():
name, h_bytes = compute_event_reference_hash(event_json, algorithm)
print "Reference hash %s: %s" % (name, encode_base64(h_bytes))
print("Reference hash %s: %s" % (name, encode_base64(h_bytes)))
if __name__=="__main__":
if __name__ == "__main__":
main()

View file

@ -1,15 +1,15 @@
from signedjson.sign import verify_signed_json
import argparse
import json
import logging
import sys
import urllib2
import dns.resolver
from signedjson.key import decode_verify_key_bytes, write_signing_keys
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
import urllib2
import json
import sys
import dns.resolver
import pprint
import argparse
import logging
def get_targets(server_name):
if ":" in server_name:
@ -23,6 +23,7 @@ def get_targets(server_name):
except dns.resolver.NXDOMAIN:
yield (server_name, 8448)
def get_server_keys(server_name, target, port):
url = "https://%s:%i/_matrix/key/v1" % (target, port)
keys = json.load(urllib2.urlopen(url))
@ -33,12 +34,14 @@ def get_server_keys(server_name, target, port):
verify_keys[key_id] = verify_key
return verify_keys
def main():
parser = argparse.ArgumentParser()
parser.add_argument("signature_name")
parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'),
default=sys.stdin)
parser.add_argument(
"input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
)
args = parser.parse_args()
logging.basicConfig()
@ -48,24 +51,23 @@ def main():
for target, port in get_targets(server_name):
try:
keys = get_server_keys(server_name, target, port)
print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port)
print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port))
write_signing_keys(sys.stdout, keys.values())
break
except:
except Exception:
logging.exception("Error talking to %s:%s", target, port)
json_to_check = json.load(args.input_json)
print "Checking JSON:"
print("Checking JSON:")
for key_id in json_to_check["signatures"][args.signature_name]:
try:
key = keys[key_id]
verify_signed_json(json_to_check, args.signature_name, key)
print "PASS %s" % (key_id,)
except:
print("PASS %s" % (key_id,))
except Exception:
logging.exception("Check for key %s failed" % (key_id,))
print "FAIL %s" % (key_id,)
print("FAIL %s" % (key_id,))
if __name__ == '__main__':
main()

View file

@ -1,13 +1,21 @@
import hashlib
import json
import sys
import time
import six
import psycopg2
import yaml
import sys
import json
import time
import hashlib
from unpaddedbase64 import encode_base64
from canonicaljson import encode_canonical_json
from signedjson.key import read_signing_keys
from signedjson.sign import sign_json
from canonicaljson import encode_canonical_json
from unpaddedbase64 import encode_base64
if six.PY2:
db_type = six.moves.builtins.buffer
else:
db_type = memoryview
def select_v1_keys(connection):
@ -39,7 +47,9 @@ def select_v2_json(connection):
cursor.close()
results = {}
for server_name, key_id, key_json in rows:
results.setdefault(server_name, {})[key_id] = json.loads(str(key_json).decode("utf-8"))
results.setdefault(server_name, {})[key_id] = json.loads(
str(key_json).decode("utf-8")
)
return results
@ -47,10 +57,7 @@ def convert_v1_to_v2(server_name, valid_until, keys, certificate):
return {
"old_verify_keys": {},
"server_name": server_name,
"verify_keys": {
key_id: {"key": key}
for key_id, key in keys.items()
},
"verify_keys": {key_id: {"key": key} for key_id, key in keys.items()},
"valid_until_ts": valid_until,
"tls_fingerprints": [fingerprint(certificate)],
}
@ -65,7 +72,7 @@ def rows_v2(server, json):
valid_until = json["valid_until_ts"]
key_json = encode_canonical_json(json)
for key_id in json["verify_keys"]:
yield (server, key_id, "-", valid_until, valid_until, buffer(key_json))
yield (server, key_id, "-", valid_until, valid_until, db_type(key_json))
def main():
@ -87,7 +94,7 @@ def main():
result = {}
for server in keys:
if not server in json:
if server not in json:
v2_json = convert_v1_to_v2(
server, valid_until, keys[server], certificates[server]
)
@ -96,10 +103,7 @@ def main():
yaml.safe_dump(result, sys.stdout, default_flow_style=False)
rows = list(
row for server, json in result.items()
for row in rows_v2(server, json)
)
rows = list(row for server, json in result.items() for row in rows_v2(server, json))
cursor = connection.cursor()
cursor.executemany(
@ -107,7 +111,7 @@ def main():
" server_name, key_id, from_server,"
" ts_added_ms, ts_valid_until_ms, key_json"
") VALUES (%s, %s, %s, %s, %s, %s)",
rows
rows,
)
connection.commit()

View file

@ -1,8 +1,16 @@
#! /usr/bin/python
from __future__ import print_function
import argparse
import ast
import os
import re
import sys
import yaml
class DefinitionVisitor(ast.NodeVisitor):
def __init__(self):
super(DefinitionVisitor, self).__init__()
@ -42,15 +50,18 @@ def non_empty(defs):
functions = {name: non_empty(f) for name, f in defs['def'].items()}
classes = {name: non_empty(f) for name, f in defs['class'].items()}
result = {}
if functions: result['def'] = functions
if classes: result['class'] = classes
if functions:
result['def'] = functions
if classes:
result['class'] = classes
names = defs['names']
uses = []
for name in names.get('Load', ()):
if name not in names.get('Param', ()) and name not in names.get('Store', ()):
uses.append(name)
uses.extend(defs['attrs'])
if uses: result['uses'] = uses
if uses:
result['uses'] = uses
result['names'] = names
result['attrs'] = defs['attrs']
return result
@ -95,7 +106,6 @@ def used_names(prefix, item, defs, names):
if __name__ == '__main__':
import sys, os, argparse, re
parser = argparse.ArgumentParser(description='Find definitions.')
parser.add_argument(
@ -105,24 +115,28 @@ if __name__ == '__main__':
"--ignore", action="append", metavar="REGEXP", help="Ignore a pattern"
)
parser.add_argument(
"--pattern", action="append", metavar="REGEXP",
help="Search for a pattern"
"--pattern", action="append", metavar="REGEXP", help="Search for a pattern"
)
parser.add_argument(
"directories", nargs='+', metavar="DIR",
help="Directories to search for definitions"
"directories",
nargs='+',
metavar="DIR",
help="Directories to search for definitions",
)
parser.add_argument(
"--referrers", default=0, type=int,
help="Include referrers up to the given depth"
"--referrers",
default=0,
type=int,
help="Include referrers up to the given depth",
)
parser.add_argument(
"--referred", default=0, type=int,
help="Include referred down to the given depth"
"--referred",
default=0,
type=int,
help="Include referred down to the given depth",
)
parser.add_argument(
"--format", default="yaml",
help="Output format, one of 'yaml' or 'dot'"
"--format", default="yaml", help="Output format, one of 'yaml' or 'dot'"
)
args = parser.parse_args()
@ -162,7 +176,7 @@ if __name__ == '__main__':
for used_by in entry.get("used", ()):
referrers.add(used_by)
for name, definition in names.items():
if not name in referrers:
if name not in referrers:
continue
if ignore and any(pattern.match(name) for pattern in ignore):
continue
@ -176,7 +190,7 @@ if __name__ == '__main__':
for uses in entry.get("uses", ()):
referred.add(uses)
for name, definition in names.items():
if not name in referred:
if name not in referred:
continue
if ignore and any(pattern.match(name) for pattern in ignore):
continue
@ -185,12 +199,12 @@ if __name__ == '__main__':
if args.format == 'yaml':
yaml.dump(result, sys.stdout, default_flow_style=False)
elif args.format == 'dot':
print "digraph {"
print("digraph {")
for name, entry in result.items():
print name
print(name)
for used_by in entry.get("used", ()):
if used_by in result:
print used_by, "->", name
print "}"
print(used_by, "->", name)
print("}")
else:
raise ValueError("Unknown format %r" % (args.format))

View file

@ -1,8 +1,11 @@
#!/usr/bin/env python2
import pymacaroons
from __future__ import print_function
import sys
import pymacaroons
if len(sys.argv) == 1:
sys.stderr.write("usage: %s macaroon [key]\n" % (sys.argv[0],))
sys.exit(1)
@ -11,14 +14,14 @@ macaroon_string = sys.argv[1]
key = sys.argv[2] if len(sys.argv) > 2 else None
macaroon = pymacaroons.Macaroon.deserialize(macaroon_string)
print macaroon.inspect()
print(macaroon.inspect())
print ""
print("")
verifier = pymacaroons.Verifier()
verifier.satisfy_general(lambda c: True)
try:
verifier.verify(macaroon, key)
print "Signature is correct"
print("Signature is correct")
except Exception as e:
print str(e)
print(str(e))

View file

@ -18,21 +18,21 @@
from __future__ import print_function
import argparse
import base64
import json
import sys
from urlparse import urlparse, urlunparse
import nacl.signing
import json
import base64
import requests
import sys
from requests.adapters import HTTPAdapter
import srvlookup
import yaml
from requests.adapters import HTTPAdapter
# uncomment the following to enable debug logging of http requests
#from httplib import HTTPConnection
#HTTPConnection.debuglevel = 1
# from httplib import HTTPConnection
# HTTPConnection.debuglevel = 1
def encode_base64(input_bytes):
"""Encode bytes as a base64 string without any padding."""
@ -58,15 +58,15 @@ def decode_base64(input_string):
def encode_canonical_json(value):
return json.dumps(
value,
# Encode code-points outside of ASCII as UTF-8 rather than \u escapes
ensure_ascii=False,
# Remove unecessary white space.
separators=(',',':'),
# Sort the keys of dictionaries.
sort_keys=True,
# Encode the resulting unicode as UTF-8 bytes.
).encode("UTF-8")
value,
# Encode code-points outside of ASCII as UTF-8 rather than \u escapes
ensure_ascii=False,
# Remove unecessary white space.
separators=(',', ':'),
# Sort the keys of dictionaries.
sort_keys=True,
# Encode the resulting unicode as UTF-8 bytes.
).encode("UTF-8")
def sign_json(json_object, signing_key, signing_name):
@ -88,6 +88,7 @@ def sign_json(json_object, signing_key, signing_name):
NACL_ED25519 = "ed25519"
def decode_signing_key_base64(algorithm, version, key_base64):
"""Decode a base64 encoded signing key
Args:
@ -143,14 +144,12 @@ def request_json(method, origin_name, origin_key, destination, path, content):
authorization_headers = []
for key, sig in signed_json["signatures"][origin_name].items():
header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
origin_name, key, sig,
)
header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (origin_name, key, sig)
authorization_headers.append(bytes(header))
print ("Authorization: %s" % header, file=sys.stderr)
print("Authorization: %s" % header, file=sys.stderr)
dest = "matrix://%s%s" % (destination, path)
print ("Requesting %s" % dest, file=sys.stderr)
print("Requesting %s" % dest, file=sys.stderr)
s = requests.Session()
s.mount("matrix://", MatrixConnectionAdapter())
@ -158,10 +157,7 @@ def request_json(method, origin_name, origin_key, destination, path, content):
result = s.request(
method=method,
url=dest,
headers={
"Host": destination,
"Authorization": authorization_headers[0]
},
headers={"Host": destination, "Authorization": authorization_headers[0]},
verify=False,
data=content,
)
@ -171,50 +167,50 @@ def request_json(method, origin_name, origin_key, destination, path, content):
def main():
parser = argparse.ArgumentParser(
description=
"Signs and sends a federation request to a matrix homeserver",
description="Signs and sends a federation request to a matrix homeserver"
)
parser.add_argument(
"-N", "--server-name",
"-N",
"--server-name",
help="Name to give as the local homeserver. If unspecified, will be "
"read from the config file.",
"read from the config file.",
)
parser.add_argument(
"-k", "--signing-key-path",
"-k",
"--signing-key-path",
help="Path to the file containing the private ed25519 key to sign the "
"request with.",
"request with.",
)
parser.add_argument(
"-c", "--config",
"-c",
"--config",
default="homeserver.yaml",
help="Path to server config file. Ignored if --server-name and "
"--signing-key-path are both given.",
"--signing-key-path are both given.",
)
parser.add_argument(
"-d", "--destination",
"-d",
"--destination",
default="matrix.org",
help="name of the remote homeserver. We will do SRV lookups and "
"connect appropriately.",
"connect appropriately.",
)
parser.add_argument(
"-X", "--method",
"-X",
"--method",
help="HTTP method to use for the request. Defaults to GET if --data is"
"unspecified, POST if it is."
"unspecified, POST if it is.",
)
parser.add_argument(
"--body",
help="Data to send as the body of the HTTP request"
)
parser.add_argument("--body", help="Data to send as the body of the HTTP request")
parser.add_argument(
"path",
help="request path. We will add '/_matrix/federation/v1/' to this."
"path", help="request path. We will add '/_matrix/federation/v1/' to this."
)
args = parser.parse_args()
@ -227,13 +223,15 @@ def main():
result = request_json(
args.method,
args.server_name, key, args.destination,
args.server_name,
key,
args.destination,
"/_matrix/federation/v1/" + args.path,
content=args.body,
)
json.dump(result, sys.stdout)
print ("")
print("")
def read_args_from_config(args):
@ -253,7 +251,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
return s, 8448
if ":" in s:
out = s.rsplit(":",1)
out = s.rsplit(":", 1)
try:
port = int(out[1])
except ValueError:
@ -263,7 +261,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
try:
srv = srvlookup.lookup("matrix", "tcp", s)[0]
return srv.host, srv.port
except:
except Exception:
return s, 8448
def get_connection(self, url, proxies=None):
@ -272,10 +270,9 @@ class MatrixConnectionAdapter(HTTPAdapter):
(host, port) = self.lookup(parsed.netloc)
netloc = "%s:%d" % (host, port)
print("Connecting to %s" % (netloc,), file=sys.stderr)
url = urlunparse((
"https", netloc, parsed.path, parsed.params, parsed.query,
parsed.fragment,
))
url = urlunparse(
("https", netloc, parsed.path, parsed.params, parsed.query, parsed.fragment)
)
return super(MatrixConnectionAdapter, self).get_connection(url, proxies)

View file

@ -1,23 +1,31 @@
from synapse.storage.pdu import PduStore
from synapse.storage.signatures import SignatureStore
from synapse.storage._base import SQLBaseStore
from synapse.federation.units import Pdu
from synapse.crypto.event_signing import (
add_event_pdu_content_hash, compute_pdu_event_reference_hash
)
from synapse.api.events.utils import prune_pdu
from unpaddedbase64 import encode_base64, decode_base64
from canonicaljson import encode_canonical_json
from __future__ import print_function
import sqlite3
import sys
from unpaddedbase64 import decode_base64, encode_base64
from synapse.crypto.event_signing import (
add_event_pdu_content_hash,
compute_pdu_event_reference_hash,
)
from synapse.federation.units import Pdu
from synapse.storage._base import SQLBaseStore
from synapse.storage.pdu import PduStore
from synapse.storage.signatures import SignatureStore
class Store(object):
_get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"]
_get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"]
_get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"]
_get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"]
_get_pdu_origin_signatures_txn = SignatureStore.__dict__[
"_get_pdu_origin_signatures_txn"
]
_store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"]
_store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"]
_store_pdu_reference_hash_txn = SignatureStore.__dict__[
"_store_pdu_reference_hash_txn"
]
_store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"]
_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
@ -26,9 +34,7 @@ store = Store()
def select_pdus(cursor):
cursor.execute(
"SELECT pdu_id, origin FROM pdus ORDER BY depth ASC"
)
cursor.execute("SELECT pdu_id, origin FROM pdus ORDER BY depth ASC")
ids = cursor.fetchall()
@ -41,23 +47,30 @@ def select_pdus(cursor):
for pdu in pdus:
try:
if pdu.prev_pdus:
print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
print("PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
for pdu_id, origin, hashes in pdu.prev_pdus:
ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)]
hashes[ref_alg] = encode_base64(ref_hsh)
store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh)
print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
store._store_prev_pdu_hash_txn(
cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh
)
print("SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
pdu = add_event_pdu_content_hash(pdu)
ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu)
reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh)
store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh)
store._store_pdu_reference_hash_txn(
cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh
)
for alg, hsh_base64 in pdu.hashes.items():
print alg, hsh_base64
store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64))
print(alg, hsh_base64)
store._store_pdu_content_hash_txn(
cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)
)
except Exception:
print("FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus)
except:
print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus
def main():
conn = sqlite3.connect(sys.argv[1])
@ -65,5 +78,6 @@ def main():
select_pdus(cursor)
conn.commit()
if __name__=='__main__':
if __name__ == '__main__':
main()

View file

@ -1,18 +1,17 @@
#! /usr/bin/python
import ast
import argparse
import ast
import os
import sys
import yaml
PATTERNS_V1 = []
PATTERNS_V2 = []
RESULT = {
"v1": PATTERNS_V1,
"v2": PATTERNS_V2,
}
RESULT = {"v1": PATTERNS_V1, "v2": PATTERNS_V2}
class CallVisitor(ast.NodeVisitor):
def visit_Call(self, node):
@ -21,7 +20,6 @@ class CallVisitor(ast.NodeVisitor):
else:
return
if name == "client_path_patterns":
PATTERNS_V1.append(node.args[0].s)
elif name == "client_v2_patterns":
@ -42,8 +40,10 @@ def find_patterns_in_file(filepath):
parser = argparse.ArgumentParser(description='Find url patterns.')
parser.add_argument(
"directories", nargs='+', metavar="DIR",
help="Directories to search for definitions"
"directories",
nargs='+',
metavar="DIR",
help="Directories to search for definitions",
)
args = parser.parse_args()

View file

@ -1,8 +1,9 @@
import requests
import collections
import json
import sys
import time
import json
import requests
Entry = collections.namedtuple("Entry", "name position rows")
@ -30,11 +31,11 @@ def parse_response(content):
def replicate(server, streams):
return parse_response(requests.get(
server + "/_synapse/replication",
verify=False,
params=streams
).content)
return parse_response(
requests.get(
server + "/_synapse/replication", verify=False, params=streams
).content
)
def main():
@ -45,16 +46,16 @@ def main():
try:
streams = {
row.name: row.position
for row in replicate(server, {"streams":"-1"})["streams"].rows
for row in replicate(server, {"streams": "-1"})["streams"].rows
}
except requests.exceptions.ConnectionError as e:
except requests.exceptions.ConnectionError:
time.sleep(0.1)
while True:
try:
results = replicate(server, streams)
except:
sys.stdout.write("connection_lost("+ repr(streams) + ")\n")
except Exception:
sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
break
for update in results.values():
for row in update.rows:
@ -62,6 +63,5 @@ def main():
streams[update.name] = update.position
if __name__=='__main__':
if __name__ == '__main__':
main()

View file

@ -1,12 +1,10 @@
#!/usr/bin/env python
import argparse
import getpass
import sys
import bcrypt
import getpass
import yaml
bcrypt_rounds=12
@ -52,4 +50,3 @@ if __name__ == "__main__":
password = prompt_for_pass()
print bcrypt.hashpw(password + password_pepper, bcrypt.gensalt(bcrypt_rounds))

View file

@ -36,12 +36,9 @@ from __future__ import print_function
import argparse
import logging
import sys
import os
import shutil
import sys
from synapse.rest.media.v1.filepath import MediaFilePaths
@ -77,24 +74,23 @@ def move_media(origin_server, file_id, src_paths, dest_paths):
if not os.path.exists(original_file):
logger.warn(
"Original for %s/%s (%s) does not exist",
origin_server, file_id, original_file,
origin_server,
file_id,
original_file,
)
else:
mkdir_and_move(
original_file,
dest_paths.remote_media_filepath(origin_server, file_id),
original_file, dest_paths.remote_media_filepath(origin_server, file_id)
)
# now look for thumbnails
original_thumb_dir = src_paths.remote_media_thumbnail_dir(
origin_server, file_id,
)
original_thumb_dir = src_paths.remote_media_thumbnail_dir(origin_server, file_id)
if not os.path.exists(original_thumb_dir):
return
mkdir_and_move(
original_thumb_dir,
dest_paths.remote_media_thumbnail_dir(origin_server, file_id)
dest_paths.remote_media_thumbnail_dir(origin_server, file_id),
)
@ -109,24 +105,16 @@ def mkdir_and_move(original_file, dest_file):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class = argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-v", action='store_true', help='enable debug logging')
parser.add_argument(
"src_repo",
help="Path to source content repo",
)
parser.add_argument(
"dest_repo",
help="Path to source content repo",
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("-v", action='store_true', help='enable debug logging')
parser.add_argument("src_repo", help="Path to source content repo")
parser.add_argument("dest_repo", help="Path to source content repo")
args = parser.parse_args()
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
}
logging.basicConfig(**logging_config)

View file

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import getpass
@ -22,19 +23,23 @@ import hmac
import json
import sys
import urllib2
from six import input
import yaml
def request_registration(user, password, server_location, shared_secret, admin=False):
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
headers={'Content-Type': 'application/json'}
headers={'Content-Type': 'application/json'},
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
@ -42,18 +47,15 @@ def request_registration(user, password, server_location, shared_secret, admin=F
f.close()
nonce = json.loads(body)["nonce"]
except urllib2.HTTPError as e:
print "ERROR! Received %d %s" % (e.code, e.reason,)
print("ERROR! Received %d %s" % (e.code, e.reason))
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
print resp["error"]
print(resp["error"])
sys.exit(1)
mac = hmac.new(
key=shared_secret,
digestmod=hashlib.sha1,
)
mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1)
mac.update(nonce)
mac.update("\x00")
@ -75,30 +77,31 @@ def request_registration(user, password, server_location, shared_secret, admin=F
server_location = server_location.rstrip("/")
print "Sending registration request..."
print("Sending registration request...")
req = urllib2.Request(
"%s/_matrix/client/r0/admin/register" % (server_location,),
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
headers={'Content-Type': 'application/json'},
)
try:
if sys.version_info[:3] >= (2, 7, 9):
# As of version 2.7.9, urllib2 now checks SSL certs
import ssl
f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23))
else:
f = urllib2.urlopen(req)
f.read()
f.close()
print "Success."
print("Success.")
except urllib2.HTTPError as e:
print "ERROR! Received %d %s" % (e.code, e.reason,)
print("ERROR! Received %d %s" % (e.code, e.reason))
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
print resp["error"]
print(resp["error"])
sys.exit(1)
@ -106,35 +109,35 @@ def register_new_user(user, password, server_location, shared_secret, admin):
if not user:
try:
default_user = getpass.getuser()
except:
except Exception:
default_user = None
if default_user:
user = raw_input("New user localpart [%s]: " % (default_user,))
user = input("New user localpart [%s]: " % (default_user,))
if not user:
user = default_user
else:
user = raw_input("New user localpart: ")
user = input("New user localpart: ")
if not user:
print "Invalid user name"
print("Invalid user name")
sys.exit(1)
if not password:
password = getpass.getpass("Password: ")
if not password:
print "Password cannot be blank."
print("Password cannot be blank.")
sys.exit(1)
confirm_password = getpass.getpass("Confirm password: ")
if password != confirm_password:
print "Passwords do not match"
print("Passwords do not match")
sys.exit(1)
if admin is None:
admin = raw_input("Make admin [no]: ")
admin = input("Make admin [no]: ")
if admin in ("y", "yes", "true"):
admin = True
else:
@ -146,42 +149,51 @@ def register_new_user(user, password, server_location, shared_secret, admin):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Used to register new users with a given home server when"
" registration has been disabled. The home server must be"
" configured with the 'registration_shared_secret' option"
" set.",
" registration has been disabled. The home server must be"
" configured with the 'registration_shared_secret' option"
" set."
)
parser.add_argument(
"-u", "--user",
"-u",
"--user",
default=None,
help="Local part of the new user. Will prompt if omitted.",
)
parser.add_argument(
"-p", "--password",
"-p",
"--password",
default=None,
help="New password for user. Will prompt if omitted.",
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
"-a", "--admin",
"-a",
"--admin",
action="store_true",
help="Register new user as an admin. Will prompt if --no-admin is not set either.",
help=(
"Register new user as an admin. "
"Will prompt if --no-admin is not set either."
),
)
admin_group.add_argument(
"--no-admin",
action="store_true",
help="Register new user as a regular user. Will prompt if --admin is not set either.",
help=(
"Register new user as a regular user. "
"Will prompt if --admin is not set either."
),
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-c", "--config",
"-c",
"--config",
type=argparse.FileType('r'),
help="Path to server config file. Used to read in shared secret.",
)
group.add_argument(
"-k", "--shared-secret",
help="Shared secret as defined in server config file.",
"-k", "--shared-secret", help="Shared secret as defined in server config file."
)
parser.add_argument(
@ -189,7 +201,7 @@ if __name__ == "__main__":
default="https://localhost:8448",
nargs='?',
help="URL to use to talk to the home server. Defaults to "
" 'https://localhost:8448'.",
" 'https://localhost:8448'.",
)
args = parser.parse_args()
@ -198,7 +210,7 @@ if __name__ == "__main__":
config = yaml.safe_load(args.config)
secret = config.get("registration_shared_secret", None)
if not secret:
print "No 'registration_shared_secret' defined in config."
print("No 'registration_shared_secret' defined in config.")
sys.exit(1)
else:
secret = args.shared_secret

View file

@ -15,23 +15,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.enterprise import adbapi
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
import argparse
import curses
import logging
import sys
import time
import traceback
import yaml
from six import string_types
import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
logger = logging.getLogger("synapse_port_db")
@ -105,6 +105,7 @@ class Store(object):
*All* database interactions should go through this object.
"""
def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine
@ -135,7 +136,8 @@ class Store(object):
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.database_engine, [], []),
*args, **kwargs
*args,
**kwargs
)
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
@ -158,22 +160,20 @@ class Store(object):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
return self.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in headers),
", ".join("%s" for _ in headers)
", ".join("%s" for _ in headers),
)
try:
txn.executemany(sql, rows)
except:
logger.exception(
"Failed to insert: %s",
table,
)
except Exception:
logger.exception("Failed to insert: %s", table)
raise
@ -206,7 +206,7 @@ class Porter(object):
"table_name": table,
"forward_rowid": 1,
"backward_rowid": 0,
}
},
)
forward_chunk = 1
@ -221,10 +221,10 @@ class Porter(object):
table, forward_chunk, backward_chunk
)
else:
def delete_all(txn):
txn.execute(
"DELETE FROM port_from_sqlite3 WHERE table_name = %s",
(table,)
"DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,)
)
txn.execute("TRUNCATE %s CASCADE" % (table,))
@ -232,11 +232,7 @@ class Porter(object):
yield self.postgres_store._simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
"forward_rowid": 1,
"backward_rowid": 0,
}
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
forward_chunk = 1
@ -251,12 +247,16 @@ class Porter(object):
)
@defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, forward_chunk,
backward_chunk):
def handle_table(
self, table, postgres_size, table_size, forward_chunk, backward_chunk
):
logger.info(
"Table %s: %i/%i (rows %i-%i) already ported",
table, postgres_size, table_size,
backward_chunk+1, forward_chunk-1,
table,
postgres_size,
table_size,
backward_chunk + 1,
forward_chunk - 1,
)
if not table_size:
@ -271,7 +271,9 @@ class Porter(object):
return
if table in (
"user_directory", "user_directory_search", "users_who_share_rooms",
"user_directory",
"user_directory_search",
"users_who_share_rooms",
"users_in_pubic_room",
):
# We don't port these tables, as they're a faff and we can regenreate
@ -283,37 +285,35 @@ class Porter(object):
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
yield self.postgres_store._simple_insert(
table=table,
values={"stream_id": None},
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
return
forward_select = (
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
% (table,)
"SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
)
backward_select = (
"SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?"
% (table,)
"SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,)
)
do_forward = [True]
do_backward = [True]
while True:
def r(txn):
forward_rows = []
backward_rows = []
if do_forward[0]:
txn.execute(forward_select, (forward_chunk, self.batch_size,))
txn.execute(forward_select, (forward_chunk, self.batch_size))
forward_rows = txn.fetchall()
if not forward_rows:
do_forward[0] = False
if do_backward[0]:
txn.execute(backward_select, (backward_chunk, self.batch_size,))
txn.execute(backward_select, (backward_chunk, self.batch_size))
backward_rows = txn.fetchall()
if not backward_rows:
do_backward[0] = False
@ -325,9 +325,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
headers, frows, brows = yield self.sqlite_store.runInteraction(
"select", r
)
headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
if frows or brows:
if frows:
@ -339,9 +337,7 @@ class Porter(object):
rows = self._convert_rows(table, headers, rows)
def insert(txn):
self.postgres_store.insert_many_txn(
txn, table, headers[1:], rows
)
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
self.postgres_store._simple_update_one_txn(
txn,
@ -362,8 +358,9 @@ class Porter(object):
return
@defer.inlineCallbacks
def handle_search_table(self, postgres_size, table_size, forward_chunk,
backward_chunk):
def handle_search_table(
self, postgres_size, table_size, forward_chunk, backward_chunk
):
select = (
"SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
" FROM event_search as es"
@ -373,8 +370,9 @@ class Porter(object):
)
while True:
def r(txn):
txn.execute(select, (forward_chunk, self.batch_size,))
txn.execute(select, (forward_chunk, self.batch_size))
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
@ -402,18 +400,21 @@ class Porter(object):
else:
rows_dict.append(d)
txn.executemany(sql, [
(
row["event_id"],
row["room_id"],
row["key"],
row["sender"],
row["value"],
row["origin_server_ts"],
row["stream_ordering"],
)
for row in rows_dict
])
txn.executemany(
sql,
[
(
row["event_id"],
row["room_id"],
row["key"],
row["sender"],
row["value"],
row["origin_server_ts"],
row["stream_ordering"],
)
for row in rows_dict
],
)
self.postgres_store._simple_update_one_txn(
txn,
@ -437,7 +438,8 @@ class Porter(object):
def setup_db(self, db_config, database_engine):
db_conn = database_engine.module.connect(
**{
k: v for k, v in db_config.get("args", {}).items()
k: v
for k, v in db_config.get("args", {}).items()
if not k.startswith("cp_")
}
)
@ -450,13 +452,11 @@ class Porter(object):
def run(self):
try:
sqlite_db_pool = adbapi.ConnectionPool(
self.sqlite_config["name"],
**self.sqlite_config["args"]
self.sqlite_config["name"], **self.sqlite_config["args"]
)
postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"],
**self.postgres_config["args"]
self.postgres_config["name"], **self.postgres_config["args"]
)
sqlite_engine = create_engine(sqlite_config)
@ -465,9 +465,7 @@ class Porter(object):
self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
yield self.postgres_store.execute(
postgres_engine.check_database
)
yield self.postgres_store.execute(postgres_engine.check_database)
# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
@ -477,6 +475,7 @@ class Porter(object):
self.setup_db(postgres_config, postgres_engine)
self.progress.set_state("Creating port tables")
def create_port_table(txn):
txn.execute(
"CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
@ -501,10 +500,9 @@ class Porter(object):
)
try:
yield self.postgres_store.runInteraction(
"alter_table", alter_table
)
except Exception as e:
yield self.postgres_store.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass
yield self.postgres_store.runInteraction(
@ -514,11 +512,7 @@ class Porter(object):
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = yield self.sqlite_store._simple_select_onecol(
table="sqlite_master",
keyvalues={
"type": "table",
},
retcol="name",
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
postgres_tables = yield self.postgres_store._simple_select_onecol(
@ -545,18 +539,14 @@ class Porter(object):
# Step 4. Do the copying.
self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
[
self.handle_table(*res)
for res in setup_res
],
consumeErrors=True,
[self.handle_table(*res) for res in setup_res], consumeErrors=True
)
# Step 5. Do final post-processing
yield self._setup_state_group_id_seq()
self.progress.done()
except:
except Exception:
global end_error_exec_info
end_error_exec_info = sys.exc_info()
logger.exception("")
@ -566,9 +556,7 @@ class Porter(object):
def _convert_rows(self, table, headers, rows):
bool_col_names = BOOLEAN_COLUMNS.get(table, [])
bool_cols = [
i for i, h in enumerate(headers) if h in bool_col_names
]
bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
class BadValueException(Exception):
pass
@ -577,18 +565,21 @@ class Porter(object):
if j in bool_cols:
return bool(col)
elif isinstance(col, string_types) and "\0" in col:
logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
raise BadValueException();
logger.warn(
"DROPPING ROW: NUL value in table %s col %s: %r",
table,
headers[j],
col,
)
raise BadValueException()
return col
outrows = []
for i, row in enumerate(rows):
try:
outrows.append(tuple(
conv(j, col)
for j, col in enumerate(row)
if j > 0
))
outrows.append(
tuple(conv(j, col) for j, col in enumerate(row) if j > 0)
)
except BadValueException:
pass
@ -616,9 +607,7 @@ class Porter(object):
return headers, [r for r in rows if r[ts_ind] < yesterday]
headers, rows = yield self.sqlite_store.runInteraction(
"select", r,
)
headers, rows = yield self.sqlite_store.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@ -639,7 +628,7 @@ class Porter(object):
txn.execute(
"SELECT rowid FROM sent_transactions WHERE ts >= ?"
" ORDER BY rowid ASC LIMIT 1",
(yesterday,)
(yesterday,),
)
rows = txn.fetchall()
@ -657,21 +646,17 @@ class Porter(object):
"table_name": "sent_transactions",
"forward_rowid": next_chunk,
"backward_rowid": 0,
}
},
)
def get_sent_table_size(txn):
txn.execute(
"SELECT count(*) FROM sent_transactions"
" WHERE ts >= ?",
(yesterday,)
"SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
size, = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(
get_sent_table_size
)
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
total_count = remaining_count + inserted_rows
@ -680,13 +665,11 @@ class Porter(object):
@defer.inlineCallbacks
def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
frows = yield self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,),
forward_chunk,
"SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
)
brows = yield self.sqlite_store.execute_sql(
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,),
backward_chunk,
"SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
)
defer.returnValue(frows[0][0] + brows[0][0])
@ -694,7 +677,7 @@ class Porter(object):
@defer.inlineCallbacks
def _get_already_ported_count(self, table):
rows = yield self.postgres_store.execute_sql(
"SELECT count(*) FROM %s" % (table,),
"SELECT count(*) FROM %s" % (table,)
)
defer.returnValue(rows[0][0])
@ -717,22 +700,21 @@ class Porter(object):
def _setup_state_group_id_seq(self):
def r(txn):
txn.execute("SELECT MAX(id) FROM state_groups")
next_id = txn.fetchone()[0]+1
txn.execute(
"ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
(next_id,),
)
next_id = txn.fetchone()[0] + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
##############################################
###### The following is simply UI stuff ######
# The following is simply UI stuff
##############################################
class Progress(object):
"""Used to report progress of the port
"""
def __init__(self):
self.tables = {}
@ -758,6 +740,7 @@ class Progress(object):
class CursesProgress(Progress):
"""Reports progress to a curses window
"""
def __init__(self, stdscr):
self.stdscr = stdscr
@ -801,7 +784,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)
minutes, seconds = divmod(duration, 60)
duration_str = '%02dm %02ds' % (minutes, seconds,)
duration_str = '%02dm %02ds' % (minutes, seconds)
if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
@ -814,16 +797,12 @@ class CursesProgress(Progress):
est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = (
"Time spent: %s (est. remaining: %s)"
% (duration_str, est_remaining_str,)
status = "Time spent: %s (est. remaining: %s)" % (
duration_str,
est_remaining_str,
)
self.stdscr.addstr(
0, 0,
status,
curses.A_BOLD,
)
self.stdscr.addstr(0, 0, status, curses.A_BOLD)
max_len = max([len(t) for t in self.tables.keys()])
@ -831,9 +810,7 @@ class CursesProgress(Progress):
middle_space = 1
items = self.tables.items()
items.sort(
key=lambda i: (i[1]["perc"], i[0]),
)
items.sort(key=lambda i: (i[1]["perc"], i[0]))
for i, (table, data) in enumerate(items):
if i + 2 >= rows:
@ -844,9 +821,7 @@ class CursesProgress(Progress):
color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
self.stdscr.addstr(
i + 2, left_margin + max_len - len(table),
table,
curses.A_BOLD | color,
i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color
)
size = 20
@ -857,15 +832,13 @@ class CursesProgress(Progress):
)
self.stdscr.addstr(
i + 2, left_margin + max_len + middle_space,
i + 2,
left_margin + max_len + middle_space,
"%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
)
if self.finished:
self.stdscr.addstr(
rows - 1, 0,
"Press any key to exit...",
)
self.stdscr.addstr(rows - 1, 0, "Press any key to exit...")
self.stdscr.refresh()
self.last_update = time.time()
@ -877,29 +850,25 @@ class CursesProgress(Progress):
def set_state(self, state):
self.stdscr.clear()
self.stdscr.addstr(
0, 0,
state + "...",
curses.A_BOLD,
)
self.stdscr.addstr(0, 0, state + "...", curses.A_BOLD)
self.stdscr.refresh()
class TerminalProgress(Progress):
"""Just prints progress to the terminal
"""
def update(self, table, num_done):
super(TerminalProgress, self).update(table, num_done)
data = self.tables[table]
print "%s: %d%% (%d/%d)" % (
table, data["perc"],
data["num_done"], data["total"],
print(
"%s: %d%% (%d/%d)" % (table, data["perc"], data["num_done"], data["total"])
)
def set_state(self, state):
print state + "..."
print(state + "...")
##############################################
@ -909,34 +878,38 @@ class TerminalProgress(Progress):
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="A script to port an existing synapse SQLite database to"
" a new PostgreSQL database."
" a new PostgreSQL database."
)
parser.add_argument("-v", action='store_true')
parser.add_argument(
"--sqlite-database", required=True,
"--sqlite-database",
required=True,
help="The snapshot of the SQLite database file. This must not be"
" currently used by a running synapse server"
" currently used by a running synapse server",
)
parser.add_argument(
"--postgres-config", type=argparse.FileType('r'), required=True,
help="The database config file for the PostgreSQL database"
"--postgres-config",
type=argparse.FileType('r'),
required=True,
help="The database config file for the PostgreSQL database",
)
parser.add_argument(
"--curses", action='store_true',
help="display a curses based progress UI"
"--curses", action='store_true', help="display a curses based progress UI"
)
parser.add_argument(
"--batch-size", type=int, default=1000,
"--batch-size",
type=int,
default=1000,
help="The number of rows to select from the SQLite table each"
" iteration [default=1000]",
" iteration [default=1000]",
)
args = parser.parse_args()
logging_config = {
"level": logging.DEBUG if args.v else logging.INFO,
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
"format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
}
if args.curses:

View file

@ -14,17 +14,16 @@ ignore =
pylint.cfg
tox.ini
[pep8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik
# doesn't like it. E203 is contrary to PEP8. E731 is silly.
ignore = W503,E203,E731
[flake8]
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
# pep8 to do those checks), but not the "max-line-length" setting
max-line-length = 90
ignore=W503,E203,E731
# see https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes
# for error codes. The ones we ignore are:
# W503: line break before binary operator
# W504: line break after binary operator
# E203: whitespace before ':' (which is contrary to pep8?)
# E731: do not assign a lambda expression, use a def
ignore=W503,W504,E203,E731
[isort]
line_length = 89

View file

@ -1,6 +1,8 @@
#!/usr/bin/env python
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2014-2017 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2017-2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -86,7 +88,7 @@ setup(
name="matrix-synapse",
version=version,
packages=find_packages(exclude=["tests", "tests.*"]),
description="Reference Synapse Home Server",
description="Reference homeserver for the Matrix decentralised comms protocol",
install_requires=dependencies['requirements'](include_conditional=True).keys(),
dependency_links=dependencies["DEPENDENCY_LINKS"].values(),
include_package_data=True,

View file

@ -27,4 +27,4 @@ try:
except ImportError:
pass
__version__ = "0.33.6"
__version__ = "0.33.7"

View file

@ -59,6 +59,7 @@ class Codes(object):
RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED"
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION"
class CodeMessageException(RuntimeError):
@ -312,6 +313,20 @@ class LimitExceededError(SynapseError):
)
class RoomKeysVersionError(SynapseError):
"""A client has tried to upload to a non-current version of the room_keys store
"""
def __init__(self, current_version):
"""
Args:
current_version (str): the current version of the store they should have used
"""
super(RoomKeysVersionError, self).__init__(
403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION
)
self.current_version = current_version
class IncompatibleRoomVersionError(SynapseError):
"""A server is trying to join a room whose version it does not support."""

View file

@ -172,7 +172,10 @@ USER_FILTER_SCHEMA = {
# events a lot easier as we can then use a negative lookbehind
# assertion to split '\.' If we allowed \\ then it would
# incorrectly split '\\.' See synapse.events.utils.serialize_event
"pattern": "^((?!\\\).)*$"
#
# Note that because this is a regular expression, we have to escape
# each backslash in the pattern.
"pattern": r"^((?!\\\\).)*$"
}
}
},

View file

@ -17,6 +17,7 @@ import gc
import logging
import sys
import psutil
from daemonize import Daemonize
from twisted.internet import error, reactor
@ -24,12 +25,6 @@ from twisted.internet import error, reactor
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
try:
import affinity
except Exception:
affinity = None
logger = logging.getLogger(__name__)
@ -89,15 +84,20 @@ def start_reactor(
with PreserveLoggingContext():
logger.info("Running")
if cpu_affinity is not None:
if not affinity:
quit_with_error(
"Missing package 'affinity' required for cpu_affinity\n"
"option\n\n"
"Install by running:\n\n"
" pip install affinity\n\n"
)
logger.info("Setting CPU affinity to %s" % cpu_affinity)
affinity.set_process_affinity_mask(0, cpu_affinity)
# Turn the bitmask into bits, reverse it so we go from 0 up
mask_to_bits = bin(cpu_affinity)[2:][::-1]
cpus = []
cpu_num = 0
for i in mask_to_bits:
if i == "1":
cpus.append(cpu_num)
cpu_num += 1
p = psutil.Process()
p.cpu_affinity(cpus)
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)

View file

@ -178,6 +178,9 @@ def start(config_options):
setup_logging(config, use_worker_options=True)
# This should only be done on the user directory worker or the master
config.update_user_directory = False
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)

View file

@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
self.main_uri + request.uri.decode('ascii'),
headers=headers,
)
defer.returnValue((200, result))
@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
"Authorization": auth_headers,
}
result = yield self.http_client.post_json_get_json(
self.main_uri + request.uri,
self.main_uri + request.uri.decode('ascii'),
body,
headers=headers,
)

View file

@ -20,6 +20,7 @@ import sys
from six import iteritems
import psutil
from prometheus_client import Gauge
from twisted.application import service
@ -502,7 +503,6 @@ def run(hs):
def performance_stats_init():
try:
import psutil
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
@ -510,12 +510,9 @@ def run(hs):
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
except (ImportError, AttributeError):
logger.warn(
"report_stats enabled but psutil is not installed or incorrect version."
" Disabling reporting of memory/cpu stats."
" Ensuring psutil is available will help matrix.org track performance"
" changes across releases."
except (AttributeError):
logger.warning(
"Unable to read memory/cpu stats. Disabling reporting."
)
def generate_user_daily_visit_stats():
@ -530,10 +527,13 @@ def run(hs):
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
)
hs.get_datastore().reap_monthly_active_users()
def reap_monthly_active_users():
return run_as_background_process(
"reap_monthly_active_users",
hs.get_datastore().reap_monthly_active_users,
)
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
@defer.inlineCallbacks
def generate_monthly_active_users():
@ -547,12 +547,23 @@ def run(hs):
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids
def start_generate_monthly_active_users():
return run_as_background_process(
"generate_monthly_active_users",
generate_monthly_active_users,
)
# XXX is this really supposed to be a background process? it looks
# like it needs to complete before some of the other stuff runs.
run_as_background_process(
"initialise_reserved_users",
hs.get_datastore().initialise_reserved_users,
hs.config.mau_limits_reserved_threepids,
)
generate_monthly_active_users()
start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau:
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
@ -568,7 +579,7 @@ def run(hs):
clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print (hs.config.pid_file)
print(hs.config.pid_file)
_base.start_reactor(
"synapse-homeserver",

View file

@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@ -49,31 +50,31 @@ class PusherSlaveStore(
SlavedAccountDataStore
):
update_pusher_last_stream_ordering_and_success = (
DataStore.update_pusher_last_stream_ordering_and_success.__func__
__func__(DataStore.update_pusher_last_stream_ordering_and_success)
)
update_pusher_failing_since = (
DataStore.update_pusher_failing_since.__func__
__func__(DataStore.update_pusher_failing_since)
)
update_pusher_last_stream_ordering = (
DataStore.update_pusher_last_stream_ordering.__func__
__func__(DataStore.update_pusher_last_stream_ordering)
)
get_throttle_params_by_room = (
DataStore.get_throttle_params_by_room.__func__
__func__(DataStore.get_throttle_params_by_room)
)
set_throttle_params = (
DataStore.set_throttle_params.__func__
__func__(DataStore.set_throttle_params)
)
get_time_of_last_push_action_before = (
DataStore.get_time_of_last_push_action_before.__func__
__func__(DataStore.get_time_of_last_push_action_before)
)
get_profile_displayname = (
DataStore.get_profile_displayname.__func__
__func__(DataStore.get_profile_displayname)
)
@ -160,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
self.pusher_pool.on_new_receipts(
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@ -182,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):

View file

@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@ -147,7 +147,7 @@ class SynchrotronPresence(object):
and haven't come back yet. If there are poke the master about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in self.users_going_offline.items():
for user_id, last_sync_ms in list(self.users_going_offline.items()):
if now - last_sync_ms > 10 * 1000:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
@ -156,9 +156,9 @@ class SynchrotronPresence(object):
# TODO Hows this supposed to work?
pass
get_states = PresenceHandler.get_states.__func__
get_state = PresenceHandler.get_state.__func__
current_state_for_users = PresenceHandler.current_state_for_users.__func__
get_states = __func__(PresenceHandler.get_states)
get_state = __func__(PresenceHandler.get_state)
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
def user_syncing(self, user_id, affect_presence):
if affect_presence:
@ -208,7 +208,7 @@ class SynchrotronPresence(object):
) for row in rows]
for state in states:
self.user_to_current_state[row.user_id] = state
self.user_to_current_state[state.user_id] = state
stream_id = token
yield self.notify_from_replication(states, stream_id)

View file

@ -28,7 +28,7 @@ if __name__ == "__main__":
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))
print(getattr(config, key))
sys.exit(0)
else:
sys.stderr.write("Unknown command %r\n" % (action,))

View file

@ -106,10 +106,7 @@ class Config(object):
@classmethod
def check_file(cls, file_path, config_name):
if file_path is None:
raise ConfigError(
"Missing config for %s."
% (config_name,)
)
raise ConfigError("Missing config for %s." % (config_name,))
try:
os.stat(file_path)
except OSError as e:
@ -128,9 +125,7 @@ class Config(object):
if e.errno != errno.EEXIST:
raise
if not os.path.isdir(dir_path):
raise ConfigError(
"%s is not a directory" % (dir_path,)
)
raise ConfigError("%s is not a directory" % (dir_path,))
return dir_path
@classmethod
@ -156,21 +151,20 @@ class Config(object):
return results
def generate_config(
self,
config_dir_path,
server_name,
is_generating_file,
report_stats=None,
self, config_dir_path, server_name, is_generating_file, report_stats=None
):
default_config = "# vim:ft=yaml\n"
default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all(
"default_config",
config_dir_path=config_dir_path,
server_name=server_name,
is_generating_file=is_generating_file,
report_stats=report_stats,
))
default_config += "\n\n".join(
dedent(conf)
for conf in self.invoke_all(
"default_config",
config_dir_path=config_dir_path,
server_name=server_name,
is_generating_file=is_generating_file,
report_stats=report_stats,
)
)
config = yaml.load(default_config)
@ -178,23 +172,22 @@ class Config(object):
@classmethod
def load_config(cls, description, argv):
config_parser = argparse.ArgumentParser(
description=description,
)
config_parser = argparse.ArgumentParser(description=description)
config_parser.add_argument(
"-c", "--config-path",
"-c",
"--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
" may specify directories containing *.yaml files."
" may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Where files such as certs and signing keys are stored when"
" their location is given explicitly in the config."
" Defaults to the directory containing the last config file",
" their location is given explicitly in the config."
" Defaults to the directory containing the last config file",
)
config_args = config_parser.parse_args(argv)
@ -203,9 +196,7 @@ class Config(object):
obj = cls()
obj.read_config_files(
config_files,
keys_directory=config_args.keys_directory,
generate_keys=False,
config_files, keys_directory=config_args.keys_directory, generate_keys=False
)
return obj
@ -213,38 +204,38 @@ class Config(object):
def load_or_generate_config(cls, description, argv):
config_parser = argparse.ArgumentParser(add_help=False)
config_parser.add_argument(
"-c", "--config-path",
"-c",
"--config-path",
action="append",
metavar="CONFIG_FILE",
help="Specify config file. Can be given multiple times and"
" may specify directories containing *.yaml files."
" may specify directories containing *.yaml files.",
)
config_parser.add_argument(
"--generate-config",
action="store_true",
help="Generate a config file for the server name"
help="Generate a config file for the server name",
)
config_parser.add_argument(
"--report-stats",
action="store",
help="Whether the generated config reports anonymized usage statistics",
choices=["yes", "no"]
choices=["yes", "no"],
)
config_parser.add_argument(
"--generate-keys",
action="store_true",
help="Generate any missing key files then exit"
help="Generate any missing key files then exit",
)
config_parser.add_argument(
"--keys-directory",
metavar="DIRECTORY",
help="Used with 'generate-*' options to specify where files such as"
" certs and signing keys should be stored in, unless explicitly"
" specified in the config."
" certs and signing keys should be stored in, unless explicitly"
" specified in the config.",
)
config_parser.add_argument(
"-H", "--server-name",
help="The server name to generate a config file for"
"-H", "--server-name", help="The server name to generate a config file for"
)
config_args, remaining_args = config_parser.parse_known_args(argv)
@ -257,8 +248,8 @@ class Config(object):
if config_args.generate_config:
if config_args.report_stats is None:
config_parser.error(
"Please specify either --report-stats=yes or --report-stats=no\n\n" +
MISSING_REPORT_STATS_SPIEL
"Please specify either --report-stats=yes or --report-stats=no\n\n"
+ MISSING_REPORT_STATS_SPIEL
)
if not config_files:
config_parser.error(
@ -287,26 +278,32 @@ class Config(object):
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
is_generating_file=True
is_generating_file=True,
)
obj.invoke_all("generate_files", config)
config_file.write(config_str)
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
) % (config_path, server_name))
print(
(
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
)
% (config_path, server_name)
)
print(
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
print((
"Config file %r already exists. Generating any missing key"
" files."
) % (config_path,))
print(
(
"Config file %r already exists. Generating any missing key"
" files."
)
% (config_path,)
)
generate_keys = True
parser = argparse.ArgumentParser(
@ -338,8 +335,7 @@ class Config(object):
return obj
def read_config_files(self, config_files, keys_directory=None,
generate_keys=False):
def read_config_files(self, config_files, keys_directory=None, generate_keys=False):
if not keys_directory:
keys_directory = os.path.dirname(config_files[-1])
@ -364,8 +360,9 @@ class Config(object):
if "report_stats" not in config:
raise ConfigError(
MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
MISSING_REPORT_STATS_SPIEL
MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS
+ "\n"
+ MISSING_REPORT_STATS_SPIEL
)
if generate_keys:
@ -399,16 +396,16 @@ def find_config_files(search_paths):
for entry in os.listdir(config_path):
entry_path = os.path.join(config_path, entry)
if not os.path.isfile(entry_path):
print (
"Found subdirectory in config directory: %r. IGNORING."
) % (entry_path, )
err = "Found subdirectory in config directory: %r. IGNORING."
print(err % (entry_path,))
continue
if not entry.endswith(".yaml"):
print (
"Found file in config directory that does not"
" end in '.yaml': %r. IGNORING."
) % (entry_path, )
err = (
"Found file in config directory that does not end in "
"'.yaml': %r. IGNORING."
)
print(err % (entry_path,))
continue
files.append(entry_path)

View file

@ -13,10 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
# This file can't be called email.py because if it is, we cannot:
import email.utils
import logging
import os
from ._base import Config
import pkg_resources
from ._base import Config, ConfigError
logger = logging.getLogger(__name__)
class EmailConfig(Config):
@ -38,7 +46,6 @@ class EmailConfig(Config):
"smtp_host",
"smtp_port",
"notif_from",
"template_dir",
"notif_template_html",
"notif_template_text",
]
@ -62,9 +69,26 @@ class EmailConfig(Config):
self.email_smtp_host = email_config["smtp_host"]
self.email_smtp_port = email_config["smtp_port"]
self.email_notif_from = email_config["notif_from"]
self.email_template_dir = email_config["template_dir"]
self.email_notif_template_html = email_config["notif_template_html"]
self.email_notif_template_text = email_config["notif_template_text"]
template_dir = email_config.get("template_dir")
# we need an absolute path, because we change directory after starting (and
# we don't yet know what auxilliary templates like mail.css we will need).
# (Note that loading as package_resources with jinja.PackageLoader doesn't
# work for the same reason.)
if not template_dir:
template_dir = pkg_resources.resource_filename(
'synapse', 'res/templates'
)
template_dir = os.path.abspath(template_dir)
for f in self.email_notif_template_text, self.email_notif_template_html:
p = os.path.join(template_dir, f)
if not os.path.isfile(p):
raise ConfigError("Unable to find email template file %s" % (p, ))
self.email_template_dir = template_dir
self.email_notif_for_new_users = email_config.get(
"notif_for_new_users", True
)
@ -113,7 +137,9 @@ class EmailConfig(Config):
# require_transport_security: False
# notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
# app_name: Matrix
# template_dir: res/templates
# # if template_dir is unset, uses the example templates that are part of
# # the Synapse distribution.
# #template_dir: res/templates
# notif_template_html: notif_mail.html
# notif_template_text: notif_mail.txt
# notif_for_new_users: True

View file

@ -178,7 +178,7 @@ class ContentRepositoryConfig(Config):
def default_config(self, **kwargs):
media_store = self.default_path("media_store")
uploads_path = self.default_path("uploads")
return """
return r"""
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"

View file

@ -55,7 +55,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
except Exception as e:
except Exception:
logger.exception("Error getting key for %r", server_name)
raise IOError("Cannot get key for %r" % server_name)

View file

@ -690,7 +690,7 @@ def auth_types_for_event(event):
auth_types = []
auth_types.append((EventTypes.PowerLevels, "", ))
auth_types.append((EventTypes.Member, event.user_id, ))
auth_types.append((EventTypes.Member, event.sender, ))
auth_types.append((EventTypes.Create, "", ))
if event.type == EventTypes.Member:

View file

@ -507,19 +507,19 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
latest_events, limit):
with (yield self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
logger.info(
"on_get_missing_events: earliest_events: %r, latest_events: %r,"
" limit: %d, min_depth: %d",
earliest_events, latest_events, limit, min_depth
" limit: %d",
earliest_events, latest_events, limit,
)
missing_events = yield self.handler.on_get_missing_events(
origin, room_id, earliest_events, latest_events, limit, min_depth
origin, room_id, earliest_events, latest_events, limit,
)
if len(missing_events) < 5:
@ -800,7 +800,7 @@ class FederationHandlerRegistry(object):
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):

View file

@ -633,14 +633,6 @@ class TransactionQueue(object):
transaction, json_data_cb
)
code = 200
if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
@ -657,19 +649,24 @@ class TransactionQueue(object):
destination, txn_id, code
)
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)
yield self.transaction_actions.delivered(
transaction, code, response
)
logger.debug("TX [%s] Marked as delivered", destination)
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code != 200:
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, e_id, r,
)
else:
for p in pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, destination
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, p.event_id,
)
success = False

View file

@ -143,9 +143,17 @@ class TransportLayerClient(object):
transaction (Transaction)
Returns:
Deferred: Results of the deferred is a tuple in the form of
(response_code, response_body) where the response_body is a
python dict decoded from json
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
logger.debug(
"send_data dest=%s, txid=%s",
@ -170,11 +178,6 @@ class TransportLayerClient(object):
backoff_on_404=True, # If we get a 404 the other side has gone
)
logger.debug(
"send_data dest=%s, txid=%s, got response: 200",
transaction.destination, transaction.transaction_id,
)
defer.returnValue(response)
@defer.inlineCallbacks

View file

@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id):
limit = int(content.get("limit", 10))
min_depth = int(content.get("min_depth", 0))
earliest_events = content.get("earliest_events", [])
latest_events = content.get("latest_events", [])
@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
min_depth=min_depth,
limit=limit,
)

View file

@ -28,6 +28,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import log_failure
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@ -36,17 +37,6 @@ logger = logging.getLogger(__name__)
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
def log_failure(failure):
logger.error(
"Application Services Failure",
exc_info=(
failure.type,
failure.value,
failure.getTracebackObject()
)
)
class ApplicationServicesHandler(object):
def __init__(self, hs):
@ -112,7 +102,10 @@ class ApplicationServicesHandler(object):
if not self.started_scheduler:
def start_scheduler():
return self.scheduler.start().addErrback(log_failure)
return self.scheduler.start().addErrback(
log_failure, "Application Services Failure",
)
run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True

View file

@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
from twisted.internet import defer, threads
from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)

View file

@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
run_in_background(self._user_parter_loop)
run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):

View file

@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
def create_association(self, user_id, room_alias, room_id, servers=None):
# association creation for human users
# TODO(erikj): Do user auth.
def create_association(self, requester, room_alias, room_id, servers=None,
send_event=True):
"""Attempt to create a new alias
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise SynapseError(
403, "This user is not permitted to create this alias",
)
Args:
requester (Requester)
room_alias (RoomAlias)
room_id (str)
servers (list[str]|None): List of servers that others servers
should try and join via
send_event (bool): Whether to send an updated m.room.aliases event
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
)
if not can_create:
raise SynapseError(
400, "This alias is reserved by an application service.",
errcode=Codes.EXCLUSIVE
Returns:
Deferred
"""
user_id = requester.user.to_string()
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400, "This application service has not reserved"
" this kind of alias.", errcode=Codes.EXCLUSIVE
)
else:
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise AuthError(
403, "This user is not permitted to create this alias",
)
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
)
if not can_create:
raise AuthError(
400, "This alias is reserved by an application service.",
errcode=Codes.EXCLUSIVE
)
yield self._create_association(room_alias, room_id, servers, creator=user_id)
@defer.inlineCallbacks
def create_appservice_association(self, service, room_alias, room_id,
servers=None):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400, "This application service has not reserved"
" this kind of alias.", errcode=Codes.EXCLUSIVE
if send_event:
yield self.send_room_alias_update_event(
requester,
room_id
)
# association creation for app services
yield self._create_association(room_alias, room_id, servers)
@defer.inlineCallbacks
def delete_association(self, requester, user_id, room_alias):
def delete_association(self, requester, room_alias):
# association deletion for human users
user_id = requester.user.to_string()
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
requester.user.to_string(),
room_id
)
@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
def send_room_alias_update_event(self, requester, user_id, room_id):
def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
"sender": user_id,
"sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False

View file

@ -0,0 +1,289 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from six import iteritems
from twisted.internet import defer
from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError
from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
class E2eRoomKeysHandler(object):
"""
Implements an optional realtime backup mechanism for encrypted E2E megolm room keys.
This gives a way for users to store and recover their megolm keys if they lose all
their clients. It should also extend easily to future room key mechanisms.
The actual payload of the encrypted keys is completely opaque to the handler.
"""
def __init__(self, hs):
self.store = hs.get_datastore()
# Used to lock whenever a client is uploading key data. This prevents collisions
# between clients trying to upload the details of a new session, given all
# clients belonging to a user will receive and try to upload a new session at
# roughly the same time. Also used to lock out uploads when the key is being
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
Args:
user_id(str): the user whose keys we're getting
version(str): the version ID of the backup we're getting keys from
room_id(string): room ID to get keys for, for None to get keys for all rooms
session_id(string): session ID to get keys for, for None to get keys for all
sessions
Returns:
A deferred list of dicts giving the session_data and message metadata for
these room keys.
"""
# we deliberately take the lock to get keys so that changing the version
# works atomically
with (yield self._upload_linearizer.queue(user_id)):
results = yield self.store.get_e2e_room_keys(
user_id, version, room_id, session_id
)
if results['rooms'] == {}:
raise SynapseError(404, "No room_keys found")
defer.returnValue(results)
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
room or a given session.
See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
Args:
user_id(str): the user whose backup we're deleting
version(str): the version ID of the backup we're deleting
room_id(string): room ID to delete keys for, for None to delete keys for all
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
Returns:
A deferred of the deletion transaction
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
that the given version is the current backup version. room_keys are merged
into the current backup as described in RoomKeysServlet.on_PUT().
Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_keys(dict): a nested dict describing the room_keys we're setting:
{
"rooms": {
"!abc:matrix.org": {
"sessions": {
"c0ff33": {
"first_message_index": 1,
"forwarded_count": 1,
"is_verified": false,
"session_data": "SSBBTSBBIEZJU0gK"
}
}
}
}
}
Raises:
SynapseError: with code 404 if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
"""
# TODO: Validate the JSON to make sure it has the right keys.
# XXX: perhaps we should use a finer grained lock here?
with (yield self._upload_linearizer.queue(user_id)):
# Check that the version we're trying to upload is the current version
try:
version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Version '%s' not found" % (version,))
else:
raise
if version_info['version'] != version:
# Check that the version we're trying to upload actually exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
user_id, version,
)
# if we get this far, the version must exist
raise RoomKeysVersionError(current_version=version_info['version'])
except StoreError as e:
if e.code == 404:
raise SynapseError(404, "Version '%s' not found" % (version,))
else:
raise
# go through the room_keys.
# XXX: this should/could be done concurrently, given we're in a lock.
for room_id, room in iteritems(room_keys['rooms']):
for session_id, session in iteritems(room['sessions']):
yield self._upload_room_key(
user_id, version, room_id, session_id, session
)
@defer.inlineCallbacks
def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
"""Upload a given room_key for a given room and session into a given
version of the backup. Merges the key with any which might already exist.
Args:
user_id(str): the user whose backup we're setting
version(str): the version ID of the backup we're updating
room_id(str): the ID of the room whose keys we're setting
session_id(str): the session whose room_key we're setting
room_key(dict): the room_key being set
"""
# get the room_key for this particular row
current_room_key = None
try:
current_room_key = yield self.store.get_e2e_room_key(
user_id, version, room_id, session_id
)
except StoreError as e:
if e.code == 404:
pass
else:
raise
if self._should_replace_room_key(current_room_key, room_key):
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
"""
Determine whether to replace a given current_room_key (if any)
with a newly uploaded room_key backup
Args:
current_room_key (dict): Optional, the current room_key dict if any
room_key (dict): The new room_key dict which may or may not be fit to
replace the current_room_key
Returns:
True if current_room_key should be replaced by room_key in the backup
"""
if current_room_key:
# spelt out with if/elifs rather than nested boolean expressions
# purely for legibility.
if room_key['is_verified'] and not current_room_key['is_verified']:
return True
elif (
room_key['first_message_index'] <
current_room_key['first_message_index']
):
return True
elif room_key['forwarded_count'] < current_room_key['forwarded_count']:
return True
else:
return False
return True
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
backup version for the user's keys; previous backups will no longer be
writeable to.
Args:
user_id(str): the user whose backup version we're creating
version_info(dict): metadata about the new version being created
{
"algorithm": "m.megolm_backup.v1",
"auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
}
Returns:
A deferred of a string that gives the new version number.
"""
# TODO: Validate the JSON to make sure it has the right keys.
# lock everyone out until we've switched version
with (yield self._upload_linearizer.queue(user_id)):
new_version = yield self.store.create_e2e_room_keys_version(
user_id, version_info
)
defer.returnValue(new_version)
@defer.inlineCallbacks
def get_version_info(self, user_id, version=None):
"""Get the info about a given version of the user's backup
Args:
user_id(str): the user whose current backup version we're querying
version(str): Optional; if None gives the most recent version
otherwise a historical one.
Raises:
StoreError: code 404 if the requested backup version doesn't exist
Returns:
A deferred of a info dict that gives the info about the new version.
{
"version": "1234",
"algorithm": "m.megolm_backup.v1",
"auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K"
}
"""
with (yield self._upload_linearizer.queue(user_id)):
res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
defer.returnValue(res)
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
Args:
user_id(str): the user whose current backup version we're deleting
version(str): the version id of the backup being deleted
Raises:
StoreError: code 404 if this backup version doesn't exist
"""
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys_version(user_id, version)

View file

@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@ -309,8 +309,8 @@ class FederationHandler(BaseHandler):
if sent_to_us_directly:
logger.warn(
"[%s %s] Failed to fetch %d prev events: rejecting",
room_id, event_id, len(prevs - seen),
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
raise FederationError(
"ERROR",
@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
# Resolve any conflicting state
@defer.inlineCallbacks
def fetch(ev_ids):
fetched = yield self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
# add any events we fetch here to the `event_map` so that we
# can use them to build the state event list below.
event_map.update(fetched)
defer.returnValue(fetched)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
room_version, state_maps, event_map, fetch,
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
)
# we need to give _process_received_pdu the actual state events
# We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
# First though we need to fetch all the events that are in
# state_map, so we can build up the state below.
evs = yield self.store.get_events(
list(state_map.values()),
get_prev_content=False,
check_redacted=False,
)
event_map.update(evs)
state = [
event_map[e] for e in six.itervalues(state_map)
]
@ -452,8 +452,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
"[%s %s]: Requesting %d prev_events: %s",
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
"[%s %s]: Requesting missing events between %s and %s",
room_id, event_id, shortstr(latest), event_id,
)
# XXX: we set timeout to 10s to help workaround
@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth):
latest_events, limit):
in_room = yield self.auth.check_host_in_room(
room_id,
origin
@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler):
raise AuthError(403, "Host not in room.")
limit = min(limit, 20)
min_depth = max(min_depth, 0)
missing_events = yield self.store.get_missing_events(
room_id=room_id,
earliest_events=earliest_events,
latest_events=latest_events,
limit=limit,
min_depth=min_depth,
)
missing_events = yield filter_events_for_server(
@ -2522,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@ -2555,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
self.pusher_pool.on_new_notifications(
return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

View file

@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
return getattr(self.transport_client, func_name)(
d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
# Capture errors returned by the remote homeserver and
# re-throw specific errors as SynapseErrors. This is so
# when the remote end responds with things like 403 Not
# In Group, we can communicate that to the client instead
# of a 500.
def h(failure):
failure.trap(HttpResponseException)
e = failure.value
if e.code == 403:
raise e.to_synapse_error()
return failure
d.addErrback(h)
return d
return f

View file

@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

View file

@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)

View file

@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
send_event=False,
)
preset_config = config.get(
@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
requester, user_id, room_id
requester, room_id
)
defer.returnValue(result)

View file

@ -16,7 +16,7 @@
import logging
from collections import namedtuple
from six import iteritems
from six import PY3, iteritems
from six.moves import range
import msgpack
@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
@classmethod
def from_token(cls, token):
if PY3:
# The argument raw=False is only available on new versions of
# msgpack, and only really needed on Python 3. Gate it behind
# a PY3 check to avoid causing issues on Debian-packaged versions.
decoded = msgpack.loads(decode_base64(token), raw=False)
else:
decoded = msgpack.loads(decode_base64(token))
return RoomListNextBatch(**{
cls.REVERSE_KEY_DICT[key]: val
for key, val in msgpack.loads(decode_base64(token)).items()
for key, val in decoded.items()
})
def to_token(self):

View file

@ -20,6 +20,8 @@ import logging
from six import iteritems, itervalues
from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
@ -36,6 +38,19 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
# "true" or "false" depending on if the request asked for lazy loaded members or
# not.
non_empty_sync_counter = Counter(
"synapse_handlers_sync_nonempty_total",
"Count of non empty sync responses. type is initial_sync/full_state_sync"
"/incremental_sync. lazy_loaded indicates if lazy loaded members were "
"enabled for that request.",
["type", "lazy_loaded"],
)
# Store the cache that tracks which lazy-loaded members have been sent to a given
# client for no more than 30 minutes.
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
@ -227,14 +242,16 @@ class SyncHandler(object):
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
full_state):
if since_token is None:
sync_type = "initial_sync"
elif full_state:
sync_type = "full_state_sync"
else:
sync_type = "incremental_sync"
context = LoggingContext.current_context()
if context:
if since_token is None:
context.tag = "initial_sync"
elif full_state:
context.tag = "full_state_sync"
else:
context.tag = "incremental_sync"
context.tag = sync_type
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
@ -242,7 +259,6 @@ class SyncHandler(object):
result = yield self.current_sync_for_user(
sync_config, since_token, full_state=full_state,
)
defer.returnValue(result)
else:
def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
@ -251,7 +267,15 @@ class SyncHandler(object):
sync_config.user.to_string(), timeout, current_sync_callback,
from_token=since_token,
)
defer.returnValue(result)
if result:
if sync_config.filter_collection.lazy_load_members():
lazy_loaded = "true"
else:
lazy_loaded = "false"
non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):

View file

@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
@defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
try:
yield self._unsafe_process()
finally:
self._is_processing = False
run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):

View file

@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
def schedule(x):
@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
Returns:
Deferred: resolves with the http response object on success.
Fails with ``HTTPRequestException``: if we get an HTTP response
Fails with ``HttpResponseException``: if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
method = request.method
destination = request.destination
method_bytes = request.method.encode("ascii")
destination_bytes = request.destination.encode("ascii")
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
query_bytes = b""
headers_dict = {
"User-Agent": [self.version_string],
"Host": [request.destination],
b"User-Agent": [self.version_string_bytes],
b"Host": [destination_bytes],
}
with limiter:
@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
url = urllib.parse.urlunparse((
b"matrix", destination.encode("ascii"),
url_bytes = urllib.parse.urlunparse((
b"matrix", destination_bytes,
path_bytes, None, query_bytes, b"",
)).decode('ascii')
))
url_str = url_bytes.decode('ascii')
http_url = urllib.parse.urlunparse((
url_to_sign_bytes = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
)).decode('ascii')
))
while True:
try:
json = request.get_json()
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
headers_dict[b"Content-Type"] = [b"application/json"]
self.sign_request(
destination, method, http_url, headers_dict, json
destination_bytes, method_bytes, url_to_sign_bytes,
headers_dict, json,
)
else:
data = None
self.sign_request(destination, method, http_url, headers_dict)
logger.info(
"{%s} [%s] Sending request: %s %s",
request.txn_id, destination, method, url
)
if data:
data = encode_canonical_json(json)
producer = FileBodyProducer(
BytesIO(data),
cooperator=self._cooperator
cooperator=self._cooperator,
)
else:
producer = None
self.sign_request(
destination_bytes, method_bytes, url_to_sign_bytes,
headers_dict,
)
request_deferred = treq.request(
method,
url,
logger.info(
"{%s} [%s] Sending request: %s %s",
request.txn_id, request.destination, request.method,
url_str,
)
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
data=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
bodyProducer=producer,
)
request_deferred = timeout_deferred(
@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
destination,
method,
url,
request.destination,
request.method,
url_str,
_flatten_response_never_received(e),
)
@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
destination,
request.destination,
delay,
)
@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
destination,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
headers_dict (dict): Dictionary of request headers to append to
content (bytes): The body of the request
headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
append to
content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
@ -478,7 +480,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -532,7 +534,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -587,7 +589,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -638,7 +640,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -682,7 +684,7 @@ class MatrixFederationHttpClient(object):
Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers.
Fails with ``HTTPRequestException`` if we get an HTTP response code
Fails with ``HttpResponseException`` if we get an HTTP response code
>= 300
Fails with ``NotRetryingDestination`` if we are not yet ready

View file

@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
"synapse_http_server_response_time_seconds", "sec",
"synapse_http_server_response_time_seconds",
"sec",
["method", "servlet", "tag", "code"],
)
@ -79,15 +80,11 @@ response_size = Counter(
# than when the response was written.
in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
["method", "servlet"],
"synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
)
in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
["method", "servlet"],
"synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
)
in_flight_requests_db_txn_count = Counter(
@ -134,7 +131,7 @@ def _get_in_flight_counts():
# type
counts = {}
for rm in reqs:
key = (rm.method, rm.name,)
key = (rm.method, rm.name)
counts[key] = counts.get(key, 0) + 1
return counts
@ -175,7 +172,8 @@ class RequestMetrics(object):
if context != self.start_context:
logger.warn(
"Context have unexpectedly changed %r, %r",
context, self.start_context
context,
self.start_context,
)
return
@ -192,10 +190,10 @@ class RequestMetrics(object):
resource_usage = context.get_resource_usage()
response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
resource_usage.ru_utime
)
response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
resource_usage.ru_stime
)
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
@ -222,8 +220,15 @@ class RequestMetrics(object):
diff = new_stats - self._request_stats
self._request_stats = new_stats
in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
# max() is used since rapid use of ru_stime/ru_utime can end up with the
# count going backwards due to NTP, time smearing, fine-grained
# correction, or floating points. Who knows, really?
in_flight_requests_ru_utime.labels(self.method, self.name).inc(
max(diff.ru_utime, 0)
)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(
max(diff.ru_stime, 0)
)
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count

View file

@ -186,9 +186,9 @@ class Notifier(object):
def count_listeners():
all_user_streams = set()
for x in self.room_to_user_streams.values():
for x in list(self.room_to_user_streams.values()):
all_user_streams |= x
for x in self.user_to_user_stream.values():
for x in list(self.user_to_user_stream.values()):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
@ -196,7 +196,7 @@ class Notifier(object):
LaterGauge(
"synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
lambda: count(bool, list(self.room_to_user_streams.values())),
)
LaterGauge(
"synapse_notifier_users", "", [],

View file

@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@ -71,18 +70,11 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
self.processing = False
self._is_processing = False
@defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
try:
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
yield self._process()
except Exception:
logger.exception("Error starting email pusher")
self._start_processing()
def on_stop(self):
if self.timed_call:
@ -92,43 +84,52 @@ class EmailPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
yield self._process()
self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
# timer fire
return defer.succeed(None)
pass
@defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
yield self._process()
self._start_processing()
def _start_processing(self):
if self._is_processing:
return
run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
if self.processing:
return
# we should never get here if we are already processing
assert not self._is_processing
with LoggingContext("emailpush._process"):
with Measure(self.clock, "emailpush._process"):
try:
self._is_processing = True
if self.throttle_params is None:
# this is our first loop: load up the throttle params
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):

View file

@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@ -61,7 +60,7 @@ class HttpPusher(object):
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
self.processing = False
self._is_processing = False
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@ -92,34 +91,27 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
@defer.inlineCallbacks
def on_started(self):
try:
yield self._process()
except Exception:
logger.exception("Error starting http pusher")
self._start_processing()
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
yield self._process()
self._start_processing()
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
with LoggingContext("push.on_new_receipts"):
with Measure(self.clock, "push.on_new_receipts"):
badge = yield push_tools.get_badge_count(
self.hs.get_datastore(), self.user_id
)
yield self._send_badge(badge)
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
@defer.inlineCallbacks
def _update_badge(self):
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
yield self._send_badge(badge)
def on_timer(self):
yield self._process()
self._start_processing()
def on_stop(self):
if self.timed_call:
@ -129,27 +121,31 @@ class HttpPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def _process(self):
if self.processing:
def _start_processing(self):
if self._is_processing:
return
with LoggingContext("push._process"):
with Measure(self.clock, "push._process"):
run_as_background_process("httppush.process", self._process)
@defer.inlineCallbacks
def _process(self):
# we should never get here if we are already processing
assert not self._is_processing
try:
self._is_processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):

View file

@ -526,8 +526,7 @@ def load_jinja2_templates(config):
Returns:
(notif_template_html, notif_template_text)
"""
logger.info("loading jinja2")
logger.info("loading email templates from '%s'", config.email_template_dir)
loader = jinja2.FileSystemLoader(config.email_template_dir)
env = jinja2.Environment(loader=loader)
env.filters["format_ts"] = format_ts_filter

View file

@ -20,24 +20,39 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
"""
The pusher pool. This is responsible for dispatching notifications of new events to
the http and email pushers.
It provides three methods which are designed to be called by the rest of the
application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
delegates to each of the relevant pushers.
Note that it is expected that each pusher will have its own 'processing' loop which
will send out the notifications in the background, rather than blocking until the
notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
Pusher.on_new_receipts are not expected to return deferreds.
"""
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
self.start_pushers = _hs.config.start_pushers
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
self._start_pushers(pushers)
"""Starts the pushers off in a background process.
"""
if not self._should_start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
run_as_background_process("start_pushers", self._start_pushers)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@ -86,7 +101,7 @@ class PusherPool:
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
yield self._refresh_pusher(app_id, pushkey, user_id)
yield self.start_pusher_by_id(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@ -123,45 +138,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)
@defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id):
def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
deferreds = []
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_notifications,
min_stream_id, max_stream_id,
)
)
p.on_new_notifications(min_stream_id, max_stream_id)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)
@defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@ -171,26 +164,20 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
deferreds = []
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
)
p.on_new_receipts(min_stream_id, max_stream_id)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
def start_pusher_by_id(self, app_id, pushkey, user_id):
"""Look up the details for the given pusher, and start it"""
if not self._should_start_pushers:
return
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
@ -201,34 +188,50 @@ class PusherPool:
p = r
if p:
self._start_pusher(p)
self._start_pushers([p])
@defer.inlineCallbacks
def _start_pushers(self):
"""Start all the pushers
def _start_pushers(self, pushers):
if not self.start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
Returns:
Deferred
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:
p = self.pusher_factory.create_pusher(pusherdict)
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
continue
if p:
appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
)
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
run_in_background(p.on_started)
self._start_pusher(pusherdict)
logger.info("Started pushers")
def _start_pusher(self, pusherdict):
"""Start the given pusher
Args:
pusherdict (dict):
Returns:
None
"""
try:
p = self.pusher_factory.create_pusher(pusherdict)
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
return
if not p:
return
appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
)
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
appid_pushkey = "%s:%s" % (app_id, pushkey)

View file

@ -53,9 +53,10 @@ REQUIREMENTS = {
"pillow>=3.1.2": ["PIL"],
"pydenticon>=0.2": ["pydenticon"],
"sortedcontainers>=1.4.4": ["sortedcontainers"],
"psutil>=2.0.0": ["psutil>=2.0.0"],
"pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"msgpack-python>=0.4.2": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six>=1.10": ["six"],
@ -79,12 +80,6 @@ CONDITIONAL_REQUIREMENTS = {
"matrix-synapse-ldap3": {
"matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"],
},
"psutil": {
"psutil>=2.0.0": ["psutil>=2.0.0"],
},
"affinity": {
"affinity": ["affinity"],
},
"postgres": {
"psycopg2>=2.6": ["psycopg2"]
}

View file

@ -15,6 +15,8 @@
import logging
import six
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
@ -23,6 +25,13 @@ from ._slaved_id_tracker import SlavedIdTracker
logger = logging.getLogger(__name__)
def __func__(inp):
if six.PY3:
return inp
else:
return inp.__func__
class BaseSlavedStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(BaseSlavedStore, self).__init__(db_conn, hs)

View file

@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@ -43,11 +43,11 @@ class SlavedDeviceInboxStore(BaseSlavedStore):
expiry_ms=30 * 60 * 1000,
)
get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__
get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__
get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__
delete_messages_for_device = DataStore.delete_messages_for_device.__func__
delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__
get_to_device_stream_token = __func__(DataStore.get_to_device_stream_token)
get_new_messages_for_device = __func__(DataStore.get_new_messages_for_device)
get_new_device_msgs_for_remote = __func__(DataStore.get_new_device_msgs_for_remote)
delete_messages_for_device = __func__(DataStore.delete_messages_for_device)
delete_device_msgs_for_remote = __func__(DataStore.delete_device_msgs_for_remote)
def stream_positions(self):
result = super(SlavedDeviceInboxStore, self).stream_positions()

View file

@ -13,23 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
def __func__(inp):
if six.PY3:
return inp
else:
return inp.__func__
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)

View file

@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@ -33,9 +33,9 @@ class SlavedGroupServerStore(BaseSlavedStore):
"_group_updates_stream_cache", self._group_updates_id_gen.get_current_token(),
)
get_groups_changes_for_user = DataStore.get_groups_changes_for_user.__func__
get_group_stream_token = DataStore.get_group_stream_token.__func__
get_all_groups_for_user = DataStore.get_all_groups_for_user.__func__
get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
get_group_stream_token = __func__(DataStore.get_group_stream_token)
get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()

View file

@ -16,7 +16,7 @@
from synapse.storage import DataStore
from synapse.storage.keys import KeyStore
from ._base import BaseSlavedStore
from ._base import BaseSlavedStore, __func__
class SlavedKeyStore(BaseSlavedStore):
@ -24,11 +24,11 @@ class SlavedKeyStore(BaseSlavedStore):
"_get_server_verify_key"
]
get_server_verify_keys = DataStore.get_server_verify_keys.__func__
store_server_verify_key = DataStore.store_server_verify_key.__func__
get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
store_server_verify_key = __func__(DataStore.store_server_verify_key)
get_server_certificate = DataStore.get_server_certificate.__func__
store_server_certificate = DataStore.store_server_certificate.__func__
get_server_certificate = __func__(DataStore.get_server_certificate)
store_server_certificate = __func__(DataStore.store_server_certificate)
get_server_keys_json = DataStore.get_server_keys_json.__func__
store_server_keys_json = DataStore.store_server_keys_json.__func__
get_server_keys_json = __func__(DataStore.get_server_keys_json)
store_server_keys_json = __func__(DataStore.store_server_keys_json)

View file

@ -17,7 +17,7 @@ from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from ._base import BaseSlavedStore
from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker
@ -34,8 +34,8 @@ class SlavedPresenceStore(BaseSlavedStore):
"PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
)
_get_active_presence = DataStore._get_active_presence.__func__
take_presence_startup_info = DataStore.take_presence_startup_info.__func__
_get_active_presence = __func__(DataStore._get_active_presence)
take_presence_startup_info = __func__(DataStore.take_presence_startup_info)
_get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]

Some files were not shown because too many files have changed in this diff Show more