0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-16 00:13:54 +01:00

Merge pull request #156 from matrix-org/erikj/join_perf

Make joining #matrix:matrix.org over federation quicker
This commit is contained in:
Mark Haines 2015-05-22 16:09:54 +01:00
commit 36317f3dad
6 changed files with 92 additions and 41 deletions

View file

@ -80,6 +80,7 @@ class FederationBase(object):
destinations=[pdu.origin], destinations=[pdu.origin],
event_id=pdu.event_id, event_id=pdu.event_id,
outlier=outlier, outlier=outlier,
timeout=10000,
) )
if new_pdu: if new_pdu:

View file

@ -22,6 +22,7 @@ from .units import Edu
from synapse.api.errors import ( from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError, CodeMessageException, HttpResponseException, SynapseError,
) )
from synapse.util import unwrapFirstError
from synapse.util.expiringcache import ExpiringCache from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
@ -173,7 +174,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def get_pdu(self, destinations, event_id, outlier=False): def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
"""Requests the PDU with given origin and ID from the remote home """Requests the PDU with given origin and ID from the remote home
servers. servers.
@ -189,6 +190,8 @@ class FederationClient(FederationBase):
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False` of the current block of PDUs. Defaults to `False`
timeout (int): How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout.
Returns: Returns:
Deferred: Results in the requested PDU. Deferred: Results in the requested PDU.
@ -212,7 +215,7 @@ class FederationClient(FederationBase):
with limiter: with limiter:
transaction_data = yield self.transport_layer.get_event( transaction_data = yield self.transport_layer.get_event(
destination, event_id destination, event_id, timeout=timeout,
) )
logger.debug("transaction_data %r", transaction_data) logger.debug("transaction_data %r", transaction_data)
@ -370,13 +373,17 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", []) for p in content.get("auth_chain", [])
] ]
signed_state = yield self._check_sigs_and_hash_and_fetch( signed_state, signed_auth = yield defer.gatherResults(
destination, state, outlier=True [
) self._check_sigs_and_hash_and_fetch(
destination, state, outlier=True
signed_auth = yield self._check_sigs_and_hash_and_fetch( ),
destination, auth_chain, outlier=True self._check_sigs_and_hash_and_fetch(
) destination, auth_chain, outlier=True
)
],
consumeErrors=True
).addErrback(unwrapFirstError)
auth_chain.sort(key=lambda e: e.depth) auth_chain.sort(key=lambda e: e.depth)
@ -518,7 +525,7 @@ class FederationClient(FederationBase):
# Are we missing any? # Are we missing any?
seen_events = set(earliest_events_ids) seen_events = set(earliest_events_ids)
seen_events.update(e.event_id for e in signed_events) seen_events.update(e.event_id for e in signed_events if e)
missing_events = {} missing_events = {}
for e in itertools.chain(latest_events, signed_events): for e in itertools.chain(latest_events, signed_events):

View file

@ -50,13 +50,15 @@ class TransportLayerClient(object):
) )
@log_function @log_function
def get_event(self, destination, event_id): def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server. """ Requests the pdu with give id and origin from the given server.
Args: Args:
destination (str): The host name of the remote home server we want destination (str): The host name of the remote home server we want
to get the state from. to get the state from.
event_id (str): The id of the event being requested. event_id (str): The id of the event being requested.
timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout.
Returns: Returns:
Deferred: Results in a dict received from the remote homeserver. Deferred: Results in a dict received from the remote homeserver.
@ -65,7 +67,7 @@ class TransportLayerClient(object):
destination, event_id) destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, ) path = PREFIX + "/event/%s/" % (event_id, )
return self.client.get_json(destination, path=path) return self.client.get_json(destination, path=path, timeout=timeout)
@log_function @log_function
def backfill(self, destination, room_id, event_tuples, limit): def backfill(self, destination, room_id, event_tuples, limit):

View file

@ -517,30 +517,59 @@ class FederationHandler(BaseHandler):
# FIXME # FIXME
pass pass
auth_ids_to_deferred = {}
def process_auth_ev(ev):
auth_ids = [e_id for e_id, _ in ev.auth_events]
prev_ds = [
auth_ids_to_deferred[i]
for i in auth_ids
if i in auth_ids_to_deferred
]
d = defer.Deferred()
auth_ids_to_deferred[ev.event_id] = d
@defer.inlineCallbacks
def f(*_):
ev.internal_metadata.outlier = True
try:
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, ev, auth_events=auth
)
except:
logger.exception(
"Failed to handle auth event %s",
ev.event_id,
)
d.callback(None)
if prev_ds:
dx = defer.DeferredList(prev_ds)
dx.addBoth(f)
else:
f()
for e in auth_chain: for e in auth_chain:
e.internal_metadata.outlier = True
if e.event_id == event.event_id: if e.event_id == event.event_id:
continue return
process_auth_ev(e)
try: yield defer.DeferredList(auth_ids_to_deferred.values())
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle auth event %s",
e.event_id,
)
for e in state: @defer.inlineCallbacks
def handle_state(e):
if e.event_id == event.event_id: if e.event_id == event.event_id:
continue return
e.internal_metadata.outlier = True e.internal_metadata.outlier = True
try: try:
@ -558,6 +587,8 @@ class FederationHandler(BaseHandler):
e.event_id, e.event_id,
) )
yield defer.DeferredList([handle_state(e) for e in state])
auth_ids = [e_id for e_id, _ in event.auth_events] auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = { auth_events = {
(e.type, e.state_key): e for e in auth_chain (e.type, e.state_key): e for e in auth_chain
@ -893,9 +924,12 @@ class FederationHandler(BaseHandler):
# This is a hack to fix some old rooms where the initial join event # This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events. # didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events: if event.type == EventTypes.Member and not event.auth_events:
if len(event.prev_events) == 1: if len(event.prev_events) == 1 and event.depth < 5:
c = yield self.store.get_event(event.prev_events[0][0]) c = yield self.store.get_event(
if c.type == EventTypes.Create: event.prev_events[0][0],
allow_none=True,
)
if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c auth_events[(c.type, c.state_key)] = c
try: try:

View file

@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"", body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True): query_bytes=b"", retry_on_dns_fail=True,
timeout=None):
""" Creates and sends a request to the given url """ Creates and sends a request to the given url
""" """
headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"User-Agent"] = [self.version_string]
@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object):
response = yield self.clock.time_bound_deferred( response = yield self.clock.time_bound_deferred(
request_deferred, request_deferred,
time_out=60, time_out=timeout/1000. if timeout else 60,
) )
logger.debug("Got response to %s", method) logger.debug("Got response to %s", method)
@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object):
_flatten_response_never_received(e), _flatten_response_never_received(e),
) )
if retries_left: if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left)) yield sleep(2 ** (5 - retries_left))
retries_left -= 1 retries_left -= 1
else: else:
@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body)) defer.returnValue(json.loads(body))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True): def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
timeout=None):
""" GETs some json from the given host homeserver and path """ GETs some json from the given host homeserver and path
Args: Args:
@ -343,6 +345,9 @@ class MatrixFederationHttpClient(object):
path (str): The HTTP path. path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to args (dict): A dictionary used to create query strings, defaults to
None. None.
timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout and that the request will
be retried.
Returns: Returns:
Deferred: Succeeds when we get *any* HTTP response. Deferred: Succeeds when we get *any* HTTP response.
@ -370,7 +375,8 @@ class MatrixFederationHttpClient(object):
path.encode("ascii"), path.encode("ascii"),
query_bytes=query_bytes, query_bytes=query_bytes,
body_callback=body_callback, body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
) )
if 200 <= response.code < 300: if 200 <= response.code < 300:

View file

@ -330,12 +330,13 @@ class EventFederationStore(SQLBaseStore):
" WHERE event_id = ? AND room_id = ?" " WHERE event_id = ? AND room_id = ?"
" )" " )"
" AND NOT EXISTS (" " AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ?" " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )" " )"
) )
txn.executemany(query, [ txn.executemany(query, [
(e_id, room_id, e_id, room_id, e_id, room_id, ) (e_id, room_id, e_id, room_id, e_id, room_id, False)
for e_id, _ in prev_events for e_id, _ in prev_events
]) ])