Merge pull request #3604 from matrix-org/rav/background_process_fixes

Wrap a number of things that run in the background
This commit is contained in:
Richard van der Hoff 2018-07-25 11:54:33 +01:00 committed by GitHub
commit 1bfb5bed1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 95 additions and 30 deletions

1
changelog.d/3604.feature Normal file
View file

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

View file

@ -49,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.module_api import ModuleApi from synapse.module_api import ModuleApi
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@ -427,6 +428,9 @@ def run(hs):
# currently either 0 or 1 # currently either 0 or 1
stats_process = [] stats_process = []
def start_phone_stats_home():
run_as_background_process("phone_stats_home", phone_stats_home)
@defer.inlineCallbacks @defer.inlineCallbacks
def phone_stats_home(): def phone_stats_home():
logger.info("Gathering stats for reporting") logger.info("Gathering stats for reporting")
@ -498,7 +502,10 @@ def run(hs):
) )
def generate_user_daily_visit_stats(): def generate_user_daily_visit_stats():
hs.get_datastore().generate_user_daily_visits() run_as_background_process(
"generate_user_daily_visits",
hs.get_datastore().generate_user_daily_visits,
)
# Rather than update on per session basis, batch up the requests. # Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits # If you increase the loop period, the accuracy of user_daily_visits
@ -507,7 +514,7 @@ def run(hs):
if hs.config.report_stats: if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals") logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
# We need to defer this init for the cases that we daemonize # We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process # otherwise the process ID we get is that of the non-daemon process
@ -515,7 +522,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can # We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes # be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home) clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile: if hs.config.daemonize and hs.config.print_pidfile:
print (hs.config.pid_file) print (hs.config.pid_file)

View file

@ -43,6 +43,7 @@ from signedjson.sign import sign_json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util.logcontext import run_in_background from synapse.util.logcontext import run_in_background
@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
self.attestations = hs.get_groups_attestation_signing() self.attestations = hs.get_groups_attestation_signing()
self._renew_attestations_loop = self.clock.looping_call( self._renew_attestations_loop = self.clock.looping_call(
self._renew_attestations, 30 * 60 * 1000, self._start_renew_attestations, 30 * 60 * 1000,
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):
defer.returnValue({}) defer.returnValue({})
def _start_renew_attestations(self):
run_as_background_process("renew_attestations", self._renew_attestations)
@defer.inlineCallbacks @defer.inlineCallbacks
def _renew_attestations(self): def _renew_attestations(self):
"""Called periodically to check if we need to update any of our attestations """Called periodically to check if we need to update any of our attestations

View file

@ -192,7 +192,7 @@ class ReplicationClientHandler(object):
"""Returns a deferred that is resolved when we receive a SYNC command """Returns a deferred that is resolved when we receive a SYNC command
with given data. with given data.
Used by tests. [Not currently] used by tests.
""" """
return self.awaiting_syncs.setdefault(data, defer.Deferred()) return self.awaiting_syncs.setdefault(data, defer.Deferred())

View file

@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.protocol import Factory from twisted.internet.protocol import Factory
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol from .protocol import ServerReplicationStreamProtocol
@ -117,7 +118,6 @@ class ReplicationStreamer(object):
for conn in self.connections: for conn in self.connections:
conn.send_error("server shutting down") conn.send_error("server shutting down")
@defer.inlineCallbacks
def on_notifier_poke(self): def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the """Checks if there is actually any new data and sends it to the
connections if there are. connections if there are.
@ -132,14 +132,16 @@ class ReplicationStreamer(object):
stream.discard_updates_and_advance() stream.discard_updates_and_advance()
return return
# If we're in the process of checking for new updates, mark that fact self.pending_updates = True
# and return
if self.is_looping: if self.is_looping:
logger.debug("Noitifier poke loop already running") logger.debug("Notifier poke loop already running")
self.pending_updates = True
return return
self.pending_updates = True run_as_background_process("replication_notifier", self._run_notifier_loop)
@defer.inlineCallbacks
def _run_notifier_loop(self):
self.is_looping = True self.is_looping = True
try: try:

View file

@ -35,6 +35,7 @@ from synapse.api.errors import (
SynapseError, SynapseError,
) )
from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
@ -100,10 +101,15 @@ class MediaRepository(object):
) )
self.clock.looping_call( self.clock.looping_call(
self._update_recently_accessed, self._start_update_recently_accessed,
UPDATE_RECENTLY_ACCESSED_TS, UPDATE_RECENTLY_ACCESSED_TS,
) )
def _start_update_recently_accessed(self):
run_as_background_process(
"update_recently_accessed_media", self._update_recently_accessed,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _update_recently_accessed(self): def _update_recently_accessed(self):
remote_media = self.recently_accessed_remotes remote_media = self.recently_accessed_remotes

View file

@ -41,6 +41,7 @@ from synapse.http.server import (
wrap_json_request_handler, wrap_json_request_handler,
) )
from synapse.http.servlet import parse_integer, parse_string from synapse.http.servlet import parse_integer, parse_string
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@ -81,7 +82,7 @@ class PreviewUrlResource(Resource):
self._cache.start() self._cache.start()
self._cleaner_loop = self.clock.looping_call( self._cleaner_loop = self.clock.looping_call(
self._expire_url_cache_data, 10 * 1000 self._start_expire_url_cache_data, 10 * 1000,
) )
def render_OPTIONS(self, request): def render_OPTIONS(self, request):
@ -371,6 +372,11 @@ class PreviewUrlResource(Resource):
"etag": headers["ETag"][0] if "ETag" in headers else None, "etag": headers["ETag"][0] if "ETag" in headers else None,
}) })
def _start_expire_url_cache_data(self):
run_as_background_process(
"expire_url_cache_data", self._expire_url_cache_data,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _expire_url_cache_data(self): def _expire_url_cache_data(self):
"""Clean up expired url cache content, media and thumbnails. """Clean up expired url cache content, media and thumbnails.

View file

@ -21,6 +21,7 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from ._base import Cache, SQLBaseStore from ._base import Cache, SQLBaseStore
@ -711,6 +712,9 @@ class DeviceStore(SQLBaseStore):
logger.info("Pruned %d device list outbound pokes", txn.rowcount) logger.info("Pruned %d device list outbound pokes", txn.rowcount)
return self.runInteraction( run_as_background_process(
"_prune_old_outbound_device_pokes", _prune_txn "prune_old_outbound_device_pokes",
self.runInteraction,
"_prune_old_outbound_device_pokes",
_prune_txn,
) )

View file

@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore from synapse.storage.events import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore from synapse.storage.signatures import SignatureWorkerStore
@ -446,7 +447,7 @@ class EventFederationStore(EventFederationWorkerStore):
) )
hs.get_clock().looping_call( hs.get_clock().looping_call(
self._delete_old_forward_extrem_cache, 60 * 60 * 1000 self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
) )
def _update_min_depth_for_room_txn(self, txn, room_id, depth): def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@ -548,9 +549,11 @@ class EventFederationStore(EventFederationWorkerStore):
sql, sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,) (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
) )
return self.runInteraction( run_as_background_process(
"delete_old_forward_extrem_cache",
self.runInteraction,
"_delete_old_forward_extrem_cache", "_delete_old_forward_extrem_cache",
_delete_old_forward_extrem_cache_txn _delete_old_forward_extrem_cache_txn,
) )
def clean_room_for_join(self, room_id): def clean_room_for_join(self, room_id):

View file

@ -22,6 +22,7 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"Error removing push actions after event persistence failure", "Error removing push actions after event persistence failure",
) )
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self): def _find_stream_orderings_for_times(self):
yield self.runInteraction( run_as_background_process(
"event_push_action_stream_orderings",
self.runInteraction,
"_find_stream_orderings_for_times", "_find_stream_orderings_for_times",
self._find_stream_orderings_for_times_txn self._find_stream_orderings_for_times_txn,
) )
def _find_stream_orderings_for_times_txn(self, txn): def _find_stream_orderings_for_times_txn(self, txn):
@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._doing_notif_rotation = False self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call( self._rotate_notif_loop = self._clock.looping_call(
self._rotate_notifs, 30 * 60 * 1000 self._start_rotate_notifs, 30 * 60 * 1000,
) )
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering)) """, (room_id, user_id, stream_ordering))
def _start_rotate_notifs(self):
run_as_background_process("rotate_notifs", self._rotate_notifs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _rotate_notifs(self): def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None: if self._doing_notif_rotation or self.stream_ordering_day_ago is None:

View file

@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore from ._base import SQLBaseStore
@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
super(TransactionStore, self).__init__(db_conn, hs) super(TransactionStore, self).__init__(db_conn, hs)
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
def get_received_txn_response(self, transaction_id, origin): def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have """For an incoming transaction from a given origin, check if we have
@ -271,6 +272,9 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),)) txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn) return self.cursor_to_dict(txn)
def _start_cleanup_transactions(self):
run_as_background_process("cleanup_transactions", self._cleanup_transactions)
def _cleanup_transactions(self): def _cleanup_transactions(self):
now = self._clock.time_msec() now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000 month_ago = now - 30 * 24 * 60 * 60 * 1000

View file

@ -11,23 +11,44 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import tempfile import tempfile
from mock import Mock, NonCallableMock from mock import Mock, NonCallableMock
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.defer import Deferred
from synapse.replication.tcp.client import ( from synapse.replication.tcp.client import (
ReplicationClientFactory, ReplicationClientFactory,
ReplicationClientHandler, ReplicationClientHandler,
) )
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
from tests import unittest from tests import unittest
from tests.utils import setup_test_homeserver from tests.utils import setup_test_homeserver
class TestReplicationClientHandler(ReplicationClientHandler):
"""Overrides on_rdata so that we can wait for it to happen"""
def __init__(self, store):
super(TestReplicationClientHandler, self).__init__(store)
self._rdata_awaiters = []
def await_replication(self):
d = Deferred()
self._rdata_awaiters.append(d)
return make_deferred_yieldable(d)
def on_rdata(self, stream_name, token, rows):
awaiters = self._rdata_awaiters
self._rdata_awaiters = []
super(TestReplicationClientHandler, self).on_rdata(stream_name, token, rows)
with PreserveLoggingContext():
for a in awaiters:
a.callback(None)
class BaseSlavedStoreTestCase(unittest.TestCase): class BaseSlavedStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def setUp(self): def setUp(self):
@ -52,7 +73,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
self.addCleanup(listener.stopListening) self.addCleanup(listener.stopListening)
self.streamer = server_factory.streamer self.streamer = server_factory.streamer
self.replication_handler = ReplicationClientHandler(self.slaved_store) self.replication_handler = TestReplicationClientHandler(self.slaved_store)
client_factory = ReplicationClientFactory( client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler self.hs, "client_name", self.replication_handler
) )
@ -60,12 +81,14 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
self.addCleanup(client_factory.stopTrying) self.addCleanup(client_factory.stopTrying)
self.addCleanup(client_connector.disconnect) self.addCleanup(client_connector.disconnect)
@defer.inlineCallbacks
def replicate(self): def replicate(self):
yield self.streamer.on_notifier_poke() """Tell the master side of replication that something has happened, and then
d = self.replication_handler.await_sync("replication_test") wait for the replication to occur.
self.streamer.send_sync_to_all_connections("replication_test") """
yield d # xxx: should we be more specific in what we wait for?
d = self.replication_handler.await_replication()
self.streamer.on_notifier_poke()
return d
@defer.inlineCallbacks @defer.inlineCallbacks
def check(self, method, args, expected_result=None): def check(self, method, args, expected_result=None):