forked from MirrorHub/synapse
Simplify cache invalidation after event persist txn (#13796)
This moves all the invalidations into a single place and de-duplicates the code involved in invalidating caches for a given event by using the base class method.
This commit is contained in:
parent
f34b0bc262
commit
6b4593a80f
4 changed files with 52 additions and 119 deletions
1
changelog.d/13796.misc
Normal file
1
changelog.d/13796.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar).
|
|
@ -91,6 +91,9 @@ class SQLBaseStore(metaclass=ABCMeta):
|
||||||
self._attempt_to_invalidate_cache(
|
self._attempt_to_invalidate_cache(
|
||||||
"get_user_in_room_with_profile", (room_id, user_id)
|
"get_user_in_room_with_profile", (room_id, user_id)
|
||||||
)
|
)
|
||||||
|
self._attempt_to_invalidate_cache(
|
||||||
|
"get_rooms_for_user_with_stream_ordering", (user_id,)
|
||||||
|
)
|
||||||
|
|
||||||
# Purge other caches based on room state.
|
# Purge other caches based on room state.
|
||||||
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
|
||||||
|
|
|
@ -223,15 +223,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
# process triggering the invalidation is responsible for clearing any external
|
# process triggering the invalidation is responsible for clearing any external
|
||||||
# cached objects.
|
# cached objects.
|
||||||
self._invalidate_local_get_event_cache(event_id)
|
self._invalidate_local_get_event_cache(event_id)
|
||||||
self.have_seen_event.invalidate((room_id, event_id))
|
|
||||||
|
|
||||||
self.get_latest_event_ids_in_room.invalidate((room_id,))
|
self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
|
||||||
|
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
|
self._attempt_to_invalidate_cache(
|
||||||
|
"get_unread_event_push_actions_by_room_for_user", (room_id,)
|
||||||
|
)
|
||||||
|
|
||||||
# The `_get_membership_from_event_id` is immutable, except for the
|
# The `_get_membership_from_event_id` is immutable, except for the
|
||||||
# case where we look up an event *before* persisting it.
|
# case where we look up an event *before* persisting it.
|
||||||
self._get_membership_from_event_id.invalidate((event_id,))
|
self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
|
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
|
||||||
|
@ -240,19 +241,26 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
|
||||||
self._invalidate_local_get_event_cache(redacts)
|
self._invalidate_local_get_event_cache(redacts)
|
||||||
# Caches which might leak edits must be invalidated for the event being
|
# Caches which might leak edits must be invalidated for the event being
|
||||||
# redacted.
|
# redacted.
|
||||||
self.get_relations_for_event.invalidate((redacts,))
|
self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
|
||||||
self.get_applicable_edit.invalidate((redacts,))
|
self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
|
||||||
|
|
||||||
if etype == EventTypes.Member:
|
if etype == EventTypes.Member:
|
||||||
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
|
||||||
self.get_invited_rooms_for_local_user.invalidate((state_key,))
|
self._attempt_to_invalidate_cache(
|
||||||
|
"get_invited_rooms_for_local_user", (state_key,)
|
||||||
|
)
|
||||||
|
|
||||||
if relates_to:
|
if relates_to:
|
||||||
self.get_relations_for_event.invalidate((relates_to,))
|
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
|
||||||
self.get_aggregation_groups_for_event.invalidate((relates_to,))
|
self._attempt_to_invalidate_cache(
|
||||||
self.get_applicable_edit.invalidate((relates_to,))
|
"get_aggregation_groups_for_event", (relates_to,)
|
||||||
self.get_thread_summary.invalidate((relates_to,))
|
)
|
||||||
self.get_thread_participated.invalidate((relates_to,))
|
self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
|
||||||
|
self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
|
||||||
|
self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
|
||||||
|
self._attempt_to_invalidate_cache(
|
||||||
|
"get_mutual_event_relations_for_rel_type", (relates_to,)
|
||||||
|
)
|
||||||
|
|
||||||
async def invalidate_cache_and_stream(
|
async def invalidate_cache_and_stream(
|
||||||
self, cache_name: str, keys: Tuple[Any, ...]
|
self, cache_name: str, keys: Tuple[Any, ...]
|
||||||
|
|
|
@ -35,7 +35,7 @@ import attr
|
||||||
from prometheus_client import Counter
|
from prometheus_client import Counter
|
||||||
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
|
from synapse.api.constants import EventContentFields, EventTypes
|
||||||
from synapse.api.errors import Codes, SynapseError
|
from synapse.api.errors import Codes, SynapseError
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase, relation_from_event
|
from synapse.events import EventBase, relation_from_event
|
||||||
|
@ -410,6 +410,31 @@ class PersistEventsStore:
|
||||||
assert min_stream_order
|
assert min_stream_order
|
||||||
assert max_stream_order
|
assert max_stream_order
|
||||||
|
|
||||||
|
# Once the txn completes, invalidate all of the relevant caches. Note that we do this
|
||||||
|
# up here because it captures all the events_and_contexts before any are removed.
|
||||||
|
for event, _ in events_and_contexts:
|
||||||
|
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
|
||||||
|
if event.redacts:
|
||||||
|
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
|
||||||
|
|
||||||
|
relates_to = None
|
||||||
|
relation = relation_from_event(event)
|
||||||
|
if relation:
|
||||||
|
relates_to = relation.parent_id
|
||||||
|
|
||||||
|
assert event.internal_metadata.stream_ordering is not None
|
||||||
|
txn.call_after(
|
||||||
|
self.store._invalidate_caches_for_event,
|
||||||
|
event.internal_metadata.stream_ordering,
|
||||||
|
event.event_id,
|
||||||
|
event.room_id,
|
||||||
|
event.type,
|
||||||
|
getattr(event, "state_key", None),
|
||||||
|
event.redacts,
|
||||||
|
relates_to,
|
||||||
|
backfilled=False,
|
||||||
|
)
|
||||||
|
|
||||||
self._update_forward_extremities_txn(
|
self._update_forward_extremities_txn(
|
||||||
txn,
|
txn,
|
||||||
new_forward_extremities=new_forward_extremities,
|
new_forward_extremities=new_forward_extremities,
|
||||||
|
@ -459,6 +484,7 @@ class PersistEventsStore:
|
||||||
|
|
||||||
# We call this last as it assumes we've inserted the events into
|
# We call this last as it assumes we've inserted the events into
|
||||||
# room_memberships, where applicable.
|
# room_memberships, where applicable.
|
||||||
|
# NB: This function invalidates all state related caches
|
||||||
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
|
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
|
||||||
|
|
||||||
def _persist_event_auth_chain_txn(
|
def _persist_event_auth_chain_txn(
|
||||||
|
@ -1172,13 +1198,6 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Invalidate the various caches
|
# Invalidate the various caches
|
||||||
|
|
||||||
for member in members_changed:
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_rooms_for_user_with_stream_ordering.invalidate,
|
|
||||||
(member,),
|
|
||||||
)
|
|
||||||
|
|
||||||
self.store._invalidate_state_caches_and_stream(
|
self.store._invalidate_state_caches_and_stream(
|
||||||
txn, room_id, members_changed
|
txn, room_id, members_changed
|
||||||
)
|
)
|
||||||
|
@ -1222,9 +1241,6 @@ class PersistEventsStore:
|
||||||
self.db_pool.simple_delete_txn(
|
self.db_pool.simple_delete_txn(
|
||||||
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
|
txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
|
||||||
)
|
)
|
||||||
txn.call_after(
|
|
||||||
self.store.get_latest_event_ids_in_room.invalidate, (room_id,)
|
|
||||||
)
|
|
||||||
|
|
||||||
self.db_pool.simple_insert_many_txn(
|
self.db_pool.simple_insert_many_txn(
|
||||||
txn,
|
txn,
|
||||||
|
@ -1294,8 +1310,6 @@ class PersistEventsStore:
|
||||||
"""
|
"""
|
||||||
depth_updates: Dict[str, int] = {}
|
depth_updates: Dict[str, int] = {}
|
||||||
for event, context in events_and_contexts:
|
for event, context in events_and_contexts:
|
||||||
# Remove the any existing cache entries for the event_ids
|
|
||||||
self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
|
|
||||||
# Then update the `stream_ordering` position to mark the latest
|
# Then update the `stream_ordering` position to mark the latest
|
||||||
# event as the front of the room. This should not be done for
|
# event as the front of the room. This should not be done for
|
||||||
# backfilled events because backfilled events have negative
|
# backfilled events because backfilled events have negative
|
||||||
|
@ -1697,16 +1711,7 @@ class PersistEventsStore:
|
||||||
txn.async_call_after(prefill)
|
txn.async_call_after(prefill)
|
||||||
|
|
||||||
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
|
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
|
||||||
"""Invalidate the caches for the redacted event.
|
|
||||||
|
|
||||||
Note that these caches are also cleared as part of event replication in
|
|
||||||
_invalidate_caches_for_event.
|
|
||||||
"""
|
|
||||||
assert event.redacts is not None
|
assert event.redacts is not None
|
||||||
self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
|
|
||||||
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
|
|
||||||
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
|
|
||||||
|
|
||||||
self.db_pool.simple_upsert_txn(
|
self.db_pool.simple_upsert_txn(
|
||||||
txn,
|
txn,
|
||||||
table="redactions",
|
table="redactions",
|
||||||
|
@ -1807,34 +1812,6 @@ class PersistEventsStore:
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
assert event.internal_metadata.stream_ordering is not None
|
assert event.internal_metadata.stream_ordering is not None
|
||||||
txn.call_after(
|
|
||||||
self.store._membership_stream_cache.entity_has_changed,
|
|
||||||
event.state_key,
|
|
||||||
event.internal_metadata.stream_ordering,
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_invited_rooms_for_local_user.invalidate,
|
|
||||||
(event.state_key,),
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_local_users_in_room.invalidate,
|
|
||||||
(event.room_id,),
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_number_joined_users_in_room.invalidate,
|
|
||||||
(event.room_id,),
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_user_in_room_with_profile.invalidate,
|
|
||||||
(event.room_id, event.state_key),
|
|
||||||
)
|
|
||||||
|
|
||||||
# The `_get_membership_from_event_id` is immutable, except for the
|
|
||||||
# case where we look up an event *before* persisting it.
|
|
||||||
txn.call_after(
|
|
||||||
self.store._get_membership_from_event_id.invalidate,
|
|
||||||
(event.event_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
# We update the local_current_membership table only if the event is
|
# We update the local_current_membership table only if the event is
|
||||||
# "current", i.e., its something that has just happened.
|
# "current", i.e., its something that has just happened.
|
||||||
|
@ -1883,35 +1860,6 @@ class PersistEventsStore:
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_relations_for_event.invalidate, (relation.parent_id,)
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_aggregation_groups_for_event.invalidate,
|
|
||||||
(relation.parent_id,),
|
|
||||||
)
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_mutual_event_relations_for_rel_type.invalidate,
|
|
||||||
(relation.parent_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
if relation.rel_type == RelationTypes.REPLACE:
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_applicable_edit.invalidate, (relation.parent_id,)
|
|
||||||
)
|
|
||||||
|
|
||||||
if relation.rel_type == RelationTypes.THREAD:
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_thread_summary.invalidate, (relation.parent_id,)
|
|
||||||
)
|
|
||||||
# It should be safe to only invalidate the cache if the user has not
|
|
||||||
# previously participated in the thread, but that's difficult (and
|
|
||||||
# potentially error-prone) so it is always invalidated.
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_thread_participated.invalidate,
|
|
||||||
(relation.parent_id, event.sender),
|
|
||||||
)
|
|
||||||
|
|
||||||
def _handle_insertion_event(
|
def _handle_insertion_event(
|
||||||
self, txn: LoggingTransaction, event: EventBase
|
self, txn: LoggingTransaction, event: EventBase
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -2213,28 +2161,6 @@ class PersistEventsStore:
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
room_to_event_ids: Dict[str, List[str]] = {}
|
|
||||||
for e in non_outlier_events:
|
|
||||||
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
|
|
||||||
|
|
||||||
for room_id, event_ids in room_to_event_ids.items():
|
|
||||||
rows = self.db_pool.simple_select_many_txn(
|
|
||||||
txn,
|
|
||||||
table="event_push_actions_staging",
|
|
||||||
column="event_id",
|
|
||||||
iterable=event_ids,
|
|
||||||
keyvalues={},
|
|
||||||
retcols=("user_id",),
|
|
||||||
)
|
|
||||||
|
|
||||||
user_ids = {row["user_id"] for row in rows}
|
|
||||||
|
|
||||||
for user_id in user_ids:
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
|
|
||||||
(room_id, user_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now we delete the staging area for *all* events that were being
|
# Now we delete the staging area for *all* events that were being
|
||||||
# persisted.
|
# persisted.
|
||||||
txn.execute_batch(
|
txn.execute_batch(
|
||||||
|
@ -2249,11 +2175,6 @@ class PersistEventsStore:
|
||||||
def _remove_push_actions_for_event_id_txn(
|
def _remove_push_actions_for_event_id_txn(
|
||||||
self, txn: LoggingTransaction, room_id: str, event_id: str
|
self, txn: LoggingTransaction, room_id: str, event_id: str
|
||||||
) -> None:
|
) -> None:
|
||||||
# Sad that we have to blow away the cache for the whole room here
|
|
||||||
txn.call_after(
|
|
||||||
self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
|
|
||||||
(room_id,),
|
|
||||||
)
|
|
||||||
txn.execute(
|
txn.execute(
|
||||||
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
|
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
|
||||||
(room_id, event_id),
|
(room_id, event_id),
|
||||||
|
|
Loading…
Reference in a new issue