Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2018-09-13 16:15:56 +01:00
commit df73da691f
18 changed files with 370 additions and 79 deletions

View file

@ -4,7 +4,7 @@ set -e
# CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful.
# In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL.
echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> "$BASH_ENV"
echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV
source $BASH_ENV
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
@ -19,6 +19,10 @@ GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_
# Show what we are before
git show -s
# Set up username so it can do a merge
git config --global user.email bot@matrix.org
git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE

1
changelog.d/3846.feature Normal file
View file

@ -0,0 +1 @@
Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users

1
changelog.d/3855.misc Normal file
View file

@ -0,0 +1 @@
Removed some excess logging messages.

1
changelog.d/3856.misc Normal file
View file

@ -0,0 +1 @@
Speed up purge history for rooms that have been previously purged

1
changelog.d/3857.misc Normal file
View file

@ -0,0 +1 @@
Refactor some HTTP timeout code.

1
changelog.d/3858.misc Normal file
View file

@ -0,0 +1 @@
Fix running merged builds on CircleCI

1
changelog.d/3859.bugfix Normal file
View file

@ -0,0 +1 @@
Fix handling of redacted events from federation

View file

@ -17,4 +17,14 @@
""" This is a reference implementation of a Matrix home server.
"""
try:
from twisted.internet import protocol
from twisted.internet.protocol import Factory
from twisted.names.dns import DNSDatagramProtocol
protocol.Factory.noisy = False
Factory.noisy = False
DNSDatagramProtocol.noisy = False
except ImportError:
pass
__version__ = "0.33.4"

View file

@ -307,6 +307,10 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
"synapse_admin_mau:registered_reserved_users",
"Registered users with reserved threepids"
)
def setup(config_options):
@ -531,10 +535,14 @@ def run(hs):
@defer.inlineCallbacks
def generate_monthly_active_users():
count = 0
current_mau_count = 0
reserved_count = 0
store = hs.get_datastore()
if hs.config.limit_usage_by_mau:
count = yield hs.get_datastore().get_monthly_active_count()
current_mau_gauge.set(float(count))
current_mau_count = yield store.get_monthly_active_count()
reserved_count = yield store.get_registered_reserved_users_count()
current_mau_gauge.set(float(current_mau_count))
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(

View file

@ -227,7 +227,22 @@ def setup_logging(config, use_worker_options=False):
#
# However this may not be too much of a problem if we are just writing to a file.
observer = STDLibLogObserver()
def _log(event):
if "log_text" in event:
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
return
if event["log_text"].startswith("(UDP Port "):
return
if event["log_text"].startswith("Timing out client"):
return
return observer(event)
globalLogBeginner.beginLoggingTo(
[observer],
[_log],
redirectStandardIO=not config.no_redirect_stdio,
)

View file

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@ -147,6 +149,9 @@ class EventBase(object):
def items(self):
return list(self._event_dict.items())
def keys(self):
return six.iterkeys(self._event_dict)
class FrozenEvent(EventBase):
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):

View file

@ -153,7 +153,7 @@ class FederationBase(object):
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if (
set(six.iterkeys(redacted_event)) == set(six.iterkeys(pdu)) and
set(redacted_event.keys()) == set(pdu.keys()) and
set(six.iterkeys(redacted_event.content))
== set(six.iterkeys(pdu.content))
):

View file

@ -26,7 +26,7 @@ from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
from twisted.internet import defer, protocol, reactor
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool
@ -40,10 +40,8 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.logcontext import make_deferred_yieldable
logger = logging.getLogger(__name__)
@ -66,13 +64,14 @@ else:
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
reactor, destination, timeout=10,
self.reactor, destination, timeout=10,
tls_client_options_factory=self.tls_client_options_factory
)
@ -90,6 +89,7 @@ class MatrixFederationHttpClient(object):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
@ -100,6 +100,7 @@ class MatrixFederationHttpClient(object):
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
self.default_timeout = 60
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urllib.parse.urlunparse(
@ -143,6 +144,11 @@ class MatrixFederationHttpClient(object):
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
if (
self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
@ -215,13 +221,9 @@ class MatrixFederationHttpClient(object):
headers=Headers(headers_dict),
data=data,
agent=self.agent,
reactor=self.hs.get_reactor()
)
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
response = yield make_deferred_yieldable(
request_deferred,
)
@ -261,6 +263,13 @@ class MatrixFederationHttpClient(object):
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
logger.debug(
"{%s} Waiting %s before sending to %s...",
txn_id,
delay,
destination
)
yield self.clock.sleep(delay)
retries_left -= 1
else:
@ -279,10 +288,9 @@ class MatrixFederationHttpClient(object):
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.content(response),
timeout,
)
d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@ -396,10 +404,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body)
@defer.inlineCallbacks
@ -449,10 +456,14 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body)
@ -504,10 +515,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body)
@ -554,10 +564,9 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body)
@ -599,38 +608,15 @@ class MatrixFederationHttpClient(object):
try:
with logcontext.PreserveLoggingContext():
length = yield self._timeout_deferred(
_readBodyToFile(
response, output_stream, max_size
),
)
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
length = yield make_deferred_yieldable(d)
except Exception:
logger.exception("Failed to download body")
raise
defer.returnValue((length, headers))
def _timeout_deferred(self, deferred, timeout_ms=None):
"""Times the deferred out after `timeout_ms` ms
Args:
deferred (Deferred)
timeout_ms (int|None): Timeout in milliseconds. If None defaults
to 60 seconds.
Returns:
Deferred
"""
add_timeout_to_deferred(
deferred,
timeout_ms / 1000. if timeout_ms else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
return deferred
class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size):

View file

@ -1890,20 +1890,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
")"
)
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
" ON events_to_purge(should_delete)",
)
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@ -1930,19 +1916,45 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
should_delete_params = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
# We include the parameter twice since we use the expression twice
should_delete_params += (
"%:" + self.hs.hostname,
"%:" + self.hs.hostname,
)
should_delete_params += (room_id, token.topological)
# Note that we insert events that are outliers and aren't going to be
# deleted, as nothing will happen to them.
txn.execute(
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE e.room_id = ? AND topological_ordering < ?" % (
" WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
% (
should_delete_expr,
should_delete_expr,
),
should_delete_params,
)
# We create the indices *after* insertion as that's a lot faster.
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
" ON events_to_purge(should_delete)",
)
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
)

View file

@ -146,6 +146,23 @@ class MonthlyActiveUsersStore(SQLBaseStore):
return count
return self.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
def get_registered_reserved_users_count(self):
"""Of the reserved threepids defined in config, how many are associated
with registered users?
Returns:
Defered[int]: Number of real reserved users
"""
count = 0
for tp in self.hs.config.mau_limits_reserved_threepids:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
count = count + 1
defer.returnValue(count)
@defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id):
"""

View file

@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.web.client import ResponseNeverReceived
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from tests.unittest import HomeserverTestCase
class FederationClientTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(reactor=reactor, clock=clock)
hs.tls_client_options_factory = None
return hs
def prepare(self, reactor, clock, homeserver):
self.cl = MatrixFederationHttpClient(self.hs)
self.reactor.lookups["testserv"] = "1.2.3.4"
def test_dns_error(self):
"""
If the DNS raising returns an error, it will bubble up.
"""
d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000)
self.pump()
f = self.failureResultOf(d)
self.assertIsInstance(f.value, DNSLookupError)
def test_client_never_connect(self):
"""
If the HTTP request is not connected and is timed out, it'll give a
ConnectingCancelledError.
"""
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
self.pump()
# Nothing happened yet
self.assertFalse(d.called)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
self.assertEqual(clients[0][0], '1.2.3.4')
self.assertEqual(clients[0][1], 8008)
# Deferred is still without a result
self.assertFalse(d.called)
# Push by enough to time it out
self.reactor.advance(10.5)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, ConnectingCancelledError)
def test_client_connect_no_response(self):
"""
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
"""
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
self.pump()
# Nothing happened yet
self.assertFalse(d.called)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
self.assertEqual(clients[0][0], '1.2.3.4')
self.assertEqual(clients[0][1], 8008)
conn = Mock()
client = clients[0][2].buildProtocol(None)
client.makeConnection(conn)
# Deferred is still without a result
self.assertFalse(d.called)
# Push by enough to time it out
self.reactor.advance(10.5)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, ResponseNeverReceived)
def test_client_gets_headers(self):
"""
Once the client gets the headers, _request returns successfully.
"""
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000)
self.pump()
conn = Mock()
clients = self.reactor.tcpClients
client = clients[0][2].buildProtocol(None)
client.makeConnection(conn)
# Deferred does not have a result
self.assertFalse(d.called)
# Send it the HTTP response
client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n")
# We should get a successful response
r = self.successResultOf(d)
self.assertEqual(r.code, 200)
def test_client_headers_no_body(self):
"""
If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived.
"""
d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
conn = Mock()
clients = self.reactor.tcpClients
client = clients[0][2].buildProtocol(None)
client.makeConnection(conn)
# Deferred does not have a result
self.assertFalse(d.called)
# Send it the HTTP response
client.dataReceived(
(b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
b"Server: Fake\r\n\r\n")
)
# Push by enough to time it out
self.reactor.advance(10.5)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, TimeoutError)

View file

@ -4,9 +4,14 @@ from io import BytesIO
from six import text_type
import attr
from zope.interface import implementer
from twisted.internet import address, threads
from twisted.internet import address, threads, udp
from twisted.internet._resolver import HostResolution
from twisted.internet.address import IPv4Address
from twisted.internet.defer import Deferred
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IReactorPluggableNameResolver
from twisted.python.failure import Failure
from twisted.test.proto_helpers import MemoryReactorClock
@ -154,11 +159,46 @@ def render(request, resource, clock):
wait_until_result(clock, request)
@implementer(IReactorPluggableNameResolver)
class ThreadedMemoryReactorClock(MemoryReactorClock):
"""
A MemoryReactorClock that supports callFromThread.
"""
def __init__(self):
self._udp = []
self.lookups = {}
class Resolver(object):
def resolveHostName(
_self,
resolutionReceiver,
hostName,
portNumber=0,
addressTypes=None,
transportSemantics='TCP',
):
resolution = HostResolution(hostName)
resolutionReceiver.resolutionBegan(resolution)
if hostName not in self.lookups:
raise DNSLookupError("OH NO")
resolutionReceiver.addressResolved(
IPv4Address('TCP', self.lookups[hostName], portNumber)
)
resolutionReceiver.resolutionComplete()
return resolution
self.nameResolver = Resolver()
super(ThreadedMemoryReactorClock, self).__init__()
def listenUDP(self, port, protocol, interface='', maxPacketSize=8196):
p = udp.Port(port, protocol, interface, maxPacketSize, self)
p.startListening()
self._udp.append(p)
return p
def callFromThread(self, callback, *args, **kwargs):
"""
Make the callback fire in the next reactor iteration.

View file

@ -183,3 +183,34 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
self.store.populate_monthly_active_users('user_id')
self.pump()
self.store.upsert_monthly_active_user.assert_not_called()
def test_get_reserved_real_user_account(self):
# Test no reserved users, or reserved threepids
count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), 0)
# Test reserved users but no registered users
user1 = '@user1:example.com'
user2 = '@user2:example.com'
user1_email = 'user1@example.com'
user2_email = 'user2@example.com'
threepids = [
{'medium': 'email', 'address': user1_email},
{'medium': 'email', 'address': user2_email},
]
self.hs.config.mau_limits_reserved_threepids = threepids
self.store.initialise_reserved_users(threepids)
self.pump()
count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), 0)
# Test reserved registed users
self.store.register(user_id=user1, token="123", password_hash=None)
self.store.register(user_id=user2, token="456", password_hash=None)
self.pump()
now = int(self.hs.get_clock().time_msec())
self.store.user_add_threepid(user1, "email", user1_email, now, now)
self.store.user_add_threepid(user2, "email", user2_email, now, now)
count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), len(threepids))