0
0
Fork 1
mirror of https://mau.dev/maunium/synapse.git synced 2024-12-14 20:03:54 +01:00

Fix perf of fetching the same events many times. (#10703)

The code to deduplicate repeated fetches of the same set of events was
N^2 (over the number of events requested), which could lead to a process
being completely wedged.

The main fix is to deduplicate the returned deferreds so we only await
on a deferred once rather than many times. Seperately, when handling the
returned events from the defrered we only add the events we care about
to the event map to be returned (so that we don't pay the price of
inserting extraneous events into the dict).
This commit is contained in:
Erik Johnston 2021-08-27 10:15:50 +01:00 committed by GitHub
parent 1800aabfc2
commit c4fa4f37cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 6 deletions

1
changelog.d/10703.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a regression introduced in v1.41.0 which affected the performance of concurrent fetches of large sets of events, in extreme cases causing the process to hang.

View file

@ -520,16 +520,26 @@ class EventsWorkerStore(SQLBaseStore):
# We now look up if we're already fetching some of the events in the DB,
# if so we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}
#
# Note: we might get the same `ObservableDeferred` back for multiple
# events we're already fetching, so we deduplicate the deferreds to
# avoid extraneous work (if we don't do this we can end up in a n^2 mode
# when we wait on the same Deferred N times, then try and merge the
# same dict into itself N times).
already_fetching_ids: Set[str] = set()
already_fetching_deferreds: Set[
ObservableDeferred[Dict[str, _EventCacheEntry]]
] = set()
for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB. Add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
already_fetching_ids.add(event_id)
already_fetching_deferreds.add(deferred)
missing_events_ids.difference_update(already_fetching)
missing_events_ids.difference_update(already_fetching_ids)
if missing_events_ids:
log_ctx = current_context()
@ -569,18 +579,25 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
if already_fetching:
if already_fetching_deferreds:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
(d.observe() for d in already_fetching_deferreds),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
for result in results:
event_entry_map.update(result)
# We filter out events that we haven't asked for as we might get
# a *lot* of superfluous events back, and there is no point
# going through and inserting them all (which can take time).
event_entry_map.update(
(event_id, entry)
for event_id, entry in result.items()
if event_id in already_fetching_ids
)
if not allow_rejected:
event_entry_map = {