0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-11-06 14:48:56 +01:00

Merge pull request #940 from matrix-org/erikj/fed_state_cache

Cache federation state responses
This commit is contained in:
Erik Johnston 2016-08-02 15:21:37 +01:00 committed by GitHub
commit 7b0f6293f2
5 changed files with 61 additions and 33 deletions

View file

@ -21,10 +21,11 @@ from .units import Transaction, Edu
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.caches.response_cache import ResponseCache
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
import synapse.metrics import synapse.metrics
from synapse.api.errors import FederationError, SynapseError from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.crypto.event_signing import compute_event_signature from synapse.crypto.event_signing import compute_event_signature
@ -48,9 +49,15 @@ class FederationServer(FederationBase):
def __init__(self, hs): def __init__(self, hs):
super(FederationServer, self).__init__(hs) super(FederationServer, self).__init__(hs)
self.auth = hs.get_auth()
self._room_pdu_linearizer = Linearizer() self._room_pdu_linearizer = Linearizer()
self._server_linearizer = Linearizer() self._server_linearizer = Linearizer()
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
def set_handler(self, handler): def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate """Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are receipt of new PDUs from other home servers. The required methods are
@ -188,28 +195,45 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_context_state_request(self, origin, room_id, event_id): def on_context_state_request(self, origin, room_id, event_id):
with (yield self._server_linearizer.queue((origin, room_id))): if not event_id:
if event_id: raise NotImplementedError("Specify an event")
pdus = yield self.handler.get_state_for_pdu(
origin, room_id, event_id,
)
auth_chain = yield self.store.get_auth_chain(
[pdu.event_id for pdu in pdus]
)
for event in auth_chain: in_room = yield self.auth.check_host_in_room(room_id, origin)
# We sign these again because there was a bug where we if not in_room:
# incorrectly signed things the first time round raise AuthError(403, "Host not in room.")
if self.hs.is_mine_id(event.event_id):
event.signatures.update( result = self._state_resp_cache.get((room_id, event_id))
compute_event_signature( if not result:
event, with (yield self._server_linearizer.queue((origin, room_id))):
self.hs.hostname, resp = yield self.response_cache.set(
self.hs.config.signing_key[0] (room_id, event_id),
) self._on_context_state_request_compute(room_id, event_id)
) )
else: else:
raise NotImplementedError("Specify an event") resp = yield result
defer.returnValue((200, resp))
@defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id):
pdus = yield self.handler.get_state_for_pdu(
room_id, event_id,
)
auth_chain = yield self.store.get_auth_chain(
[pdu.event_id for pdu in pdus]
)
for event in auth_chain:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
if self.hs.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
defer.returnValue((200, { defer.returnValue((200, {
"pdus": [pdu.get_pdu_json() for pdu in pdus], "pdus": [pdu.get_pdu_json() for pdu in pdus],

View file

@ -991,14 +991,9 @@ class FederationHandler(BaseHandler):
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): def get_state_for_pdu(self, room_id, event_id):
yield run_on_reactor() yield run_on_reactor()
if do_auth:
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
state_groups = yield self.store.get_state_groups( state_groups = yield self.store.get_state_groups(
room_id, [event_id] room_id, [event_id]
) )

View file

@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler):
class RoomListHandler(BaseHandler): class RoomListHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(RoomListHandler, self).__init__(hs) super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache() self.response_cache = ResponseCache(hs)
self.remote_list_request_cache = ResponseCache() self.remote_list_request_cache = ResponseCache(hs)
self.remote_list_cache = {} self.remote_list_cache = {}
self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_looping_call = hs.get_clock().looping_call(
self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL

View file

@ -138,7 +138,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler() self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources() self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.response_cache = ResponseCache() self.response_cache = ResponseCache(hs)
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False): full_state=False):

View file

@ -24,9 +24,12 @@ class ResponseCache(object):
used rather than trying to compute a new response. used rather than trying to compute a new response.
""" """
def __init__(self): def __init__(self, hs, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet. self.pending_result_cache = {} # Requests that haven't finished yet.
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
def get(self, key): def get(self, key):
result = self.pending_result_cache.get(key) result = self.pending_result_cache.get(key)
if result is not None: if result is not None:
@ -39,7 +42,13 @@ class ResponseCache(object):
self.pending_result_cache[key] = result self.pending_result_cache[key] = result
def remove(r): def remove(r):
self.pending_result_cache.pop(key, None) if self.timeout_sec:
self.clock.call_later(
self.timeout_sec,
self.pending_result_cache.pop, key, None,
)
else:
self.pending_result_cache.pop(key, None)
return r return r
result.addBoth(remove) result.addBoth(remove)