mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-15 13:33:54 +01:00
Use Twisted-15.2.1, Use Agent.usingEndpointFactory rather than implement our own Agent
This commit is contained in:
parent
afbd3b2fc4
commit
90abdaf3bc
2 changed files with 26 additions and 51 deletions
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
from twisted.internet import defer, reactor, protocol
|
from twisted.internet import defer, reactor, protocol
|
||||||
from twisted.internet.error import DNSLookupError
|
from twisted.internet.error import DNSLookupError
|
||||||
from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool
|
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from twisted.web._newclient import ResponseDone
|
from twisted.web._newclient import ResponseDone
|
||||||
|
|
||||||
|
@ -53,41 +53,17 @@ incoming_responses_counter = metrics.register_counter(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class MatrixFederationHttpAgent(_AgentBase):
|
class MatrixFederationEndpointFactory(object):
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.tls_context_factory = hs.tls_context_factory
|
||||||
|
|
||||||
def __init__(self, reactor, pool=None):
|
def endpointForURI(self, uri):
|
||||||
_AgentBase.__init__(self, reactor, pool)
|
destination = uri.netloc
|
||||||
|
|
||||||
def request(self, destination, endpoint, method, path, params, query,
|
return matrix_federation_endpoint(
|
||||||
headers, body_producer):
|
reactor, destination, timeout=10,
|
||||||
|
ssl_context_factory=self.tls_context_factory
|
||||||
outgoing_requests_counter.inc(method)
|
)
|
||||||
|
|
||||||
host = b""
|
|
||||||
port = 0
|
|
||||||
fragment = b""
|
|
||||||
|
|
||||||
parsed_URI = _URI(b"http", destination, host, port, path, params,
|
|
||||||
query, fragment)
|
|
||||||
|
|
||||||
# Set the connection pool key to be the destination.
|
|
||||||
key = destination
|
|
||||||
|
|
||||||
d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
|
|
||||||
headers, body_producer,
|
|
||||||
parsed_URI.originForm)
|
|
||||||
|
|
||||||
def _cb(response):
|
|
||||||
incoming_responses_counter.inc(method, response.code)
|
|
||||||
return response
|
|
||||||
|
|
||||||
def _eb(failure):
|
|
||||||
incoming_responses_counter.inc(method, "ERR")
|
|
||||||
return failure
|
|
||||||
|
|
||||||
d.addCallbacks(_cb, _eb)
|
|
||||||
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
class MatrixFederationHttpClient(object):
|
class MatrixFederationHttpClient(object):
|
||||||
|
@ -105,10 +81,17 @@ class MatrixFederationHttpClient(object):
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
pool = HTTPConnectionPool(reactor)
|
pool = HTTPConnectionPool(reactor)
|
||||||
pool.maxPersistentPerHost = 10
|
pool.maxPersistentPerHost = 10
|
||||||
self.agent = MatrixFederationHttpAgent(reactor, pool=pool)
|
self.agent = Agent.usingEndpointFactory(
|
||||||
|
reactor, MatrixFederationEndpointFactory(hs), pool=pool
|
||||||
|
)
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.version_string = hs.version_string
|
self.version_string = hs.version_string
|
||||||
|
|
||||||
|
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
|
||||||
|
return urlparse.urlunparse(
|
||||||
|
("matrix", destination, path_bytes, param_bytes, query_bytes, "")
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _create_request(self, destination, method, path_bytes,
|
def _create_request(self, destination, method, path_bytes,
|
||||||
body_callback, headers_dict={}, param_bytes=b"",
|
body_callback, headers_dict={}, param_bytes=b"",
|
||||||
|
@ -119,8 +102,8 @@ class MatrixFederationHttpClient(object):
|
||||||
headers_dict[b"User-Agent"] = [self.version_string]
|
headers_dict[b"User-Agent"] = [self.version_string]
|
||||||
headers_dict[b"Host"] = [destination]
|
headers_dict[b"Host"] = [destination]
|
||||||
|
|
||||||
url_bytes = urlparse.urlunparse(
|
url_bytes = self._create_url(
|
||||||
("", "", path_bytes, param_bytes, query_bytes, "",)
|
destination, path_bytes, param_bytes, query_bytes
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("Sending request to %s: %s %s",
|
logger.info("Sending request to %s: %s %s",
|
||||||
|
@ -139,22 +122,20 @@ class MatrixFederationHttpClient(object):
|
||||||
# (once we have reliable transactions in place)
|
# (once we have reliable transactions in place)
|
||||||
retries_left = 5
|
retries_left = 5
|
||||||
|
|
||||||
endpoint = self._getEndpoint(reactor, destination)
|
http_url_bytes = urlparse.urlunparse(
|
||||||
|
("", "", path_bytes, param_bytes, query_bytes, "")
|
||||||
|
)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
producer = None
|
producer = None
|
||||||
if body_callback:
|
if body_callback:
|
||||||
producer = body_callback(method, url_bytes, headers_dict)
|
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
request_deferred = preserve_context_over_fn(
|
request_deferred = preserve_context_over_fn(
|
||||||
self.agent.request,
|
self.agent.request,
|
||||||
destination,
|
|
||||||
endpoint,
|
|
||||||
method,
|
method,
|
||||||
path_bytes,
|
url_bytes,
|
||||||
param_bytes,
|
|
||||||
query_bytes,
|
|
||||||
Headers(headers_dict),
|
Headers(headers_dict),
|
||||||
producer
|
producer
|
||||||
)
|
)
|
||||||
|
@ -442,12 +423,6 @@ class MatrixFederationHttpClient(object):
|
||||||
|
|
||||||
defer.returnValue((length, headers))
|
defer.returnValue((length, headers))
|
||||||
|
|
||||||
def _getEndpoint(self, reactor, destination):
|
|
||||||
return matrix_federation_endpoint(
|
|
||||||
reactor, destination, timeout=10,
|
|
||||||
ssl_context_factory=self.hs.tls_context_factory
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class _ReadBodyToFileProtocol(protocol.Protocol):
|
class _ReadBodyToFileProtocol(protocol.Protocol):
|
||||||
def __init__(self, stream, deferred, max_size):
|
def __init__(self, stream, deferred, max_size):
|
||||||
|
|
|
@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
REQUIREMENTS = {
|
REQUIREMENTS = {
|
||||||
"syutil>=0.0.6": ["syutil>=0.0.6"],
|
"syutil>=0.0.6": ["syutil>=0.0.6"],
|
||||||
"Twisted==14.0.2": ["twisted==14.0.2"],
|
"Twisted==15.2.1": ["twisted==15.2.1"],
|
||||||
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
|
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
|
||||||
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
|
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
|
||||||
"pyyaml": ["yaml"],
|
"pyyaml": ["yaml"],
|
||||||
|
|
Loading…
Reference in a new issue