forked from MirrorHub/synapse
Fix some instances of ExpiringCache not expiring cache items
ExpiringCache required that `start()` be called before it would actually start expiring entries. A number of places didn't do that. This PR removes `start` from ExpiringCache, and automatically starts the background reaping process on creation instead.
This commit is contained in:
parent
ad53a5497d
commit
8601c24287
16 changed files with 12 additions and 39 deletions
|
@ -172,7 +172,6 @@ def start(config_options):
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ps.get_datastore().start_profiling()
|
ps.get_datastore().start_profiling()
|
||||||
ps.get_state_handler().start_caching()
|
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
|
|
@ -181,7 +181,6 @@ def start(config_options):
|
||||||
ss.start_listening(config.worker_listeners)
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
|
@ -199,7 +199,6 @@ def start(config_options):
|
||||||
ss.start_listening(config.worker_listeners)
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
|
@ -168,7 +168,6 @@ def start(config_options):
|
||||||
ss.start_listening(config.worker_listeners)
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
|
@ -201,7 +201,6 @@ def start(config_options):
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ps.get_datastore().start_profiling()
|
ps.get_datastore().start_profiling()
|
||||||
ps.get_state_handler().start_caching()
|
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
_base.start_worker_reactor("synapse-federation-sender", config)
|
_base.start_worker_reactor("synapse-federation-sender", config)
|
||||||
|
|
|
@ -258,7 +258,6 @@ def start(config_options):
|
||||||
ss.start_listening(config.worker_listeners)
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
|
@ -384,7 +384,6 @@ def setup(config_options):
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
hs.get_pusherpool().start()
|
hs.get_pusherpool().start()
|
||||||
hs.get_state_handler().start_caching()
|
|
||||||
hs.get_datastore().start_profiling()
|
hs.get_datastore().start_profiling()
|
||||||
hs.get_datastore().start_doing_background_updates()
|
hs.get_datastore().start_doing_background_updates()
|
||||||
hs.get_federation_client().start_get_pdu_cache()
|
hs.get_federation_client().start_get_pdu_cache()
|
||||||
|
|
|
@ -168,7 +168,6 @@ def start(config_options):
|
||||||
ss.start_listening(config.worker_listeners)
|
ss.start_listening(config.worker_listeners)
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
|
@ -228,7 +228,6 @@ def start(config_options):
|
||||||
def start():
|
def start():
|
||||||
ps.get_pusherpool().start()
|
ps.get_pusherpool().start()
|
||||||
ps.get_datastore().start_profiling()
|
ps.get_datastore().start_profiling()
|
||||||
ps.get_state_handler().start_caching()
|
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
|
|
@ -435,7 +435,6 @@ def start(config_options):
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ss.get_datastore().start_profiling()
|
ss.get_datastore().start_profiling()
|
||||||
ss.get_state_handler().start_caching()
|
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
|
|
@ -229,7 +229,6 @@ def start(config_options):
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
ps.get_datastore().start_profiling()
|
ps.get_datastore().start_profiling()
|
||||||
ps.get_state_handler().start_caching()
|
|
||||||
|
|
||||||
reactor.callWhenRunning(start)
|
reactor.callWhenRunning(start)
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,14 @@ class FederationClient(FederationBase):
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.transport_layer = hs.get_federation_transport_client()
|
self.transport_layer = hs.get_federation_transport_client()
|
||||||
|
|
||||||
|
self._get_pdu_cache = ExpiringCache(
|
||||||
|
cache_name="get_pdu_cache",
|
||||||
|
clock=self._clock,
|
||||||
|
max_len=1000,
|
||||||
|
expiry_ms=120 * 1000,
|
||||||
|
reset_expiry_on_get=False,
|
||||||
|
)
|
||||||
|
|
||||||
def _clear_tried_cache(self):
|
def _clear_tried_cache(self):
|
||||||
"""Clear pdu_destination_tried cache"""
|
"""Clear pdu_destination_tried cache"""
|
||||||
now = self._clock.time_msec()
|
now = self._clock.time_msec()
|
||||||
|
@ -82,17 +90,6 @@ class FederationClient(FederationBase):
|
||||||
if destination_dict:
|
if destination_dict:
|
||||||
self.pdu_destination_tried[event_id] = destination_dict
|
self.pdu_destination_tried[event_id] = destination_dict
|
||||||
|
|
||||||
def start_get_pdu_cache(self):
|
|
||||||
self._get_pdu_cache = ExpiringCache(
|
|
||||||
cache_name="get_pdu_cache",
|
|
||||||
clock=self._clock,
|
|
||||||
max_len=1000,
|
|
||||||
expiry_ms=120 * 1000,
|
|
||||||
reset_expiry_on_get=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
self._get_pdu_cache.start()
|
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def make_query(self, destination, query_type, args,
|
def make_query(self, destination, query_type, args,
|
||||||
retry_on_dns_fail=False, ignore_backoff=False):
|
retry_on_dns_fail=False, ignore_backoff=False):
|
||||||
|
@ -229,7 +226,6 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
# TODO: Rate limit the number of times we try and get the same event.
|
# TODO: Rate limit the number of times we try and get the same event.
|
||||||
|
|
||||||
if self._get_pdu_cache:
|
|
||||||
ev = self._get_pdu_cache.get(event_id)
|
ev = self._get_pdu_cache.get(event_id)
|
||||||
if ev:
|
if ev:
|
||||||
defer.returnValue(ev)
|
defer.returnValue(ev)
|
||||||
|
@ -285,7 +281,7 @@ class FederationClient(FederationBase):
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if self._get_pdu_cache is not None and signed_pdu:
|
if signed_pdu:
|
||||||
self._get_pdu_cache[event_id] = signed_pdu
|
self._get_pdu_cache[event_id] = signed_pdu
|
||||||
|
|
||||||
defer.returnValue(signed_pdu)
|
defer.returnValue(signed_pdu)
|
||||||
|
|
|
@ -79,7 +79,6 @@ class PreviewUrlResource(Resource):
|
||||||
# don't spider URLs more often than once an hour
|
# don't spider URLs more often than once an hour
|
||||||
expiry_ms=60 * 60 * 1000,
|
expiry_ms=60 * 60 * 1000,
|
||||||
)
|
)
|
||||||
self._cache.start()
|
|
||||||
|
|
||||||
self._cleaner_loop = self.clock.looping_call(
|
self._cleaner_loop = self.clock.looping_call(
|
||||||
self._start_expire_url_cache_data, 10 * 1000,
|
self._start_expire_url_cache_data, 10 * 1000,
|
||||||
|
|
|
@ -95,10 +95,6 @@ class StateHandler(object):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||||
|
|
||||||
def start_caching(self):
|
|
||||||
# TODO: remove this shim
|
|
||||||
self._state_resolution_handler.start_caching()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_current_state(self, room_id, event_type=None, state_key="",
|
def get_current_state(self, room_id, event_type=None, state_key="",
|
||||||
latest_event_ids=None):
|
latest_event_ids=None):
|
||||||
|
@ -428,9 +424,6 @@ class StateResolutionHandler(object):
|
||||||
self._state_cache = None
|
self._state_cache = None
|
||||||
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
self.resolve_linearizer = Linearizer(name="state_resolve_lock")
|
||||||
|
|
||||||
def start_caching(self):
|
|
||||||
logger.debug("start_caching")
|
|
||||||
|
|
||||||
self._state_cache = ExpiringCache(
|
self._state_cache = ExpiringCache(
|
||||||
cache_name="state_cache",
|
cache_name="state_cache",
|
||||||
clock=self.clock,
|
clock=self.clock,
|
||||||
|
@ -440,8 +433,6 @@ class StateResolutionHandler(object):
|
||||||
reset_expiry_on_get=True,
|
reset_expiry_on_get=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._state_cache.start()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def resolve_state_groups(
|
def resolve_state_groups(
|
||||||
|
|
|
@ -58,7 +58,6 @@ class ExpiringCache(object):
|
||||||
|
|
||||||
self.metrics = register_cache("expiring", cache_name, self)
|
self.metrics = register_cache("expiring", cache_name, self)
|
||||||
|
|
||||||
def start(self):
|
|
||||||
if not self._expiry_ms:
|
if not self._expiry_ms:
|
||||||
# Don't bother starting the loop if things never expire
|
# Don't bother starting the loop if things never expire
|
||||||
return
|
return
|
||||||
|
|
|
@ -65,7 +65,6 @@ class ExpiringCacheTestCase(unittest.TestCase):
|
||||||
def test_time_eviction(self):
|
def test_time_eviction(self):
|
||||||
clock = MockClock()
|
clock = MockClock()
|
||||||
cache = ExpiringCache("test", clock, expiry_ms=1000)
|
cache = ExpiringCache("test", clock, expiry_ms=1000)
|
||||||
cache.start()
|
|
||||||
|
|
||||||
cache["key"] = 1
|
cache["key"] = 1
|
||||||
clock.advance_time(0.5)
|
clock.advance_time(0.5)
|
||||||
|
|
Loading…
Reference in a new issue