Use run_in_background in preference to preserve_fn

While I was going through uses of preserve_fn for other PRs, I converted places
which only use the wrapped function once to use run_in_background, to avoid
creating the function object.
This commit is contained in:
Richard van der Hoff 2018-04-27 11:29:27 +01:00
parent 0ced8b5b47
commit 2a13af23bc
22 changed files with 97 additions and 71 deletions

View file

@ -38,7 +38,7 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
@ -229,7 +229,7 @@ class FederationSenderHandler(object):
# presence, typing, etc. # presence, typing, etc.
if stream_name == "federation": if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows) send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token) run_in_background(self.update_token, token)
# We also need to poke the federation sender when new events happen # We also need to poke the federation sender when new events happen
elif stream_name == "events": elif stream_name == "events":

View file

@ -33,7 +33,7 @@ from synapse.server import HomeServer
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows): def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows) run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks @defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows): def poke_pushers(self, stream_name, token, rows):

View file

@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows): def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
preserve_fn(self.process_and_notify)(stream_name, token, rows)
def get_streams_to_replicate(self): def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate() args = super(SyncReplicationHandler, self).get_streams_to_replicate()

View file

@ -51,7 +51,7 @@ components.
from twisted.internet import defer from twisted.internet import defer
from synapse.appservice import ApplicationServiceState from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
import logging import logging
@ -106,7 +106,7 @@ class _ServiceQueuer(object):
def enqueue(self, service, event): def enqueue(self, service, event):
# if this service isn't being sent something # if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event) self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service) run_in_background(self._send_request, service)
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_request(self, service): def _send_request(self, service):
@ -152,10 +152,10 @@ class _TransactionController(object):
if sent: if sent:
yield txn.complete(self.store) yield txn.complete(self.store)
else: else:
preserve_fn(self._start_recoverer)(service) run_in_background(self._start_recoverer, service)
except Exception as e: except Exception:
logger.exception(e) logger.exception("Error creating appservice transaction")
preserve_fn(self._start_recoverer)(service) run_in_background(self._start_recoverer, service)
@defer.inlineCallbacks @defer.inlineCallbacks
def on_recovered(self, recoverer): def on_recovered(self, recoverer):

View file

@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes
from synapse.util import unwrapFirstError, logcontext from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import ( from synapse.util.logcontext import (
PreserveLoggingContext, PreserveLoggingContext,
preserve_fn preserve_fn,
run_in_background,
) )
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -127,7 +128,7 @@ class Keyring(object):
verify_requests.append(verify_request) verify_requests.append(verify_request)
preserve_fn(self._start_key_lookups)(verify_requests) run_in_background(self._start_key_lookups, verify_requests)
# Pass those keys to handle_key_deferred so that the json object # Pass those keys to handle_key_deferred so that the json object
# signatures can be verified # signatures can be verified
@ -313,7 +314,7 @@ class Keyring(object):
if not verify_request.deferred.called: if not verify_request.deferred.called:
verify_request.deferred.errback(err) verify_request.deferred.errback(err)
preserve_fn(do_iterations)().addErrback(on_err) run_in_background(do_iterations).addErrback(on_err)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids): def get_keys_from_store(self, server_name_and_key_ids):
@ -329,8 +330,9 @@ class Keyring(object):
""" """
res = yield logcontext.make_deferred_yieldable(defer.gatherResults( res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.store.get_server_verify_keys)( run_in_background(
server_name, key_ids self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name) ).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids for server_name, key_ids in server_name_and_key_ids
], ],
@ -358,7 +360,7 @@ class Keyring(object):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults( results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(get_key)(p_name, p_keys) run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items() for p_name, p_keys in self.perspective_servers.items()
], ],
consumeErrors=True, consumeErrors=True,
@ -398,7 +400,7 @@ class Keyring(object):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults( results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(get_key)(server_name, key_ids) run_in_background(get_key, server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids for server_name, key_ids in server_name_and_key_ids
], ],
consumeErrors=True, consumeErrors=True,
@ -481,7 +483,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults( yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.store_keys)( run_in_background(
self.store_keys,
server_name=server_name, server_name=server_name,
from_server=perspective_name, from_server=perspective_name,
verify_keys=response_keys, verify_keys=response_keys,
@ -539,7 +542,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults( yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.store_keys)( run_in_background(
self.store_keys,
server_name=key_server_name, server_name=key_server_name,
from_server=server_name, from_server=server_name,
verify_keys=verify_keys, verify_keys=verify_keys,
@ -615,7 +619,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults( yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.store.store_server_keys_json)( run_in_background(
self.store.store_server_keys_json,
server_name=server_name, server_name=server_name,
key_id=key_id, key_id=key_id,
from_server=server_name, from_server=server_name,
@ -716,7 +721,8 @@ class Keyring(object):
# TODO(markjh): Store whether the keys have expired. # TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults( return logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.store.store_server_verify_key)( run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key server_name, server_name, key.time_added, key
) )
for key_id, key in verify_keys.items() for key_id, key in verify_keys.items()

View file

@ -33,7 +33,7 @@ from synapse.federation.federation_base import (
import synapse.metrics import synapse.metrics
from synapse.util import logcontext, unwrapFirstError from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
@ -417,7 +417,8 @@ class FederationClient(FederationBase):
batch = set(missing_events[i:i + batch_size]) batch = set(missing_events[i:i + batch_size])
deferreds = [ deferreds = [
preserve_fn(self.get_pdu)( run_in_background(
self.get_pdu,
destinations=random_server_list(), destinations=random_server_list(),
event_id=e_id, event_id=e_id,
) )

View file

@ -42,7 +42,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from signedjson.sign import sign_json from signedjson.sign import sign_json
@ -192,4 +192,4 @@ class GroupAttestionRenewer(object):
group_id = row["group_id"] group_id = row["group_id"]
user_id = row["user_id"] user_id = row["user_id"]
preserve_fn(_renew_attestation)(group_id, user_id) run_in_background(_renew_attestation, group_id, user_id)

View file

@ -198,7 +198,10 @@ class ApplicationServicesHandler(object):
services = yield self._get_services_for_3pn(protocol) services = yield self._get_services_for_3pn(protocol)
results = yield make_deferred_yieldable(defer.DeferredList([ results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
for service in services for service in services
], consumeErrors=True)) ], consumeErrors=True))

View file

@ -24,7 +24,7 @@ from synapse.api.errors import (
SynapseError, CodeMessageException, FederationDeniedError, SynapseError, CodeMessageException, FederationDeniedError,
) )
from synapse.types import get_domain_from_id, UserID from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -139,7 +139,7 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e) failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([ yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination) run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache for destination in remote_queries_not_in_cache
])) ]))
@ -242,7 +242,7 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e) failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([ yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination) run_in_background(claim_client_keys, destination)
for destination in remote_queries for destination in remote_queries
])) ]))

View file

@ -637,7 +637,8 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults( results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
logcontext.preserve_fn(self.replication_layer.get_pdu)( logcontext.run_in_background(
self.replication_layer.get_pdu,
[dest], [dest],
event_id, event_id,
outlier=True, outlier=True,
@ -1023,7 +1024,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually # lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it. # have. Hence we fire off the deferred, but don't wait for it.
logcontext.preserve_fn(self._handle_queued_pdus)(room_queue) logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True) defer.returnValue(True)
@ -1523,8 +1524,9 @@ class FederationHandler(BaseHandler):
if not backfilled: if not backfilled:
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
# and don't need to wait for it. # and don't need to wait for it.
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)( logcontext.run_in_background(
event_stream_id, max_stream_id self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
) )
defer.returnValue((context, event_stream_id, max_stream_id)) defer.returnValue((context, event_stream_id, max_stream_id))
@ -1538,7 +1540,8 @@ class FederationHandler(BaseHandler):
""" """
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
logcontext.preserve_fn(self._prep_event)( logcontext.run_in_background(
self._prep_event,
origin, origin,
ev_info["event"], ev_info["event"],
state=ev_info.get("state"), state=ev_info.get("state"),
@ -1867,7 +1870,8 @@ class FederationHandler(BaseHandler):
different_events = yield logcontext.make_deferred_yieldable( different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([ defer.gatherResults([
logcontext.preserve_fn(self.store.get_event)( logcontext.run_in_background(
self.store.get_event,
d, d,
allow_none=True, allow_none=True,
allow_rejected=False, allow_rejected=False,

View file

@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
from ._base import BaseHandler from ._base import BaseHandler
@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
(messages, token), current_state = yield make_deferred_yieldable( (messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults( defer.gatherResults(
[ [
preserve_fn(self.store.get_recent_events_for_room)( run_in_background(
self.store.get_recent_events_for_room,
event.room_id, event.room_id,
limit=limit, limit=limit,
end_token=room_end_token, end_token=room_end_token,
@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults( presence, receipts, (messages, token) = yield defer.gatherResults(
[ [
preserve_fn(get_presence)(), run_in_background(get_presence),
preserve_fn(get_receipts)(), run_in_background(get_receipts),
preserve_fn(self.store.get_recent_events_for_room)( run_in_background(
self.store.get_recent_events_for_room,
room_id, room_id,
limit=limit, limit=limit,
end_token=now_token.room_key, end_token=now_token.room_key,

View file

@ -850,7 +850,8 @@ class EventCreationHandler(object):
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
# and don't need to wait for it. # and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)( run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id event_stream_id, max_stream_id
) )
@ -862,7 +863,7 @@ class EventCreationHandler(object):
extra_users=extra_users extra_users=extra_users
) )
preserve_fn(_notify)() run_in_background(_notify)
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler() presence = self.hs.get_presence_handler()

View file

@ -16,7 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
@ -97,7 +97,8 @@ class TypingHandler(object):
if self.hs.is_mine_id(member.user_id): if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None) last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now: if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
preserve_fn(self._push_remote)( run_in_background(
self._push_remote,
member=member, member=member,
typing=True typing=True
) )
@ -196,7 +197,7 @@ class TypingHandler(object):
def _push_update(self, member, typing): def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id): if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users. # Only send updates for changes to our own users.
preserve_fn(self._push_remote)(member, typing) run_in_background(self._push_remote, member, typing)
self._push_update_local( self._push_update_local(
member=member, member=member,

View file

@ -14,13 +14,13 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from twisted.internet import defer from twisted.internet import defer
from .pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -137,8 +137,9 @@ class PusherPool:
if u in self.pushers: if u in self.pushers:
for p in self.pushers[u].values(): for p in self.pushers[u].values():
deferreds.append( deferreds.append(
preserve_fn(p.on_new_notifications)( run_in_background(
min_stream_id, max_stream_id p.on_new_notifications,
min_stream_id, max_stream_id,
) )
) )
@ -164,7 +165,10 @@ class PusherPool:
if u in self.pushers: if u in self.pushers:
for p in self.pushers[u].values(): for p in self.pushers[u].values():
deferreds.append( deferreds.append(
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
) )
yield make_deferred_yieldable(defer.gatherResults(deferreds)) yield make_deferred_yieldable(defer.gatherResults(deferreds))
@ -207,7 +211,7 @@ class PusherPool:
if appid_pushkey in byuser: if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop() byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p byuser[appid_pushkey] = p
preserve_fn(p.on_started)() run_in_background(p.on_started)
logger.info("Started pushers") logger.info("Started pushers")

View file

@ -35,7 +35,7 @@ from ._base import FileInfo
from synapse.api.errors import ( from synapse.api.errors import (
SynapseError, Codes, SynapseError, Codes,
) )
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient from synapse.http.client import SpiderHttpClient
@ -144,7 +144,8 @@ class PreviewUrlResource(Resource):
observable = self._cache.get(url) observable = self._cache.get(url)
if not observable: if not observable:
download = preserve_fn(self._do_preview)( download = run_in_background(
self._do_preview,
url, requester.user, ts, url, requester.user, ts,
) )
observable = ObservableDeferred( observable = ObservableDeferred(

View file

@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event from synapse.events.utils import prune_event
from synapse.util.logcontext import ( from synapse.util.logcontext import (
preserve_fn, PreserveLoggingContext, make_deferred_yieldable PreserveLoggingContext, make_deferred_yieldable, run_in_background,
) )
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
res = yield make_deferred_yieldable(defer.gatherResults( res = yield make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self._get_event_from_row)( run_in_background(
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"], row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"], rejected_reason=row["rejects"],
) )

View file

@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import abc import abc
@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_ids = list(room_ids) room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield make_deferred_yieldable(defer.gatherResults([ res = yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)( run_in_background(
self.get_room_events_stream_for_room,
room_id, from_key, to_key, limit, order=order, room_id, from_key, to_key, limit, order=order,
) )
for room_id in rm_ids for room_id in rm_ids

View file

@ -17,7 +17,7 @@
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from .logcontext import ( from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, preserve_fn PreserveLoggingContext, make_deferred_yieldable, run_in_background
) )
from synapse.util import logcontext, unwrapFirstError from synapse.util import logcontext, unwrapFirstError
@ -161,7 +161,7 @@ def concurrently_execute(func, args, limit):
pass pass
return logcontext.make_deferred_yieldable(defer.gatherResults([ return logcontext.make_deferred_yieldable(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)() run_in_background(_concurrently_execute_inner)
for _ in xrange(limit) for _ in xrange(limit)
], consumeErrors=True)).addErrback(unwrapFirstError) ], consumeErrors=True)).addErrback(unwrapFirstError)

View file

@ -15,7 +15,7 @@
from twisted.internet import threads, reactor from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from six.moves import queue from six.moves import queue
@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
self._producer = producer self._producer = producer
self.streaming = streaming self.streaming = streaming
self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) self._finished_deferred = run_in_background(
threads.deferToThread, self._writer
)
if not streaming: if not streaming:
self._producer.resumeProducing() self._producer.resumeProducing()

View file

@ -341,7 +341,7 @@ def make_deferred_yieldable(deferred):
returning a deferred. Then, when the deferred completes, restores the returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks. current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to preserve_fn.) (This is more-or-less the opposite operation to run_in_background.)
""" """
if isinstance(deferred, defer.Deferred) and not deferred.called: if isinstance(deferred, defer.Deferred) and not deferred.called:
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)

View file

@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import LimitExceededError from synapse.api.errors import LimitExceededError
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.logcontext import preserve_fn from synapse.util.logcontext import run_in_background
import collections import collections
import contextlib import contextlib
@ -150,7 +150,7 @@ class _PerHostRatelimiter(object):
"Ratelimit [%s]: sleeping req", "Ratelimit [%s]: sleeping req",
id(request_id), id(request_id),
) )
ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0) ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
self.sleeping_requests.add(request_id) self.sleeping_requests.add(request_id)

View file

@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
) )
except Exception: except Exception:
logger.exception( logger.exception(
"Failed to store set_destination_retry_timings", "Failed to store destination_retry_timings",
) )
# we deliberately do this in the background. # we deliberately do this in the background.
synapse.util.logcontext.preserve_fn(store_retry_timings)() synapse.util.logcontext.run_in_background(store_retry_timings)