Parallelize calls to fetch bundled aggregations. (#14510)

The bundled aggregations for annotations, references, and edits
can be parallelized.
This commit is contained in:
Patrick Cloke 2022-11-22 09:47:32 -05:00 committed by GitHub
parent 6d7523ef14
commit 7eb7460042
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 30 deletions

View file

@ -0,0 +1 @@
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.

View file

@ -20,10 +20,12 @@ import attr
from synapse.api.constants import EventTypes, RelationTypes from synapse.api.constants import EventTypes, RelationTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.events import EventBase, relation_from_event from synapse.events import EventBase, relation_from_event
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, UserID from synapse.types import JsonDict, Requester, UserID
from synapse.util.async_helpers import gather_results
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
if TYPE_CHECKING: if TYPE_CHECKING:
@ -525,7 +527,8 @@ class RelationsHandler:
# (as that is what makes it part of the thread). # (as that is what makes it part of the thread).
relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD
# Fetch any annotations (ie, reactions) to bundle with this event. async def _fetch_annotations() -> None:
"""Fetch any annotations (ie, reactions) to bundle with this event."""
annotations_by_event_id = await self.get_annotations_for_events( annotations_by_event_id = await self.get_annotations_for_events(
events_by_id.keys(), ignored_users=ignored_users events_by_id.keys(), ignored_users=ignored_users
) )
@ -535,7 +538,8 @@ class RelationsHandler:
"chunk": annotations "chunk": annotations
} }
# Fetch any references to bundle with this event. async def _fetch_references() -> None:
"""Fetch any references to bundle with this event."""
references_by_event_id = await self.get_references_for_events( references_by_event_id = await self.get_references_for_events(
events_by_id.keys(), ignored_users=ignored_users events_by_id.keys(), ignored_users=ignored_users
) )
@ -545,10 +549,13 @@ class RelationsHandler:
"chunk": [{"event_id": ev.event_id} for ev in references] "chunk": [{"event_id": ev.event_id} for ev in references]
} }
# Fetch any edits (but not for redacted events). async def _fetch_edits() -> None:
# """
# Note that there is no use in limiting edits by ignored users since the Fetch any edits (but not for redacted events).
# parent event should be ignored in the first place if the user is ignored.
Note that there is no use in limiting edits by ignored users since the
parent event should be ignored in the first place if the user is ignored.
"""
edits = await self._main_store.get_applicable_edits( edits = await self._main_store.get_applicable_edits(
[ [
event_id event_id
@ -559,6 +566,18 @@ class RelationsHandler:
for event_id, edit in edits.items(): for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit results.setdefault(event_id, BundledAggregations()).replace = edit
# Parallelize the calls for annotations, references, and edits since they
# are unrelated.
await make_deferred_yieldable(
gather_results(
(
run_in_background(_fetch_annotations),
run_in_background(_fetch_references),
run_in_background(_fetch_edits),
)
)
)
return results return results
async def get_threads( async def get_threads(