forked from MirrorHub/synapse
Merge pull request #3140 from matrix-org/rav/use_run_in_background
Use run_in_background in preference to preserve_fn
This commit is contained in:
commit
aab2e4da60
22 changed files with 98 additions and 72 deletions
|
@ -38,7 +38,7 @@ from synapse.server import HomeServer
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||||
from synapse.util.manhole import manhole
|
from synapse.util.manhole import manhole
|
||||||
from synapse.util.versionstring import get_version_string
|
from synapse.util.versionstring import get_version_string
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
|
@ -229,7 +229,7 @@ class FederationSenderHandler(object):
|
||||||
# presence, typing, etc.
|
# presence, typing, etc.
|
||||||
if stream_name == "federation":
|
if stream_name == "federation":
|
||||||
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
send_queue.process_rows_for_federation(self.federation_sender, rows)
|
||||||
preserve_fn(self.update_token)(token)
|
run_in_background(self.update_token, token)
|
||||||
|
|
||||||
# We also need to poke the federation sender when new events happen
|
# We also need to poke the federation sender when new events happen
|
||||||
elif stream_name == "events":
|
elif stream_name == "events":
|
||||||
|
|
|
@ -33,7 +33,7 @@ from synapse.server import HomeServer
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||||
from synapse.util.manhole import manhole
|
from synapse.util.manhole import manhole
|
||||||
from synapse.util.versionstring import get_version_string
|
from synapse.util.versionstring import get_version_string
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
|
@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
|
||||||
|
|
||||||
def on_rdata(self, stream_name, token, rows):
|
def on_rdata(self, stream_name, token, rows):
|
||||||
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||||
preserve_fn(self.poke_pushers)(stream_name, token, rows)
|
run_in_background(self.poke_pushers, stream_name, token, rows)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def poke_pushers(self, stream_name, token, rows):
|
def poke_pushers(self, stream_name, token, rows):
|
||||||
|
|
|
@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine
|
||||||
from synapse.storage.presence import UserPresenceState
|
from synapse.storage.presence import UserPresenceState
|
||||||
from synapse.storage.roommember import RoomMemberStore
|
from synapse.storage.roommember import RoomMemberStore
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
from synapse.util.logcontext import LoggingContext, preserve_fn
|
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||||
from synapse.util.manhole import manhole
|
from synapse.util.manhole import manhole
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
from synapse.util.versionstring import get_version_string
|
from synapse.util.versionstring import get_version_string
|
||||||
|
@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
||||||
|
|
||||||
def on_rdata(self, stream_name, token, rows):
|
def on_rdata(self, stream_name, token, rows):
|
||||||
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
|
||||||
|
run_in_background(self.process_and_notify, stream_name, token, rows)
|
||||||
preserve_fn(self.process_and_notify)(stream_name, token, rows)
|
|
||||||
|
|
||||||
def get_streams_to_replicate(self):
|
def get_streams_to_replicate(self):
|
||||||
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
|
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
|
||||||
|
|
|
@ -51,7 +51,7 @@ components.
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.appservice import ApplicationServiceState
|
from synapse.appservice import ApplicationServiceState
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -106,7 +106,7 @@ class _ServiceQueuer(object):
|
||||||
def enqueue(self, service, event):
|
def enqueue(self, service, event):
|
||||||
# if this service isn't being sent something
|
# if this service isn't being sent something
|
||||||
self.queued_events.setdefault(service.id, []).append(event)
|
self.queued_events.setdefault(service.id, []).append(event)
|
||||||
preserve_fn(self._send_request)(service)
|
run_in_background(self._send_request, service)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _send_request(self, service):
|
def _send_request(self, service):
|
||||||
|
@ -152,10 +152,10 @@ class _TransactionController(object):
|
||||||
if sent:
|
if sent:
|
||||||
yield txn.complete(self.store)
|
yield txn.complete(self.store)
|
||||||
else:
|
else:
|
||||||
preserve_fn(self._start_recoverer)(service)
|
run_in_background(self._start_recoverer, service)
|
||||||
except Exception as e:
|
except Exception:
|
||||||
logger.exception(e)
|
logger.exception("Error creating appservice transaction")
|
||||||
preserve_fn(self._start_recoverer)(service)
|
run_in_background(self._start_recoverer, service)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_recovered(self, recoverer):
|
def on_recovered(self, recoverer):
|
||||||
|
|
|
@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes
|
||||||
from synapse.util import unwrapFirstError, logcontext
|
from synapse.util import unwrapFirstError, logcontext
|
||||||
from synapse.util.logcontext import (
|
from synapse.util.logcontext import (
|
||||||
PreserveLoggingContext,
|
PreserveLoggingContext,
|
||||||
preserve_fn
|
preserve_fn,
|
||||||
|
run_in_background,
|
||||||
)
|
)
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
|
@ -127,7 +128,7 @@ class Keyring(object):
|
||||||
|
|
||||||
verify_requests.append(verify_request)
|
verify_requests.append(verify_request)
|
||||||
|
|
||||||
preserve_fn(self._start_key_lookups)(verify_requests)
|
run_in_background(self._start_key_lookups, verify_requests)
|
||||||
|
|
||||||
# Pass those keys to handle_key_deferred so that the json object
|
# Pass those keys to handle_key_deferred so that the json object
|
||||||
# signatures can be verified
|
# signatures can be verified
|
||||||
|
@ -316,7 +317,7 @@ class Keyring(object):
|
||||||
if not verify_request.deferred.called:
|
if not verify_request.deferred.called:
|
||||||
verify_request.deferred.errback(err)
|
verify_request.deferred.errback(err)
|
||||||
|
|
||||||
preserve_fn(do_iterations)().addErrback(on_err)
|
run_in_background(do_iterations).addErrback(on_err)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_keys_from_store(self, server_name_and_key_ids):
|
def get_keys_from_store(self, server_name_and_key_ids):
|
||||||
|
@ -332,8 +333,9 @@ class Keyring(object):
|
||||||
"""
|
"""
|
||||||
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store.get_server_verify_keys)(
|
run_in_background(
|
||||||
server_name, key_ids
|
self.store.get_server_verify_keys,
|
||||||
|
server_name, key_ids,
|
||||||
).addCallback(lambda ks, server: (server, ks), server_name)
|
).addCallback(lambda ks, server: (server, ks), server_name)
|
||||||
for server_name, key_ids in server_name_and_key_ids
|
for server_name, key_ids in server_name_and_key_ids
|
||||||
],
|
],
|
||||||
|
@ -361,7 +363,7 @@ class Keyring(object):
|
||||||
|
|
||||||
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(get_key)(p_name, p_keys)
|
run_in_background(get_key, p_name, p_keys)
|
||||||
for p_name, p_keys in self.perspective_servers.items()
|
for p_name, p_keys in self.perspective_servers.items()
|
||||||
],
|
],
|
||||||
consumeErrors=True,
|
consumeErrors=True,
|
||||||
|
@ -401,7 +403,7 @@ class Keyring(object):
|
||||||
|
|
||||||
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(get_key)(server_name, key_ids)
|
run_in_background(get_key, server_name, key_ids)
|
||||||
for server_name, key_ids in server_name_and_key_ids
|
for server_name, key_ids in server_name_and_key_ids
|
||||||
],
|
],
|
||||||
consumeErrors=True,
|
consumeErrors=True,
|
||||||
|
@ -484,7 +486,8 @@ class Keyring(object):
|
||||||
|
|
||||||
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store_keys)(
|
run_in_background(
|
||||||
|
self.store_keys,
|
||||||
server_name=server_name,
|
server_name=server_name,
|
||||||
from_server=perspective_name,
|
from_server=perspective_name,
|
||||||
verify_keys=response_keys,
|
verify_keys=response_keys,
|
||||||
|
@ -542,7 +545,8 @@ class Keyring(object):
|
||||||
|
|
||||||
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store_keys)(
|
run_in_background(
|
||||||
|
self.store_keys,
|
||||||
server_name=key_server_name,
|
server_name=key_server_name,
|
||||||
from_server=server_name,
|
from_server=server_name,
|
||||||
verify_keys=verify_keys,
|
verify_keys=verify_keys,
|
||||||
|
@ -618,7 +622,8 @@ class Keyring(object):
|
||||||
|
|
||||||
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store.store_server_keys_json)(
|
run_in_background(
|
||||||
|
self.store.store_server_keys_json,
|
||||||
server_name=server_name,
|
server_name=server_name,
|
||||||
key_id=key_id,
|
key_id=key_id,
|
||||||
from_server=server_name,
|
from_server=server_name,
|
||||||
|
@ -719,7 +724,8 @@ class Keyring(object):
|
||||||
# TODO(markjh): Store whether the keys have expired.
|
# TODO(markjh): Store whether the keys have expired.
|
||||||
return logcontext.make_deferred_yieldable(defer.gatherResults(
|
return logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store.store_server_verify_key)(
|
run_in_background(
|
||||||
|
self.store.store_server_verify_key,
|
||||||
server_name, server_name, key.time_added, key
|
server_name, server_name, key.time_added, key
|
||||||
)
|
)
|
||||||
for key_id, key in verify_keys.items()
|
for key_id, key in verify_keys.items()
|
||||||
|
|
|
@ -33,7 +33,7 @@ from synapse.federation.federation_base import (
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.util import logcontext, unwrapFirstError
|
from synapse.util import logcontext, unwrapFirstError
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
|
@ -417,7 +417,8 @@ class FederationClient(FederationBase):
|
||||||
batch = set(missing_events[i:i + batch_size])
|
batch = set(missing_events[i:i + batch_size])
|
||||||
|
|
||||||
deferreds = [
|
deferreds = [
|
||||||
preserve_fn(self.get_pdu)(
|
run_in_background(
|
||||||
|
self.get_pdu,
|
||||||
destinations=random_server_list(),
|
destinations=random_server_list(),
|
||||||
event_id=e_id,
|
event_id=e_id,
|
||||||
)
|
)
|
||||||
|
|
|
@ -42,7 +42,7 @@ from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import run_in_background
|
||||||
|
|
||||||
from signedjson.sign import sign_json
|
from signedjson.sign import sign_json
|
||||||
|
|
||||||
|
@ -196,4 +196,4 @@ class GroupAttestionRenewer(object):
|
||||||
group_id = row["group_id"]
|
group_id = row["group_id"]
|
||||||
user_id = row["user_id"]
|
user_id = row["user_id"]
|
||||||
|
|
||||||
preserve_fn(_renew_attestation)(group_id, user_id)
|
run_in_background(_renew_attestation, group_id, user_id)
|
||||||
|
|
|
@ -198,7 +198,10 @@ class ApplicationServicesHandler(object):
|
||||||
services = yield self._get_services_for_3pn(protocol)
|
services = yield self._get_services_for_3pn(protocol)
|
||||||
|
|
||||||
results = yield make_deferred_yieldable(defer.DeferredList([
|
results = yield make_deferred_yieldable(defer.DeferredList([
|
||||||
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
|
run_in_background(
|
||||||
|
self.appservice_api.query_3pe,
|
||||||
|
service, kind, protocol, fields,
|
||||||
|
)
|
||||||
for service in services
|
for service in services
|
||||||
], consumeErrors=True))
|
], consumeErrors=True))
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ from synapse.api.errors import (
|
||||||
SynapseError, CodeMessageException, FederationDeniedError,
|
SynapseError, CodeMessageException, FederationDeniedError,
|
||||||
)
|
)
|
||||||
from synapse.types import get_domain_from_id, UserID
|
from synapse.types import get_domain_from_id, UserID
|
||||||
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -139,7 +139,7 @@ class E2eKeysHandler(object):
|
||||||
failures[destination] = _exception_to_failure(e)
|
failures[destination] = _exception_to_failure(e)
|
||||||
|
|
||||||
yield make_deferred_yieldable(defer.gatherResults([
|
yield make_deferred_yieldable(defer.gatherResults([
|
||||||
preserve_fn(do_remote_query)(destination)
|
run_in_background(do_remote_query, destination)
|
||||||
for destination in remote_queries_not_in_cache
|
for destination in remote_queries_not_in_cache
|
||||||
], consumeErrors=True))
|
], consumeErrors=True))
|
||||||
|
|
||||||
|
@ -242,7 +242,7 @@ class E2eKeysHandler(object):
|
||||||
failures[destination] = _exception_to_failure(e)
|
failures[destination] = _exception_to_failure(e)
|
||||||
|
|
||||||
yield make_deferred_yieldable(defer.gatherResults([
|
yield make_deferred_yieldable(defer.gatherResults([
|
||||||
preserve_fn(claim_client_keys)(destination)
|
run_in_background(claim_client_keys, destination)
|
||||||
for destination in remote_queries
|
for destination in remote_queries
|
||||||
], consumeErrors=True))
|
], consumeErrors=True))
|
||||||
|
|
||||||
|
|
|
@ -639,7 +639,8 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
logcontext.preserve_fn(self.replication_layer.get_pdu)(
|
logcontext.run_in_background(
|
||||||
|
self.replication_layer.get_pdu,
|
||||||
[dest],
|
[dest],
|
||||||
event_id,
|
event_id,
|
||||||
outlier=True,
|
outlier=True,
|
||||||
|
@ -1025,7 +1026,7 @@ class FederationHandler(BaseHandler):
|
||||||
# lots of requests for missing prev_events which we do actually
|
# lots of requests for missing prev_events which we do actually
|
||||||
# have. Hence we fire off the deferred, but don't wait for it.
|
# have. Hence we fire off the deferred, but don't wait for it.
|
||||||
|
|
||||||
logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
|
logcontext.run_in_background(self._handle_queued_pdus, room_queue)
|
||||||
|
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
@ -1527,8 +1528,9 @@ class FederationHandler(BaseHandler):
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
# this intentionally does not yield: we don't care about the result
|
# this intentionally does not yield: we don't care about the result
|
||||||
# and don't need to wait for it.
|
# and don't need to wait for it.
|
||||||
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
|
logcontext.run_in_background(
|
||||||
event_stream_id, max_stream_id
|
self.pusher_pool.on_new_notifications,
|
||||||
|
event_stream_id, max_stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||||
|
@ -1542,7 +1544,8 @@ class FederationHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
logcontext.preserve_fn(self._prep_event)(
|
logcontext.run_in_background(
|
||||||
|
self._prep_event,
|
||||||
origin,
|
origin,
|
||||||
ev_info["event"],
|
ev_info["event"],
|
||||||
state=ev_info.get("state"),
|
state=ev_info.get("state"),
|
||||||
|
@ -1871,7 +1874,8 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
different_events = yield logcontext.make_deferred_yieldable(
|
different_events = yield logcontext.make_deferred_yieldable(
|
||||||
defer.gatherResults([
|
defer.gatherResults([
|
||||||
logcontext.preserve_fn(self.store.get_event)(
|
logcontext.run_in_background(
|
||||||
|
self.store.get_event,
|
||||||
d,
|
d,
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
allow_rejected=False,
|
allow_rejected=False,
|
||||||
|
|
|
@ -27,7 +27,7 @@ from synapse.types import (
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError
|
||||||
from synapse.util.async import concurrently_execute
|
from synapse.util.async import concurrently_execute
|
||||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
|
||||||
(messages, token), current_state = yield make_deferred_yieldable(
|
(messages, token), current_state = yield make_deferred_yieldable(
|
||||||
defer.gatherResults(
|
defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.store.get_recent_events_for_room)(
|
run_in_background(
|
||||||
|
self.store.get_recent_events_for_room,
|
||||||
event.room_id,
|
event.room_id,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
end_token=room_end_token,
|
end_token=room_end_token,
|
||||||
|
@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
|
||||||
|
|
||||||
presence, receipts, (messages, token) = yield defer.gatherResults(
|
presence, receipts, (messages, token) = yield defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(get_presence)(),
|
run_in_background(get_presence),
|
||||||
preserve_fn(get_receipts)(),
|
run_in_background(get_receipts),
|
||||||
preserve_fn(self.store.get_recent_events_for_room)(
|
run_in_background(
|
||||||
|
self.store.get_recent_events_for_room,
|
||||||
room_id,
|
room_id,
|
||||||
limit=limit,
|
limit=limit,
|
||||||
end_token=now_token.room_key,
|
end_token=now_token.room_key,
|
||||||
|
|
|
@ -31,7 +31,7 @@ from synapse.types import (
|
||||||
UserID, RoomAlias, RoomStreamToken,
|
UserID, RoomAlias, RoomStreamToken,
|
||||||
)
|
)
|
||||||
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
|
||||||
from synapse.util.logcontext import preserve_fn, run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
@ -857,7 +857,8 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
# this intentionally does not yield: we don't care about the result
|
# this intentionally does not yield: we don't care about the result
|
||||||
# and don't need to wait for it.
|
# and don't need to wait for it.
|
||||||
preserve_fn(self.pusher_pool.on_new_notifications)(
|
run_in_background(
|
||||||
|
self.pusher_pool.on_new_notifications,
|
||||||
event_stream_id, max_stream_id
|
event_stream_id, max_stream_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -872,7 +873,7 @@ class EventCreationHandler(object):
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error notifying about new room event")
|
logger.exception("Error notifying about new room event")
|
||||||
|
|
||||||
preserve_fn(_notify)()
|
run_in_background(_notify)
|
||||||
|
|
||||||
if event.type == EventTypes.Message:
|
if event.type == EventTypes.Message:
|
||||||
# We don't want to block sending messages on any presence code. This
|
# We don't want to block sending messages on any presence code. This
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError, AuthError
|
from synapse.api.errors import SynapseError, AuthError
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.util.wheel_timer import WheelTimer
|
from synapse.util.wheel_timer import WheelTimer
|
||||||
from synapse.types import UserID, get_domain_from_id
|
from synapse.types import UserID, get_domain_from_id
|
||||||
|
@ -97,7 +97,8 @@ class TypingHandler(object):
|
||||||
if self.hs.is_mine_id(member.user_id):
|
if self.hs.is_mine_id(member.user_id):
|
||||||
last_fed_poke = self._member_last_federation_poke.get(member, None)
|
last_fed_poke = self._member_last_federation_poke.get(member, None)
|
||||||
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
|
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
|
||||||
preserve_fn(self._push_remote)(
|
run_in_background(
|
||||||
|
self._push_remote,
|
||||||
member=member,
|
member=member,
|
||||||
typing=True
|
typing=True
|
||||||
)
|
)
|
||||||
|
@ -196,7 +197,7 @@ class TypingHandler(object):
|
||||||
def _push_update(self, member, typing):
|
def _push_update(self, member, typing):
|
||||||
if self.hs.is_mine_id(member.user_id):
|
if self.hs.is_mine_id(member.user_id):
|
||||||
# Only send updates for changes to our own users.
|
# Only send updates for changes to our own users.
|
||||||
preserve_fn(self._push_remote)(member, typing)
|
run_in_background(self._push_remote, member, typing)
|
||||||
|
|
||||||
self._push_update_local(
|
self._push_update_local(
|
||||||
member=member,
|
member=member,
|
||||||
|
|
|
@ -14,13 +14,13 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from .pusher import PusherFactory
|
from synapse.push.pusher import PusherFactory
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
|
||||||
from synapse.util.async import run_on_reactor
|
from synapse.util.async import run_on_reactor
|
||||||
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -137,8 +137,9 @@ class PusherPool:
|
||||||
if u in self.pushers:
|
if u in self.pushers:
|
||||||
for p in self.pushers[u].values():
|
for p in self.pushers[u].values():
|
||||||
deferreds.append(
|
deferreds.append(
|
||||||
preserve_fn(p.on_new_notifications)(
|
run_in_background(
|
||||||
min_stream_id, max_stream_id
|
p.on_new_notifications,
|
||||||
|
min_stream_id, max_stream_id,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -166,7 +167,10 @@ class PusherPool:
|
||||||
if u in self.pushers:
|
if u in self.pushers:
|
||||||
for p in self.pushers[u].values():
|
for p in self.pushers[u].values():
|
||||||
deferreds.append(
|
deferreds.append(
|
||||||
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
|
run_in_background(
|
||||||
|
p.on_new_receipts,
|
||||||
|
min_stream_id, max_stream_id,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
yield make_deferred_yieldable(
|
yield make_deferred_yieldable(
|
||||||
|
@ -211,7 +215,7 @@ class PusherPool:
|
||||||
if appid_pushkey in byuser:
|
if appid_pushkey in byuser:
|
||||||
byuser[appid_pushkey].on_stop()
|
byuser[appid_pushkey].on_stop()
|
||||||
byuser[appid_pushkey] = p
|
byuser[appid_pushkey] = p
|
||||||
preserve_fn(p.on_started)()
|
run_in_background(p.on_started)
|
||||||
|
|
||||||
logger.info("Started pushers")
|
logger.info("Started pushers")
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ from ._base import FileInfo
|
||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
SynapseError, Codes,
|
SynapseError, Codes,
|
||||||
)
|
)
|
||||||
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.http.client import SpiderHttpClient
|
from synapse.http.client import SpiderHttpClient
|
||||||
|
@ -144,7 +144,8 @@ class PreviewUrlResource(Resource):
|
||||||
observable = self._cache.get(url)
|
observable = self._cache.get(url)
|
||||||
|
|
||||||
if not observable:
|
if not observable:
|
||||||
download = preserve_fn(self._do_preview)(
|
download = run_in_background(
|
||||||
|
self._do_preview,
|
||||||
url, requester.user, ts,
|
url, requester.user, ts,
|
||||||
)
|
)
|
||||||
observable = ObservableDeferred(
|
observable = ObservableDeferred(
|
||||||
|
|
|
@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
|
||||||
from synapse.util.logcontext import (
|
from synapse.util.logcontext import (
|
||||||
preserve_fn, PreserveLoggingContext, make_deferred_yieldable
|
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
|
||||||
)
|
)
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
|
@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
|
|
||||||
res = yield make_deferred_yieldable(defer.gatherResults(
|
res = yield make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self._get_event_from_row)(
|
run_in_background(
|
||||||
|
self._get_event_from_row,
|
||||||
row["internal_metadata"], row["json"], row["redacts"],
|
row["internal_metadata"], row["json"], row["redacts"],
|
||||||
rejected_reason=row["rejects"],
|
rejected_reason=row["rejects"],
|
||||||
)
|
)
|
||||||
|
|
|
@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
|
@ -198,7 +198,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
room_ids = list(room_ids)
|
room_ids = list(room_ids)
|
||||||
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
|
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
|
||||||
res = yield make_deferred_yieldable(defer.gatherResults([
|
res = yield make_deferred_yieldable(defer.gatherResults([
|
||||||
preserve_fn(self.get_room_events_stream_for_room)(
|
run_in_background(
|
||||||
|
self.get_room_events_stream_for_room,
|
||||||
room_id, from_key, to_key, limit, order=order,
|
room_id, from_key, to_key, limit, order=order,
|
||||||
)
|
)
|
||||||
for room_id in rm_ids
|
for room_id in rm_ids
|
||||||
|
|
|
@ -19,7 +19,7 @@ from twisted.internet.defer import CancelledError
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
|
|
||||||
from .logcontext import (
|
from .logcontext import (
|
||||||
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
|
PreserveLoggingContext, make_deferred_yieldable, run_in_background
|
||||||
)
|
)
|
||||||
from synapse.util import logcontext, unwrapFirstError
|
from synapse.util import logcontext, unwrapFirstError
|
||||||
|
|
||||||
|
@ -163,7 +163,7 @@ def concurrently_execute(func, args, limit):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
return logcontext.make_deferred_yieldable(defer.gatherResults([
|
return logcontext.make_deferred_yieldable(defer.gatherResults([
|
||||||
preserve_fn(_concurrently_execute_inner)()
|
run_in_background(_concurrently_execute_inner)
|
||||||
for _ in xrange(limit)
|
for _ in xrange(limit)
|
||||||
], consumeErrors=True)).addErrback(unwrapFirstError)
|
], consumeErrors=True)).addErrback(unwrapFirstError)
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
from twisted.internet import threads, reactor
|
from twisted.internet import threads, reactor
|
||||||
|
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
|
|
||||||
from six.moves import queue
|
from six.moves import queue
|
||||||
|
|
||||||
|
@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
|
||||||
|
|
||||||
self._producer = producer
|
self._producer = producer
|
||||||
self.streaming = streaming
|
self.streaming = streaming
|
||||||
self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
|
self._finished_deferred = run_in_background(
|
||||||
|
threads.deferToThread, self._writer
|
||||||
|
)
|
||||||
if not streaming:
|
if not streaming:
|
||||||
self._producer.resumeProducing()
|
self._producer.resumeProducing()
|
||||||
|
|
||||||
|
|
|
@ -346,7 +346,7 @@ def make_deferred_yieldable(deferred):
|
||||||
returning a deferred. Then, when the deferred completes, restores the
|
returning a deferred. Then, when the deferred completes, restores the
|
||||||
current logcontext before running callbacks/errbacks.
|
current logcontext before running callbacks/errbacks.
|
||||||
|
|
||||||
(This is more-or-less the opposite operation to preserve_fn.)
|
(This is more-or-less the opposite operation to run_in_background.)
|
||||||
"""
|
"""
|
||||||
if isinstance(deferred, defer.Deferred) and not deferred.called:
|
if isinstance(deferred, defer.Deferred) and not deferred.called:
|
||||||
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||||
|
|
|
@ -18,7 +18,7 @@ from twisted.internet import defer
|
||||||
from synapse.api.errors import LimitExceededError
|
from synapse.api.errors import LimitExceededError
|
||||||
|
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.util.logcontext import preserve_fn
|
from synapse.util.logcontext import run_in_background
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
import contextlib
|
import contextlib
|
||||||
|
@ -150,7 +150,7 @@ class _PerHostRatelimiter(object):
|
||||||
"Ratelimit [%s]: sleeping req",
|
"Ratelimit [%s]: sleeping req",
|
||||||
id(request_id),
|
id(request_id),
|
||||||
)
|
)
|
||||||
ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
|
ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
|
||||||
|
|
||||||
self.sleeping_requests.add(request_id)
|
self.sleeping_requests.add(request_id)
|
||||||
|
|
||||||
|
|
|
@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Failed to store set_destination_retry_timings",
|
"Failed to store destination_retry_timings",
|
||||||
)
|
)
|
||||||
|
|
||||||
# we deliberately do this in the background.
|
# we deliberately do this in the background.
|
||||||
synapse.util.logcontext.preserve_fn(store_retry_timings)()
|
synapse.util.logcontext.run_in_background(store_retry_timings)
|
||||||
|
|
Loading…
Reference in a new issue