Wrap a number of things that run in the background

This will reduce the number of "Starting db connection from sentinel context"
warnings, and will help with our metrics.
Richard van der Hoff 2018-07-25 09:41:12 +01:00
parent 1e5dbdcbb1
commit 371da42ae4
9 changed files with 63 additions and 22 deletions
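The pattern in every file below is the same: a method that used to be passed straight to clock.looping_call or clock.call_later ran from the reactor's sentinel logcontext, so its database work triggered the warning above and was invisible to metrics. Each such method gains a plain _start_* wrapper that hands the real work to run_as_background_process under a descriptive name. A minimal sketch of the pattern, with hypothetical names (Example, "example_cleanup" and _do_cleanup are illustrative, not from this commit):

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process


class Example(object):
    def __init__(self, hs):
        self.clock = hs.get_clock()
        # Before: self.clock.looping_call(self._do_cleanup, 30 * 60 * 1000).
        # looping_call fires from the reactor, i.e. from the sentinel
        # logcontext, so everything _do_cleanup did was misattributed.
        self.clock.looping_call(self._start_cleanup, 30 * 60 * 1000)

    def _start_cleanup(self):
        # Fire-and-forget: the work gets its own named logcontext and is
        # tracked in the background-process metrics under this name.
        run_as_background_process("example_cleanup", self._do_cleanup)

    @defer.inlineCallbacks
    def _do_cleanup(self):
        yield self.clock.sleep(0)  # stand-in for the real periodic work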

synapse/app/homeserver.py (View file)

@@ -49,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.module_api import ModuleApi
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@@ -427,6 +428,9 @@ def run(hs):
     # currently either 0 or 1
     stats_process = []

+    def start_phone_stats_home():
+        run_as_background_process("phone_stats_home", phone_stats_home)
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -498,7 +502,10 @@ def run(hs):
     )

     def generate_user_daily_visit_stats():
-        hs.get_datastore().generate_user_daily_visits()
+        run_as_background_process(
+            "generate_user_daily_visits",
+            hs.get_datastore().generate_user_daily_visits,
+        )

     # Rather than update on per session basis, batch up the requests.
     # If you increase the loop period, the accuracy of user_daily_visits
@@ -507,7 +514,7 @@ def run(hs):

     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
-        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+        clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)

     # We need to defer this init for the cases that we daemonize
     # otherwise the process ID we get is that of the non-daemon process
@@ -515,7 +522,7 @@ def run(hs):

     # We wait 5 minutes to send the first set of stats as the server can
     # be quite busy the first few minutes
-    clock.call_later(5 * 60, phone_stats_home)
+    clock.call_later(5 * 60, start_phone_stats_home)

     if hs.config.daemonize and hs.config.print_pidfile:
         print (hs.config.pid_file)

synapse/groups/attestations.py (View file)

@@ -43,6 +43,7 @@ from signedjson.sign import sign_json
 from twisted.internet import defer

 from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import run_in_background
@@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
         self.attestations = hs.get_groups_attestation_signing()

         self._renew_attestations_loop = self.clock.looping_call(
-            self._renew_attestations, 30 * 60 * 1000,
+            self._start_renew_attestations, 30 * 60 * 1000,
         )

     @defer.inlineCallbacks
@@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):

         defer.returnValue({})

+    def _start_renew_attestations(self):
+        run_as_background_process("renew_attestations", self._renew_attestations)
+
     @defer.inlineCallbacks
     def _renew_attestations(self):
         """Called periodically to check if we need to update any of our attestations

synapse/replication/tcp/resource.py (View file)

@@ -25,6 +25,7 @@ from twisted.internet import defer
 from twisted.internet.protocol import Factory

 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func

 from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ class ReplicationStreamer(object):
         for conn in self.connections:
             conn.send_error("server shutting down")

-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -132,14 +132,16 @@ class ReplicationStreamer(object):
                 stream.discard_updates_and_advance()
             return

-        # If we're in the process of checking for new updates, mark that fact
-        # and return
+        self.pending_updates = True
+
         if self.is_looping:
-            logger.debug("Noitifier poke loop already running")
-            self.pending_updates = True
+            logger.debug("Notifier poke loop already running")
             return

-        self.pending_updates = True
+        run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+    @defer.inlineCallbacks
+    def _run_notifier_loop(self):
         self.is_looping = True

         try:
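The change above is more than a mechanical wrap: on_notifier_poke now just records that there is pending work and returns if a loop is already draining it, so at most one _run_notifier_loop runs at a time and pokes that arrive mid-loop are coalesced. A condensed sketch of that shape (the body of the real loop is elided; _send_pending_updates is a hypothetical stand-in for it):

    def on_notifier_poke(self):
        self.pending_updates = True

        if self.is_looping:
            # The running loop will see pending_updates and go round again.
            return

        run_as_background_process("replication_notifier", self._run_notifier_loop)

    @defer.inlineCallbacks
    def _run_notifier_loop(self):
        self.is_looping = True
        try:
            # Keep looping until no new poke arrived while we were working.
            while self.pending_updates:
                self.pending_updates = False
                yield self._send_pending_updates()  # hypothetical stand-in
        finally:
            self.is_looping = False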

synapse/rest/media/v1/media_repository.py (View file)

@@ -35,6 +35,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import Linearizer
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
@@ -100,10 +101,15 @@ class MediaRepository(object):
         )

         self.clock.looping_call(
-            self._update_recently_accessed,
+            self._start_update_recently_accessed,
             UPDATE_RECENTLY_ACCESSED_TS,
         )

+    def _start_update_recently_accessed(self):
+        run_as_background_process(
+            "update_recently_accessed_media", self._update_recently_accessed,
+        )
+
     @defer.inlineCallbacks
     def _update_recently_accessed(self):
         remote_media = self.recently_accessed_remotes

synapse/rest/media/v1/preview_url_resource.py (View file)

@@ -41,6 +41,7 @@ from synapse.http.server import (
     wrap_json_request_handler,
 )
 from synapse.http.servlet import parse_integer, parse_string
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -81,7 +82,7 @@ class PreviewUrlResource(Resource):
         self._cache.start()

         self._cleaner_loop = self.clock.looping_call(
-            self._expire_url_cache_data, 10 * 1000
+            self._start_expire_url_cache_data, 10 * 1000,
         )

     def render_OPTIONS(self, request):
@@ -371,6 +372,11 @@ class PreviewUrlResource(Resource):
             "etag": headers["ETag"][0] if "ETag" in headers else None,
         })

+    def _start_expire_url_cache_data(self):
+        run_as_background_process(
+            "expire_url_cache_data", self._expire_url_cache_data,
+        )
+
     @defer.inlineCallbacks
     def _expire_url_cache_data(self):
         """Clean up expired url cache content, media and thumbnails.

synapse/storage/devices.py (View file)

@@ -21,6 +21,7 @@ from canonicaljson import json
 from twisted.internet import defer

 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList

 from ._base import Cache, SQLBaseStore
@@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore):

             logger.info("Pruned %d device list outbound pokes", txn.rowcount)

-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
+        run_as_background_process(
+            "prune_old_outbound_device_pokes",
+            self.runInteraction,
+            "_prune_old_outbound_device_pokes",
+            _prune_txn,
         )
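Two things are worth noting in this hunk: run_as_background_process forwards any extra positional arguments to the function it wraps, which is why self.runInteraction can be handed over directly rather than through a lambda, and the return is gone, so callers no longer wait on the prune; it is now fire-and-forget. The call is equivalent to this more verbose form:

        run_as_background_process(
            "prune_old_outbound_device_pokes",
            lambda: self.runInteraction(
                "_prune_old_outbound_device_pokes", _prune_txn,
            ),
        )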

synapse/storage/event_federation.py (View file)

@@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
 from twisted.internet import defer

 from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
@@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore):
         )

         hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
         )

     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore):
                 sql,
                 (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )

-        return self.runInteraction(
+        run_as_background_process(
+            "delete_old_forward_extrem_cache",
+            self.runInteraction,
             "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn
+            _delete_old_forward_extrem_cache_txn,
         )

     def clean_room_for_join(self, room_id):

synapse/storage/event_push_actions.py (View file)

@@ -22,6 +22,7 @@ from canonicaljson import json
 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "Error removing push actions after event persistence failure",
             )

-    @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
-        yield self.runInteraction(
+        run_as_background_process(
+            "event_push_action_stream_orderings",
+            self.runInteraction,
             "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn
+            self._find_stream_orderings_for_times_txn,
         )

     def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):

         self._doing_notif_rotation = False
         self._rotate_notif_loop = self._clock.looping_call(
-            self._rotate_notifs, 30 * 60 * 1000
+            self._start_rotate_notifs, 30 * 60 * 1000,
         )

     def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
         """, (room_id, user_id, stream_ordering))

+    def _start_rotate_notifs(self):
+        run_as_background_process("rotate_notifs", self._rotate_notifs)
+
     @defer.inlineCallbacks
     def _rotate_notifs(self):
         if self._doing_notif_rotation or self.stream_ordering_day_ago is None:

synapse/storage/transactions.py (View file)

@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches.descriptors import cached

 from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(TransactionStore, self).__init__(db_conn, hs)

-        self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+        self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)

     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore):
             txn.execute(query, (self._clock.time_msec(),))
             return self.cursor_to_dict(txn)

+    def _start_cleanup_transactions(self):
+        run_as_background_process("cleanup_transactions", self._cleanup_transactions)
+
     def _cleanup_transactions(self):
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
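For context, a much-simplified sketch of what run_as_background_process does with each of these wrapped calls. The real implementation in synapse.metrics.background_process_metrics also counts starts and tracks per-process CPU and database usage; this sketch keeps only the logcontext handling that makes the sentinel-context warnings go away:

from twisted.internet import defer

from synapse.util.logcontext import LoggingContext, PreserveLoggingContext


def run_as_background_process(desc, func, *args, **kwargs):
    @defer.inlineCallbacks
    def run():
        with LoggingContext(desc):
            # The wrapped work now runs under a logcontext named after
            # the process, not under the sentinel context it would have
            # inherited from the reactor.
            yield func(*args, **kwargs)

    with PreserveLoggingContext():
        run()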