mirror of
https://mau.dev/maunium/synapse.git
synced 2024-11-04 21:58:54 +01:00
Merge pull request #132 from matrix-org/observer_and_locks
Observer and locks
This commit is contained in:
commit
1c1d67dfef
3 changed files with 42 additions and 1 deletions
|
@ -24,6 +24,8 @@ from synapse.api.errors import SynapseError, Codes
|
||||||
|
|
||||||
from synapse.util.retryutils import get_retry_limiter
|
from synapse.util.retryutils import get_retry_limiter
|
||||||
|
|
||||||
|
from synapse.util.async import create_observer
|
||||||
|
|
||||||
from OpenSSL import crypto
|
from OpenSSL import crypto
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -38,6 +40,8 @@ class Keyring(object):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
|
||||||
|
self.key_downloads = {}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def verify_json_for_server(self, server_name, json_object):
|
def verify_json_for_server(self, server_name, json_object):
|
||||||
logger.debug("Verifying for %s", server_name)
|
logger.debug("Verifying for %s", server_name)
|
||||||
|
@ -97,6 +101,22 @@ class Keyring(object):
|
||||||
defer.returnValue(cached[0])
|
defer.returnValue(cached[0])
|
||||||
return
|
return
|
||||||
|
|
||||||
|
download = self.key_downloads.get(server_name)
|
||||||
|
|
||||||
|
if download is None:
|
||||||
|
download = self._get_server_verify_key_impl(server_name, key_ids)
|
||||||
|
self.key_downloads[server_name] = download
|
||||||
|
|
||||||
|
@download.addBoth
|
||||||
|
def callback(ret):
|
||||||
|
del self.key_downloads[server_name]
|
||||||
|
return ret
|
||||||
|
|
||||||
|
r = yield create_observer(download)
|
||||||
|
defer.returnValue(r)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_server_verify_key_impl(self, server_name, key_ids):
|
||||||
# Try to fetch the key from the remote server.
|
# Try to fetch the key from the remote server.
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
limiter = yield get_retry_limiter(
|
||||||
|
|
|
@ -25,6 +25,8 @@ from twisted.internet import defer
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from twisted.protocols.basic import FileSender
|
from twisted.protocols.basic import FileSender
|
||||||
|
|
||||||
|
from synapse.util.async import create_observer
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -87,7 +89,7 @@ class BaseMediaResource(Resource):
|
||||||
def callback(media_info):
|
def callback(media_info):
|
||||||
del self.downloads[key]
|
del self.downloads[key]
|
||||||
return media_info
|
return media_info
|
||||||
return download
|
return create_observer(download)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_remote_media_impl(self, server_name, media_id):
|
def _get_remote_media_impl(self, server_name, media_id):
|
||||||
|
|
|
@ -32,3 +32,22 @@ def run_on_reactor():
|
||||||
iteration of the main loop
|
iteration of the main loop
|
||||||
"""
|
"""
|
||||||
return sleep(0)
|
return sleep(0)
|
||||||
|
|
||||||
|
|
||||||
|
def create_observer(deferred):
|
||||||
|
"""Creates a deferred that observes the result or failure of the given
|
||||||
|
deferred *without* affecting the given deferred.
|
||||||
|
"""
|
||||||
|
d = defer.Deferred()
|
||||||
|
|
||||||
|
def callback(r):
|
||||||
|
d.callback(r)
|
||||||
|
return r
|
||||||
|
|
||||||
|
def errback(f):
|
||||||
|
d.errback(f)
|
||||||
|
return f
|
||||||
|
|
||||||
|
deferred.addCallbacks(callback, errback)
|
||||||
|
|
||||||
|
return d
|
||||||
|
|
Loading…
Reference in a new issue