forked from MirrorHub/synapse
Optimise missing prev_event handling (#9601)
Background: When we receive incoming federation traffic, and notice that we are missing prev_events from the incoming traffic, first we do a `/get_missing_events` request, and then if we still have missing prev_events, we set up new backwards-extremities. To do that, we need to make a `/state_ids` request to ask the remote server for the state at those prev_events, and then we may need to then ask the remote server for any events in that state which we don't already have, as well as the auth events for those missing state events, so that we can auth them. This PR attempts to optimise the processing of that state request. The `state_ids` API returns a list of the state events, as well as a list of all the auth events for *all* of those state events. The optimisation comes from the observation that we are currently loading all of those auth events into memory at the start of the operation, but we almost certainly aren't going to need *all* of the auth events. Rather, we can check that we have them, and leave the actual load into memory for later. (Ideally the federation API would tell us which auth events we're actually going to need, but it doesn't.) The effect of this is to reduce the number of events that I need to load for an event in Matrix HQ from about 60000 to about 22000, which means it can stay in my in-memory cache, whereas previously the sheer number of events meant that all 60K events had to be loaded from db for each request, due to the amount of cache churn. (NB I've already tripled the size of the cache from its default of 10K). Unfortunately I've ended up basically C&Ping `_get_state_for_room` and `_get_events_from_store_or_dest` into a new method, because `_get_state_for_room` is also called during backfill, which expects the auth events to be returned, so the same tricks don't work. That said, I don't really know why that codepath is completely different (ultimately we're doing the same thing in setting up a new backwards extremity) so I've left a TODO suggesting that we clean it up.
This commit is contained in:
parent
55da8df078
commit
af2248f8bf
3 changed files with 137 additions and 28 deletions
1
changelog.d/9601.feature
Normal file
1
changelog.d/9601.feature
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Optimise handling of incomplete room history for incoming federation.
|
|
@ -353,17 +353,16 @@ class FederationHandler(BaseHandler):
|
||||||
# Ask the remote server for the states we don't
|
# Ask the remote server for the states we don't
|
||||||
# know about
|
# know about
|
||||||
for p in prevs - seen:
|
for p in prevs - seen:
|
||||||
logger.info(
|
logger.info("Requesting state after missing prev_event %s", p)
|
||||||
"Requesting state at missing prev_event %s",
|
|
||||||
event_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
with nested_logging_context(p):
|
with nested_logging_context(p):
|
||||||
# note that if any of the missing prevs share missing state or
|
# note that if any of the missing prevs share missing state or
|
||||||
# auth events, the requests to fetch those events are deduped
|
# auth events, the requests to fetch those events are deduped
|
||||||
# by the get_pdu_cache in federation_client.
|
# by the get_pdu_cache in federation_client.
|
||||||
(remote_state, _,) = await self._get_state_for_room(
|
remote_state = (
|
||||||
origin, room_id, p, include_event_in_state=True
|
await self._get_state_after_missing_prev_event(
|
||||||
|
origin, room_id, p
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
remote_state_map = {
|
remote_state_map = {
|
||||||
|
@ -539,7 +538,6 @@ class FederationHandler(BaseHandler):
|
||||||
destination: str,
|
destination: str,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
event_id: str,
|
event_id: str,
|
||||||
include_event_in_state: bool = False,
|
|
||||||
) -> Tuple[List[EventBase], List[EventBase]]:
|
) -> Tuple[List[EventBase], List[EventBase]]:
|
||||||
"""Requests all of the room state at a given event from a remote homeserver.
|
"""Requests all of the room state at a given event from a remote homeserver.
|
||||||
|
|
||||||
|
@ -547,11 +545,9 @@ class FederationHandler(BaseHandler):
|
||||||
destination: The remote homeserver to query for the state.
|
destination: The remote homeserver to query for the state.
|
||||||
room_id: The id of the room we're interested in.
|
room_id: The id of the room we're interested in.
|
||||||
event_id: The id of the event we want the state at.
|
event_id: The id of the event we want the state at.
|
||||||
include_event_in_state: if true, the event itself will be included in the
|
|
||||||
returned state event list.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A list of events in the state, possibly including the event itself, and
|
A list of events in the state, not including the event itself, and
|
||||||
a list of events in the auth chain for the given event.
|
a list of events in the auth chain for the given event.
|
||||||
"""
|
"""
|
||||||
(
|
(
|
||||||
|
@ -563,9 +559,6 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
desired_events = set(state_event_ids + auth_event_ids)
|
desired_events = set(state_event_ids + auth_event_ids)
|
||||||
|
|
||||||
if include_event_in_state:
|
|
||||||
desired_events.add(event_id)
|
|
||||||
|
|
||||||
event_map = await self._get_events_from_store_or_dest(
|
event_map = await self._get_events_from_store_or_dest(
|
||||||
destination, room_id, desired_events
|
destination, room_id, desired_events
|
||||||
)
|
)
|
||||||
|
@ -582,13 +575,6 @@ class FederationHandler(BaseHandler):
|
||||||
event_map[e_id] for e_id in state_event_ids if e_id in event_map
|
event_map[e_id] for e_id in state_event_ids if e_id in event_map
|
||||||
]
|
]
|
||||||
|
|
||||||
if include_event_in_state:
|
|
||||||
remote_event = event_map.get(event_id)
|
|
||||||
if not remote_event:
|
|
||||||
raise Exception("Unable to get missing prev_event %s" % (event_id,))
|
|
||||||
if remote_event.is_state() and remote_event.rejected_reason is None:
|
|
||||||
remote_state.append(remote_event)
|
|
||||||
|
|
||||||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
||||||
auth_chain.sort(key=lambda e: e.depth)
|
auth_chain.sort(key=lambda e: e.depth)
|
||||||
|
|
||||||
|
@ -662,6 +648,131 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
return fetched_events
|
return fetched_events
|
||||||
|
|
||||||
|
async def _get_state_after_missing_prev_event(
|
||||||
|
self,
|
||||||
|
destination: str,
|
||||||
|
room_id: str,
|
||||||
|
event_id: str,
|
||||||
|
) -> List[EventBase]:
|
||||||
|
"""Requests all of the room state at a given event from a remote homeserver.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
destination: The remote homeserver to query for the state.
|
||||||
|
room_id: The id of the room we're interested in.
|
||||||
|
event_id: The id of the event we want the state at.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A list of events in the state, including the event itself
|
||||||
|
"""
|
||||||
|
# TODO: This function is basically the same as _get_state_for_room. Can
|
||||||
|
# we make backfill() use it, rather than having two code paths? I think the
|
||||||
|
# only difference is that backfill() persists the prev events separately.
|
||||||
|
|
||||||
|
(
|
||||||
|
state_event_ids,
|
||||||
|
auth_event_ids,
|
||||||
|
) = await self.federation_client.get_room_state_ids(
|
||||||
|
destination, room_id, event_id=event_id
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.debug(
|
||||||
|
"state_ids returned %i state events, %i auth events",
|
||||||
|
len(state_event_ids),
|
||||||
|
len(auth_event_ids),
|
||||||
|
)
|
||||||
|
|
||||||
|
# start by just trying to fetch the events from the store
|
||||||
|
desired_events = set(state_event_ids)
|
||||||
|
desired_events.add(event_id)
|
||||||
|
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
||||||
|
fetched_events = await self.store.get_events(
|
||||||
|
desired_events, allow_rejected=True
|
||||||
|
)
|
||||||
|
|
||||||
|
missing_desired_events = desired_events - fetched_events.keys()
|
||||||
|
logger.debug(
|
||||||
|
"We are missing %i events (got %i)",
|
||||||
|
len(missing_desired_events),
|
||||||
|
len(fetched_events),
|
||||||
|
)
|
||||||
|
|
||||||
|
# We probably won't need most of the auth events, so let's just check which
|
||||||
|
# we have for now, rather than thrashing the event cache with them all
|
||||||
|
# unnecessarily.
|
||||||
|
|
||||||
|
# TODO: we probably won't actually need all of the auth events, since we
|
||||||
|
# already have a bunch of the state events. It would be nice if the
|
||||||
|
# federation api gave us a way of finding out which we actually need.
|
||||||
|
|
||||||
|
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
|
||||||
|
missing_auth_events.difference_update(
|
||||||
|
await self.store.have_seen_events(missing_auth_events)
|
||||||
|
)
|
||||||
|
logger.debug("We are also missing %i auth events", len(missing_auth_events))
|
||||||
|
|
||||||
|
missing_events = missing_desired_events | missing_auth_events
|
||||||
|
logger.debug("Fetching %i events from remote", len(missing_events))
|
||||||
|
await self._get_events_and_persist(
|
||||||
|
destination=destination, room_id=room_id, events=missing_events
|
||||||
|
)
|
||||||
|
|
||||||
|
# we need to make sure we re-load from the database to get the rejected
|
||||||
|
# state correct.
|
||||||
|
fetched_events.update(
|
||||||
|
(await self.store.get_events(missing_desired_events, allow_rejected=True))
|
||||||
|
)
|
||||||
|
|
||||||
|
# check for events which were in the wrong room.
|
||||||
|
#
|
||||||
|
# this can happen if a remote server claims that the state or
|
||||||
|
# auth_events at an event in room A are actually events in room B
|
||||||
|
|
||||||
|
bad_events = [
|
||||||
|
(event_id, event.room_id)
|
||||||
|
for event_id, event in fetched_events.items()
|
||||||
|
if event.room_id != room_id
|
||||||
|
]
|
||||||
|
|
||||||
|
for bad_event_id, bad_room_id in bad_events:
|
||||||
|
# This is a bogus situation, but since we may only discover it a long time
|
||||||
|
# after it happened, we try our best to carry on, by just omitting the
|
||||||
|
# bad events from the returned state set.
|
||||||
|
logger.warning(
|
||||||
|
"Remote server %s claims event %s in room %s is an auth/state "
|
||||||
|
"event in room %s",
|
||||||
|
destination,
|
||||||
|
bad_event_id,
|
||||||
|
bad_room_id,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
del fetched_events[bad_event_id]
|
||||||
|
|
||||||
|
# if we couldn't get the prev event in question, that's a problem.
|
||||||
|
remote_event = fetched_events.get(event_id)
|
||||||
|
if not remote_event:
|
||||||
|
raise Exception("Unable to get missing prev_event %s" % (event_id,))
|
||||||
|
|
||||||
|
# missing state at that event is a warning, not a blocker
|
||||||
|
# XXX: this doesn't sound right? it means that we'll end up with incomplete
|
||||||
|
# state.
|
||||||
|
failed_to_fetch = desired_events - fetched_events.keys()
|
||||||
|
if failed_to_fetch:
|
||||||
|
logger.warning(
|
||||||
|
"Failed to fetch missing state events for %s %s",
|
||||||
|
event_id,
|
||||||
|
failed_to_fetch,
|
||||||
|
)
|
||||||
|
|
||||||
|
remote_state = [
|
||||||
|
fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
|
||||||
|
]
|
||||||
|
|
||||||
|
if remote_event.is_state() and remote_event.rejected_reason is None:
|
||||||
|
remote_state.append(remote_event)
|
||||||
|
|
||||||
|
return remote_state
|
||||||
|
|
||||||
async def _process_received_pdu(
|
async def _process_received_pdu(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
|
@ -841,7 +952,6 @@ class FederationHandler(BaseHandler):
|
||||||
destination=dest,
|
destination=dest,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
event_id=e_id,
|
event_id=e_id,
|
||||||
include_event_in_state=False,
|
|
||||||
)
|
)
|
||||||
auth_events.update({a.event_id: a for a in auth})
|
auth_events.update({a.event_id: a for a in auth})
|
||||||
auth_events.update({s.event_id: s for s in state})
|
auth_events.update({s.event_id: s for s in state})
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import itertools
|
|
||||||
import logging
|
import logging
|
||||||
import threading
|
import threading
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
@ -1044,7 +1044,8 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
set[str]: The events we have already seen.
|
set[str]: The events we have already seen.
|
||||||
"""
|
"""
|
||||||
results = set()
|
# if the event cache contains the event, obviously we've seen it.
|
||||||
|
results = {x for x in event_ids if self._get_event_cache.contains(x)}
|
||||||
|
|
||||||
def have_seen_events_txn(txn, chunk):
|
def have_seen_events_txn(txn, chunk):
|
||||||
sql = "SELECT event_id FROM events as e WHERE "
|
sql = "SELECT event_id FROM events as e WHERE "
|
||||||
|
@ -1052,12 +1053,9 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
txn.database_engine, "e.event_id", chunk
|
txn.database_engine, "e.event_id", chunk
|
||||||
)
|
)
|
||||||
txn.execute(sql + clause, args)
|
txn.execute(sql + clause, args)
|
||||||
for (event_id,) in txn:
|
results.update(row[0] for row in txn)
|
||||||
results.add(event_id)
|
|
||||||
|
|
||||||
# break the input up into chunks of 100
|
for chunk in batch_iter((x for x in event_ids if x not in results), 100):
|
||||||
input_iterator = iter(event_ids)
|
|
||||||
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)), []):
|
|
||||||
await self.db_pool.runInteraction(
|
await self.db_pool.runInteraction(
|
||||||
"have_seen_events", have_seen_events_txn, chunk
|
"have_seen_events", have_seen_events_txn, chunk
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue