Merge remote-tracking branch 'origin/develop' into rav/fix_federation_errors

This commit is contained in:
Richard van der Hoff 2018-09-27 15:18:21 +01:00
commit f094f715cf
31 changed files with 117 additions and 38 deletions

View file

@ -1,5 +1,21 @@
version: 2 version: 2
jobs: jobs:
dockerhubuploadrelease:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:$CIRCLE_TAG .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker push matrixdotorg/synapse:$CIRCLE_TAG
dockerhubuploadlatest:
machine: true
steps:
- checkout
- run: docker build -f docker/Dockerfile -t matrixdotorg/synapse:$CIRCLE_SHA1 .
- run: docker login --username $DOCKER_HUB_USERNAME --password $DOCKER_HUB_PASSWORD
- run: docker tag matrixdotorg/synapse:$CIRCLE_SHA1 matrixdotorg/synapse:latest
- run: docker push matrixdotorg/synapse:$CIRCLE_SHA1
- run: docker push matrixdotorg/synapse:latest
sytestpy2: sytestpy2:
machine: true machine: true
steps: steps:
@ -131,3 +147,13 @@ workflows:
filters: filters:
branches: branches:
ignore: /develop|master|release-.*/ ignore: /develop|master|release-.*/
- dockerhubuploadrelease:
filters:
tags:
only: /^v[0-9].[0-9]+.[0-9]+(.[0-9]+)?/
branches:
ignore: /.*/
- dockerhubuploadlatest:
filters:
branches:
only: master

View file

@ -20,6 +20,9 @@ matrix:
- python: 2.7 - python: 2.7
env: TOX_ENV=py27 env: TOX_ENV=py27
- python: 2.7
env: TOX_ENV=py27-old
- python: 2.7 - python: 2.7
env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4" env: TOX_ENV=py27-postgres TRIAL_FLAGS="-j 4"
services: services:

1
changelog.d/3794.misc Normal file
View file

@ -0,0 +1 @@
Speed up calculation of typing updates for replication

1
changelog.d/3946.misc Normal file
View file

@ -0,0 +1 @@
Automate pushes to docker hub

1
changelog.d/3952.misc Normal file
View file

@ -0,0 +1 @@
Run the test suite on the oldest supported versions of our dependencies in CI.

1
changelog.d/3961.bugfix Normal file
View file

@ -0,0 +1 @@
Fix errors due to concurrent monthly_active_user upserts

1
changelog.d/3965.misc Normal file
View file

@ -0,0 +1 @@
Run notify_app_services as a bg process

1
changelog.d/3970.bugfix Normal file
View file

@ -0,0 +1 @@
Replaced all occurences of e.message with str(e). Contributed by Schnuffle

View file

@ -21,4 +21,4 @@ try:
verifier.verify(macaroon, key) verifier.verify(macaroon, key)
print "Signature is correct" print "Signature is correct"
except Exception as e: except Exception as e:
print e.message print str(e)

View file

@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA, jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker()) format_checker=FormatChecker())
except jsonschema.ValidationError as e: except jsonschema.ValidationError as e:
raise SynapseError(400, e.message) raise SynapseError(400, str(e))
class FilterCollection(object): class FilterCollection(object):

View file

@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements() python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e: except python_dependencies.MissingRequirementError as e:
message = "\n".join([ message = "\n".join([
"Missing Requirement: %s" % (e.message,), "Missing Requirement: %s" % (str(e),),
"To install run:", "To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,), " pip install --upgrade --force \"%s\"" % (e.dependency,),
"", "",

View file

@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options "Synapse appservice", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.appservice" assert config.worker_app == "synapse.app.appservice"

View file

@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options "Synapse client reader", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.client_reader" assert config.worker_app == "synapse.app.client_reader"

View file

@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options "Synapse event creator", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.event_creator" assert config.worker_app == "synapse.app.event_creator"

View file

@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options "Synapse federation reader", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader" assert config.worker_app == "synapse.app.federation_reader"

View file

@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options "Synapse federation sender", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender" assert config.worker_app == "synapse.app.federation_sender"

View file

@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options "Synapse frontend proxy", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy" assert config.worker_app == "synapse.app.frontend_proxy"

View file

@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try: try:
database_engine.check_database(db_conn.cursor()) database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e: except IncorrectDatabaseSetup as e:
quit_with_error(e.message) quit_with_error(str(e))
# Gauges to expose monthly active user control metrics # Gauges to expose monthly active user control metrics
@ -328,7 +328,7 @@ def setup(config_options):
config_options, config_options,
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
if not config: if not config:

View file

@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options "Synapse media repository", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.media_repository" assert config.worker_app == "synapse.app.media_repository"

View file

@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options "Synapse pusher", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.pusher" assert config.worker_app == "synapse.app.pusher"

View file

@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options "Synapse synchrotron", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron" assert config.worker_app == "synapse.app.synchrotron"

View file

@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options "Synapse user directory", config_options
) )
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
assert config.worker_app == "synapse.app.user_dir" assert config.worker_app == "synapse.app.user_dir"

View file

@ -25,7 +25,7 @@ if __name__ == "__main__":
try: try:
config = HomeServerConfig.load_config("", sys.argv[3:]) config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e: except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n") sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1) sys.exit(1)
print (getattr(config, key)) print (getattr(config, key))

View file

@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e): def _exception_to_failure(e):
if isinstance(e, CodeMessageException): if isinstance(e, CodeMessageException):
return { return {
"status": e.code, "message": e.message, "status": e.code, "message": str(e),
} }
if isinstance(e, NotRetryingDestination): if isinstance(e, NotRetryingDestination):

View file

@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e: except Exception as e:
logger.warn( logger.warn(
"Failed to update join event for room %s - %s", "Failed to update join event for room %s - %s",
room_id, str(e.message) room_id, str(e)
) )

View file

@ -20,6 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing # map room IDs to sets of users currently typing
self._room_typing = {} self._room_typing = {}
# caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache(
"TypingStreamChangeCache", self._latest_room_serial,
)
self.clock.looping_call( self.clock.looping_call(
self._handle_timeouts, self._handle_timeouts,
5000, 5000,
@ -274,19 +280,29 @@ class TypingHandler(object):
self._latest_room_serial += 1 self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial self._room_serials[member.room_id] = self._latest_room_serial
self._typing_stream_change_cache.entity_has_changed(
member.room_id, self._latest_room_serial,
)
self.notifier.on_new_event( self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id] "typing_key", self._latest_room_serial, rooms=[member.room_id]
) )
def get_all_typing_updates(self, last_id, current_id): def get_all_typing_updates(self, last_id, current_id):
# TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id: if last_id == current_id:
return [] return []
changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id,
)
if changed_rooms is None:
changed_rooms = self._room_serials
rows = [] rows = []
for room_id, serial in self._room_serials.items(): for room_id in changed_rooms:
if last_id < serial and serial <= current_id: serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id] typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing))) rows.append((serial, room_id, list(typing)))
rows.sort() rows.sort()

View file

@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -248,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]): def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event""" """Notify any user streams that are interested in this room event"""
# poke any interested application service. # poke any interested application service.
run_in_background(self._notify_app_services, room_stream_id) run_as_background_process(
"notify_app_services",
self._notify_app_services, room_stream_id,
)
if self.federation_sender: if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id) self.federation_sender.notify_new_events(room_stream_id)

View file

@ -33,31 +33,32 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies # [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = { REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"], "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"], "frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"], "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"], "canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"], "signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"], "service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"], "Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"], "treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6. # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"], "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
"pyyaml": ["yaml"], "pyyaml>=3.11": ["yaml"],
"pyasn1": ["pyasn1"], "pyasn1>=0.1.9": ["pyasn1"],
"daemonize": ["daemonize"], "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
"bcrypt": ["bcrypt>=3.1.0"], "daemonize>=2.3.1": ["daemonize"],
"pillow": ["PIL"], "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
"pydenticon": ["pydenticon"], "pillow>=3.1.2": ["PIL"],
"sortedcontainers": ["sortedcontainers"], "pydenticon>=0.2": ["pydenticon"],
"pysaml2>=3.0.0": ["saml2>=3.0.0"], "sortedcontainers>=1.4.4": ["sortedcontainers"],
"pymacaroons-pynacl": ["pymacaroons"], "pysaml2>=3.0.0": ["saml2"],
"pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"], "msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"], "phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"], "six>=1.10": ["six"],
"prometheus_client": ["prometheus_client"], "prometheus_client>=0.0.18": ["prometheus_client"],
# we use attr.s(slots), which arrived in 16.0.0 # we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"], "attrs>=16.0.0": ["attr>=16.0.0"],

View file

@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an Deferred[bool]: True if a new entry was created, False if an
existing one was updated. existing one was updated.
""" """
# Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert( is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user", desc="upsert_monthly_active_user",
table="monthly_active_users", table="monthly_active_users",
@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={ values={
"timestamp": int(self._clock.time_msec()), "timestamp": int(self._clock.time_msec()),
}, },
lock=False,
) )
if is_insert: if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,)) self.user_last_seen_monthly_active.invalidate((user_id,))

View file

@ -121,7 +121,7 @@ class TestCase(unittest.TestCase):
try: try:
self.assertEquals(attrs[key], getattr(obj, key)) self.assertEquals(attrs[key], getattr(obj, key))
except AssertionError as e: except AssertionError as e:
raise (type(e))(e.message + " for '.%s'" % key) raise (type(e))(str(e) + " for '.%s'" % key)
def assert_dict(self, required, actual): def assert_dict(self, required, actual):
"""Does a partial assert of a dict. """Does a partial assert of a dict.

20
tox.ini
View file

@ -64,6 +64,26 @@ setenv =
{[base]setenv} {[base]setenv}
SYNAPSE_POSTGRES = 1 SYNAPSE_POSTGRES = 1
# A test suite for the oldest supported versions of Python libraries, to catch
# any uses of APIs not available in them.
[testenv:py27-old]
skip_install=True
deps =
# Old automat version for Twisted
Automat == 0.3.0
mock
lxml
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
# Make all greater-thans equals so we test the oldest version of our direct
# dependencies, but make the pyopenssl 17.0, which can work against an
# OpenSSL 1.1 compiled cryptography (as older ones don't compile on Travis).
/bin/sh -c 'python -m synapse.python_dependencies | sed -e "s/>=/==/g" -e "s/psycopg2==2.6//" -e "s/pyopenssl==16.0.0/pyopenssl==17.0.0/" | xargs pip install'
# Install Synapse itself. This won't update any libraries.
pip install -e .
{envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
[testenv:py35] [testenv:py35]
usedevelop=true usedevelop=true