Fix a bug where the joined hosts for a given event were not being properly cached (#14125)

This commit is contained in:
Shay 2022-10-12 11:01:00 -07:00 committed by GitHub
parent e6e876b9b1
commit b6baa46db0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 48 deletions

1
changelog.d/14125.bugfix Normal file
View file

@ -0,0 +1 @@
Fix a bug introduced in v1.69.0rc1 where the joined hosts for a given event were not being properly cached.

View file

@ -414,7 +414,9 @@ class FederationEventHandler:
# First, precalculate the joined hosts so that the federation sender doesn't # First, precalculate the joined hosts so that the federation sender doesn't
# need to. # need to.
await self._event_creation_handler.cache_joined_hosts_for_event(event, context) await self._event_creation_handler.cache_joined_hosts_for_events(
[(event, context)]
)
await self._check_for_soft_fail(event, context=context, origin=origin) await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context) await self._run_push_actions_and_persist_event(event, context)

View file

@ -1390,7 +1390,7 @@ class EventCreationHandler:
extra_users=extra_users, extra_users=extra_users,
), ),
run_in_background( run_in_background(
self.cache_joined_hosts_for_event, event, context self.cache_joined_hosts_for_events, events_and_context
).addErrback( ).addErrback(
log_failure, "cache_joined_hosts_for_event failed" log_failure, "cache_joined_hosts_for_event failed"
), ),
@ -1491,62 +1491,65 @@ class EventCreationHandler:
await self.store.remove_push_actions_from_staging(event.event_id) await self.store.remove_push_actions_from_staging(event.event_id)
raise raise
async def cache_joined_hosts_for_event( async def cache_joined_hosts_for_events(
self, event: EventBase, context: EventContext self, events_and_context: List[Tuple[EventBase, EventContext]]
) -> None: ) -> None:
"""Precalculate the joined hosts at the event, when using Redis, so that """Precalculate the joined hosts at each of the given events, when using Redis, so that
external federation senders don't have to recalculate it themselves. external federation senders don't have to recalculate it themselves.
""" """
if not self._external_cache.is_enabled(): for event, _ in events_and_context:
return if not self._external_cache.is_enabled():
# If external cache is enabled we should always have this.
assert self._external_cache_joined_hosts_updates is not None
# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
#
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
# Note: We set the state group -> joined hosts cache if it hasn't been
# set for a while, so that the expiry time is reset.
state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
)
if state_entry.state_group:
await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
state_entry.state_group,
expiry_ms=60 * 60 * 1000,
)
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return return
state = await state_entry.get_state( # If external cache is enabled we should always have this.
self._storage_controllers.state, StateFilter.all() assert self._external_cache_joined_hosts_updates is not None
# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
#
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
# Note: We set the state group -> joined hosts cache if it hasn't been
# set for a while, so that the expiry time is reset.
state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
) )
with opentracing.start_active_span("get_joined_hosts"):
joined_hosts = await self.store.get_joined_hosts( if state_entry.state_group:
event.room_id, state, state_entry await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
state_entry.state_group,
expiry_ms=60 * 60 * 1000,
) )
# Note that the expiry times must be larger than the expiry time in if state_entry.state_group in self._external_cache_joined_hosts_updates:
# _external_cache_joined_hosts_updates. return
await self._external_cache.set(
"get_joined_hosts",
str(state_entry.state_group),
list(joined_hosts),
expiry_ms=60 * 60 * 1000,
)
self._external_cache_joined_hosts_updates[state_entry.state_group] = None state = await state_entry.get_state(
self._storage_controllers.state, StateFilter.all()
)
with opentracing.start_active_span("get_joined_hosts"):
joined_hosts = await self.store.get_joined_hosts(
event.room_id, state, state_entry
)
# Note that the expiry times must be larger than the expiry time in
# _external_cache_joined_hosts_updates.
await self._external_cache.set(
"get_joined_hosts",
str(state_entry.state_group),
list(joined_hosts),
expiry_ms=60 * 60 * 1000,
)
self._external_cache_joined_hosts_updates[
state_entry.state_group
] = None
async def _validate_canonical_alias( async def _validate_canonical_alias(
self, self,