forked from MirrorHub/synapse
Implement locks using create_observer for fetching media and server keys
This commit is contained in:
parent
1c82fbd2eb
commit
e701aec2d1
2 changed files with 87 additions and 65 deletions
|
@ -24,6 +24,8 @@ from synapse.api.errors import SynapseError, Codes
|
||||||
|
|
||||||
from synapse.util.retryutils import get_retry_limiter
|
from synapse.util.retryutils import get_retry_limiter
|
||||||
|
|
||||||
|
from synapse.util.async import create_observer
|
||||||
|
|
||||||
from OpenSSL import crypto
|
from OpenSSL import crypto
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -38,6 +40,8 @@ class Keyring(object):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
|
||||||
|
self.key_downloads = {}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def verify_json_for_server(self, server_name, json_object):
|
def verify_json_for_server(self, server_name, json_object):
|
||||||
logger.debug("Verifying for %s", server_name)
|
logger.debug("Verifying for %s", server_name)
|
||||||
|
@ -97,76 +101,92 @@ class Keyring(object):
|
||||||
defer.returnValue(cached[0])
|
defer.returnValue(cached[0])
|
||||||
return
|
return
|
||||||
|
|
||||||
# Try to fetch the key from the remote server.
|
@defer.inlineCallbacks
|
||||||
|
def fetch_keys():
|
||||||
|
# Try to fetch the key from the remote server.
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
limiter = yield get_retry_limiter(
|
||||||
server_name,
|
server_name,
|
||||||
self.clock,
|
self.clock,
|
||||||
self.store,
|
self.store,
|
||||||
)
|
|
||||||
|
|
||||||
with limiter:
|
|
||||||
(response, tls_certificate) = yield fetch_server_key(
|
|
||||||
server_name, self.hs.tls_context_factory
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check the response.
|
with limiter:
|
||||||
|
(response, tls_certificate) = yield fetch_server_key(
|
||||||
x509_certificate_bytes = crypto.dump_certificate(
|
server_name, self.hs.tls_context_factory
|
||||||
crypto.FILETYPE_ASN1, tls_certificate
|
|
||||||
)
|
|
||||||
|
|
||||||
if ("signatures" not in response
|
|
||||||
or server_name not in response["signatures"]):
|
|
||||||
raise ValueError("Key response not signed by remote server")
|
|
||||||
|
|
||||||
if "tls_certificate" not in response:
|
|
||||||
raise ValueError("Key response missing TLS certificate")
|
|
||||||
|
|
||||||
tls_certificate_b64 = response["tls_certificate"]
|
|
||||||
|
|
||||||
if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
|
|
||||||
raise ValueError("TLS certificate doesn't match")
|
|
||||||
|
|
||||||
verify_keys = {}
|
|
||||||
for key_id, key_base64 in response["verify_keys"].items():
|
|
||||||
if is_signing_algorithm_supported(key_id):
|
|
||||||
key_bytes = decode_base64(key_base64)
|
|
||||||
verify_key = decode_verify_key_bytes(key_id, key_bytes)
|
|
||||||
verify_keys[key_id] = verify_key
|
|
||||||
|
|
||||||
for key_id in response["signatures"][server_name]:
|
|
||||||
if key_id not in response["verify_keys"]:
|
|
||||||
raise ValueError(
|
|
||||||
"Key response must include verification keys for all"
|
|
||||||
" signatures"
|
|
||||||
)
|
|
||||||
if key_id in verify_keys:
|
|
||||||
verify_signed_json(
|
|
||||||
response,
|
|
||||||
server_name,
|
|
||||||
verify_keys[key_id]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Cache the result in the datastore.
|
# Check the response.
|
||||||
|
|
||||||
time_now_ms = self.clock.time_msec()
|
x509_certificate_bytes = crypto.dump_certificate(
|
||||||
|
crypto.FILETYPE_ASN1, tls_certificate
|
||||||
yield self.store.store_server_certificate(
|
|
||||||
server_name,
|
|
||||||
server_name,
|
|
||||||
time_now_ms,
|
|
||||||
tls_certificate,
|
|
||||||
)
|
|
||||||
|
|
||||||
for key_id, key in verify_keys.items():
|
|
||||||
yield self.store.store_server_verify_key(
|
|
||||||
server_name, server_name, time_now_ms, key
|
|
||||||
)
|
)
|
||||||
|
|
||||||
for key_id in key_ids:
|
if ("signatures" not in response
|
||||||
if key_id in verify_keys:
|
or server_name not in response["signatures"]):
|
||||||
defer.returnValue(verify_keys[key_id])
|
raise ValueError("Key response not signed by remote server")
|
||||||
return
|
|
||||||
|
|
||||||
raise ValueError("No verification key found for given key ids")
|
if "tls_certificate" not in response:
|
||||||
|
raise ValueError("Key response missing TLS certificate")
|
||||||
|
|
||||||
|
tls_certificate_b64 = response["tls_certificate"]
|
||||||
|
|
||||||
|
if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
|
||||||
|
raise ValueError("TLS certificate doesn't match")
|
||||||
|
|
||||||
|
verify_keys = {}
|
||||||
|
for key_id, key_base64 in response["verify_keys"].items():
|
||||||
|
if is_signing_algorithm_supported(key_id):
|
||||||
|
key_bytes = decode_base64(key_base64)
|
||||||
|
verify_key = decode_verify_key_bytes(key_id, key_bytes)
|
||||||
|
verify_keys[key_id] = verify_key
|
||||||
|
|
||||||
|
for key_id in response["signatures"][server_name]:
|
||||||
|
if key_id not in response["verify_keys"]:
|
||||||
|
raise ValueError(
|
||||||
|
"Key response must include verification keys for all"
|
||||||
|
" signatures"
|
||||||
|
)
|
||||||
|
if key_id in verify_keys:
|
||||||
|
verify_signed_json(
|
||||||
|
response,
|
||||||
|
server_name,
|
||||||
|
verify_keys[key_id]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Cache the result in the datastore.
|
||||||
|
|
||||||
|
time_now_ms = self.clock.time_msec()
|
||||||
|
|
||||||
|
yield self.store.store_server_certificate(
|
||||||
|
server_name,
|
||||||
|
server_name,
|
||||||
|
time_now_ms,
|
||||||
|
tls_certificate,
|
||||||
|
)
|
||||||
|
|
||||||
|
for key_id, key in verify_keys.items():
|
||||||
|
yield self.store.store_server_verify_key(
|
||||||
|
server_name, server_name, time_now_ms, key
|
||||||
|
)
|
||||||
|
|
||||||
|
for key_id in key_ids:
|
||||||
|
if key_id in verify_keys:
|
||||||
|
defer.returnValue(verify_keys[key_id])
|
||||||
|
return
|
||||||
|
|
||||||
|
raise ValueError("No verification key found for given key ids")
|
||||||
|
|
||||||
|
download = self.key_downloads.get(server_name)
|
||||||
|
|
||||||
|
if download is None:
|
||||||
|
download = fetch_keys()
|
||||||
|
self.key_downloads[server_name] = download
|
||||||
|
|
||||||
|
@download.addBoth
|
||||||
|
def callback(ret):
|
||||||
|
del self.key_downloads[server_name]
|
||||||
|
return ret
|
||||||
|
|
||||||
|
r = yield create_observer(download)
|
||||||
|
defer.returnValue(r)
|
||||||
|
|
|
@ -25,6 +25,8 @@ from twisted.internet import defer
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from twisted.protocols.basic import FileSender
|
from twisted.protocols.basic import FileSender
|
||||||
|
|
||||||
|
from synapse.util.async import create_observer
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -87,7 +89,7 @@ class BaseMediaResource(Resource):
|
||||||
def callback(media_info):
|
def callback(media_info):
|
||||||
del self.downloads[key]
|
del self.downloads[key]
|
||||||
return media_info
|
return media_info
|
||||||
return download
|
return create_observer(download)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_remote_media_impl(self, server_name, media_id):
|
def _get_remote_media_impl(self, server_name, media_id):
|
||||||
|
|
Loading…
Reference in a new issue