forked from MirrorHub/synapse
Log outbound requests when we retry
This commit is contained in:
parent
8fd93b5eea
commit
8c5b84441b
1 changed files with 72 additions and 73 deletions
|
@ -177,11 +177,6 @@ class MatrixFederationHttpClient(object):
|
|||
txn_id = "%s-O-%s" % (method, self._next_id)
|
||||
self._next_id = (self._next_id + 1) % (MAXINT - 1)
|
||||
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url
|
||||
)
|
||||
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
# (once we have reliable transactions in place)
|
||||
if long_retries:
|
||||
|
@ -194,85 +189,89 @@ class MatrixFederationHttpClient(object):
|
|||
).decode('ascii')
|
||||
|
||||
log_result = None
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
if json_callback:
|
||||
json = json_callback()
|
||||
while True:
|
||||
try:
|
||||
if json_callback:
|
||||
json = json_callback()
|
||||
|
||||
if json:
|
||||
data = encode_canonical_json(json)
|
||||
headers_dict["Content-Type"] = ["application/json"]
|
||||
self.sign_request(
|
||||
destination, method, http_url, headers_dict, json
|
||||
)
|
||||
else:
|
||||
data = None
|
||||
self.sign_request(destination, method, http_url, headers_dict)
|
||||
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
url,
|
||||
headers=Headers(headers_dict),
|
||||
data=data,
|
||||
agent=self.agent,
|
||||
)
|
||||
add_timeout_to_deferred(
|
||||
request_deferred,
|
||||
timeout / 1000. if timeout else 60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(
|
||||
request_deferred,
|
||||
if json:
|
||||
data = encode_canonical_json(json)
|
||||
headers_dict["Content-Type"] = ["application/json"]
|
||||
self.sign_request(
|
||||
destination, method, http_url, headers_dict, json
|
||||
)
|
||||
else:
|
||||
data = None
|
||||
self.sign_request(destination, method, http_url, headers_dict)
|
||||
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
e
|
||||
)
|
||||
log_result = "DNS Lookup failed to %s with %s" % (
|
||||
destination, e
|
||||
)
|
||||
raise
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url
|
||||
)
|
||||
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
url,
|
||||
headers=Headers(headers_dict),
|
||||
data=data,
|
||||
agent=self.agent,
|
||||
)
|
||||
add_timeout_to_deferred(
|
||||
request_deferred,
|
||||
timeout / 1000. if timeout else 60,
|
||||
self.hs.get_reactor(),
|
||||
cancelled_to_request_timed_out_error,
|
||||
)
|
||||
response = yield make_deferred_yieldable(
|
||||
request_deferred,
|
||||
)
|
||||
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"{%s} Sending request failed to %s: %s %s: %s",
|
||||
txn_id,
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
method,
|
||||
url,
|
||||
_flatten_response_never_received(e),
|
||||
e
|
||||
)
|
||||
log_result = "DNS Lookup failed to %s with %s" % (
|
||||
destination, e
|
||||
)
|
||||
raise
|
||||
|
||||
log_result = _flatten_response_never_received(e)
|
||||
logger.warn(
|
||||
"{%s} Sending request failed to %s: %s %s: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
method,
|
||||
url,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
|
||||
if retries_left and not timeout:
|
||||
if long_retries:
|
||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
||||
delay = min(delay, 60)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
else:
|
||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
||||
delay = min(delay, 2)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
log_result = _flatten_response_never_received(e)
|
||||
|
||||
yield self.clock.sleep(delay)
|
||||
retries_left -= 1
|
||||
if retries_left and not timeout:
|
||||
if long_retries:
|
||||
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
|
||||
delay = min(delay, 60)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Result: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
log_result,
|
||||
)
|
||||
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
|
||||
delay = min(delay, 2)
|
||||
delay *= random.uniform(0.8, 1.4)
|
||||
|
||||
yield self.clock.sleep(delay)
|
||||
retries_left -= 1
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Result: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
log_result,
|
||||
)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
|
|
Loading…
Reference in a new issue