Add a timeout param to get_event

This commit is contained in:
Erik Johnston 2015-05-19 14:53:32 +01:00
parent 6837c5edab
commit 5b1631a4a9
4 changed files with 25 additions and 16 deletions

View file

@ -80,6 +80,7 @@ class FederationBase(object):
destinations=[pdu.origin], destinations=[pdu.origin],
event_id=pdu.event_id, event_id=pdu.event_id,
outlier=outlier, outlier=outlier,
timeout=10000,
) )
if new_pdu: if new_pdu:

View file

@ -22,6 +22,7 @@ from .units import Edu
from synapse.api.errors import ( from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError, CodeMessageException, HttpResponseException, SynapseError,
) )
from synapse.util import unwrapFirstError
from synapse.util.expiringcache import ExpiringCache from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
@ -173,7 +174,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def get_pdu(self, destinations, event_id, outlier=False): def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
"""Requests the PDU with given origin and ID from the remote home """Requests the PDU with given origin and ID from the remote home
servers. servers.
@ -212,7 +213,7 @@ class FederationClient(FederationBase):
with limiter: with limiter:
transaction_data = yield self.transport_layer.get_event( transaction_data = yield self.transport_layer.get_event(
destination, event_id destination, event_id, timeout=timeout,
) )
logger.debug("transaction_data %r", transaction_data) logger.debug("transaction_data %r", transaction_data)
@ -370,13 +371,17 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", []) for p in content.get("auth_chain", [])
] ]
signed_state = yield self._check_sigs_and_hash_and_fetch( signed_state, signed_auth = yield defer.gatherResults(
[
self._check_sigs_and_hash_and_fetch(
destination, state, outlier=True destination, state, outlier=True
) ),
self._check_sigs_and_hash_and_fetch(
signed_auth = yield self._check_sigs_and_hash_and_fetch(
destination, auth_chain, outlier=True destination, auth_chain, outlier=True
) )
],
consumeErrors=True
).addErrback(unwrapFirstError)
auth_chain.sort(key=lambda e: e.depth) auth_chain.sort(key=lambda e: e.depth)

View file

@ -50,7 +50,7 @@ class TransportLayerClient(object):
) )
@log_function @log_function
def get_event(self, destination, event_id): def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server. """ Requests the pdu with give id and origin from the given server.
Args: Args:
@ -65,7 +65,7 @@ class TransportLayerClient(object):
destination, event_id) destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, ) path = PREFIX + "/event/%s/" % (event_id, )
return self.client.get_json(destination, path=path) return self.client.get_json(destination, path=path, timeout=timeout)
@log_function @log_function
def backfill(self, destination, room_id, event_tuples, limit): def backfill(self, destination, room_id, event_tuples, limit):

View file

@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"", body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True): query_bytes=b"", retry_on_dns_fail=True,
timeout=None):
""" Creates and sends a request to the given url """ Creates and sends a request to the given url
""" """
headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"User-Agent"] = [self.version_string]
@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object):
response = yield self.clock.time_bound_deferred( response = yield self.clock.time_bound_deferred(
request_deferred, request_deferred,
time_out=60, time_out=timeout/1000. if timeout else 60,
) )
logger.debug("Got response to %s", method) logger.debug("Got response to %s", method)
@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object):
_flatten_response_never_received(e), _flatten_response_never_received(e),
) )
if retries_left: if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left)) yield sleep(2 ** (5 - retries_left))
retries_left -= 1 retries_left -= 1
else: else:
@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body)) defer.returnValue(json.loads(body))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True): def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
timeout=None):
""" GETs some json from the given host homeserver and path """ GETs some json from the given host homeserver and path
Args: Args:
@ -370,7 +372,8 @@ class MatrixFederationHttpClient(object):
path.encode("ascii"), path.encode("ascii"),
query_bytes=query_bytes, query_bytes=query_bytes,
body_callback=body_callback, body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
) )
if 200 <= response.code < 300: if 200 <= response.code < 300: