mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 21:03:51 +01:00
Remove get rooms for user with stream ordering (#13991)
By getting the joined rooms before the current token we avoid any reading history to confirm a user *was* in a room. We can then use any membership change events, which we already fetch during sync, to determine the final list of joined room IDs.
This commit is contained in:
parent
2b6d41ebd6
commit
0506bb100e
2 changed files with 70 additions and 80 deletions
1
changelog.d/13991.misc
Normal file
1
changelog.d/13991.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Optimise queries used to get a users rooms during sync. Contributed by Nick @ Beeper (@fizzadar).
|
|
@ -1317,6 +1317,19 @@ class SyncHandler:
|
||||||
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
|
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
|
||||||
instance to signify that the sync calculation is complete.
|
instance to signify that the sync calculation is complete.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
user_id = sync_config.user.to_string()
|
||||||
|
app_service = self.store.get_app_service_by_user_id(user_id)
|
||||||
|
if app_service:
|
||||||
|
# We no longer support AS users using /sync directly.
|
||||||
|
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
# Note: we get the users room list *before* we get the current token, this
|
||||||
|
# avoids checking back in history if rooms are joined after the token is fetched.
|
||||||
|
token_before_rooms = self.event_sources.get_current_token()
|
||||||
|
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
|
||||||
|
|
||||||
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
||||||
# this is due to some of the underlying streams not supporting the ability
|
# this is due to some of the underlying streams not supporting the ability
|
||||||
# to query up to a given point.
|
# to query up to a given point.
|
||||||
|
@ -1324,6 +1337,57 @@ class SyncHandler:
|
||||||
now_token = self.event_sources.get_current_token()
|
now_token = self.event_sources.get_current_token()
|
||||||
log_kv({"now_token": now_token})
|
log_kv({"now_token": now_token})
|
||||||
|
|
||||||
|
# Since we fetched the users room list before the token, there's a small window
|
||||||
|
# during which membership events may have been persisted, so we fetch these now
|
||||||
|
# and modify the joined room list for any changes between the get_rooms_for_user
|
||||||
|
# call and the get_current_token call.
|
||||||
|
membership_change_events = []
|
||||||
|
if since_token:
|
||||||
|
membership_change_events = await self.store.get_membership_changes_for_user(
|
||||||
|
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
|
||||||
|
)
|
||||||
|
|
||||||
|
mem_last_change_by_room_id: Dict[str, EventBase] = {}
|
||||||
|
for event in membership_change_events:
|
||||||
|
mem_last_change_by_room_id[event.room_id] = event
|
||||||
|
|
||||||
|
# For the latest membership event in each room found, add/remove the room ID
|
||||||
|
# from the joined room list accordingly. In this case we only care if the
|
||||||
|
# latest change is JOIN.
|
||||||
|
|
||||||
|
for room_id, event in mem_last_change_by_room_id.items():
|
||||||
|
assert event.internal_metadata.stream_ordering
|
||||||
|
if (
|
||||||
|
event.internal_metadata.stream_ordering
|
||||||
|
< token_before_rooms.room_key.stream
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"User membership change between getting rooms and current token: %s %s %s",
|
||||||
|
user_id,
|
||||||
|
event.membership,
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
# User joined a room - we have to then check the room state to ensure we
|
||||||
|
# respect any bans if there's a race between the join and ban events.
|
||||||
|
if event.membership == Membership.JOIN:
|
||||||
|
user_ids_in_room = await self.store.get_users_in_room(room_id)
|
||||||
|
if user_id in user_ids_in_room:
|
||||||
|
mutable_joined_room_ids.add(room_id)
|
||||||
|
# The user left the room, or left and was re-invited but not joined yet
|
||||||
|
else:
|
||||||
|
mutable_joined_room_ids.discard(room_id)
|
||||||
|
|
||||||
|
# Now we have our list of joined room IDs, exclude as configured and freeze
|
||||||
|
joined_room_ids = frozenset(
|
||||||
|
(
|
||||||
|
room_id
|
||||||
|
for room_id in mutable_joined_room_ids
|
||||||
|
if room_id not in self.rooms_to_exclude
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Calculating sync response for %r between %s and %s",
|
"Calculating sync response for %r between %s and %s",
|
||||||
sync_config.user,
|
sync_config.user,
|
||||||
|
@ -1331,22 +1395,13 @@ class SyncHandler:
|
||||||
now_token,
|
now_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
user_id = sync_config.user.to_string()
|
|
||||||
app_service = self.store.get_app_service_by_user_id(user_id)
|
|
||||||
if app_service:
|
|
||||||
# We no longer support AS users using /sync directly.
|
|
||||||
# See https://github.com/matrix-org/matrix-doc/issues/1144
|
|
||||||
raise NotImplementedError()
|
|
||||||
else:
|
|
||||||
joined_room_ids = await self.get_rooms_for_user_at(
|
|
||||||
user_id, now_token.room_key
|
|
||||||
)
|
|
||||||
sync_result_builder = SyncResultBuilder(
|
sync_result_builder = SyncResultBuilder(
|
||||||
sync_config,
|
sync_config,
|
||||||
full_state,
|
full_state,
|
||||||
since_token=since_token,
|
since_token=since_token,
|
||||||
now_token=now_token,
|
now_token=now_token,
|
||||||
joined_room_ids=joined_room_ids,
|
joined_room_ids=joined_room_ids,
|
||||||
|
membership_change_events=membership_change_events,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Fetching account data")
|
logger.debug("Fetching account data")
|
||||||
|
@ -1827,19 +1882,12 @@ class SyncHandler:
|
||||||
|
|
||||||
Does not modify the `sync_result_builder`.
|
Does not modify the `sync_result_builder`.
|
||||||
"""
|
"""
|
||||||
user_id = sync_result_builder.sync_config.user.to_string()
|
|
||||||
since_token = sync_result_builder.since_token
|
since_token = sync_result_builder.since_token
|
||||||
now_token = sync_result_builder.now_token
|
membership_change_events = sync_result_builder.membership_change_events
|
||||||
|
|
||||||
assert since_token
|
assert since_token
|
||||||
|
|
||||||
# Get a list of membership change events that have happened to the user
|
if membership_change_events:
|
||||||
# requesting the sync.
|
|
||||||
membership_changes = await self.store.get_membership_changes_for_user(
|
|
||||||
user_id, since_token.room_key, now_token.room_key
|
|
||||||
)
|
|
||||||
|
|
||||||
if membership_changes:
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
stream_id = since_token.room_key.stream
|
stream_id = since_token.room_key.stream
|
||||||
|
@ -1878,16 +1926,10 @@ class SyncHandler:
|
||||||
since_token = sync_result_builder.since_token
|
since_token = sync_result_builder.since_token
|
||||||
now_token = sync_result_builder.now_token
|
now_token = sync_result_builder.now_token
|
||||||
sync_config = sync_result_builder.sync_config
|
sync_config = sync_result_builder.sync_config
|
||||||
|
membership_change_events = sync_result_builder.membership_change_events
|
||||||
|
|
||||||
assert since_token
|
assert since_token
|
||||||
|
|
||||||
# TODO: we've already called this function and ran this query in
|
|
||||||
# _have_rooms_changed. We could keep the results in memory to avoid a
|
|
||||||
# second query, at the cost of more complicated source code.
|
|
||||||
membership_change_events = await self.store.get_membership_changes_for_user(
|
|
||||||
user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude
|
|
||||||
)
|
|
||||||
|
|
||||||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
|
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
|
||||||
for event in membership_change_events:
|
for event in membership_change_events:
|
||||||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||||
|
@ -2415,60 +2457,6 @@ class SyncHandler:
|
||||||
else:
|
else:
|
||||||
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
|
raise Exception("Unrecognized rtype: %r", room_builder.rtype)
|
||||||
|
|
||||||
async def get_rooms_for_user_at(
|
|
||||||
self,
|
|
||||||
user_id: str,
|
|
||||||
room_key: RoomStreamToken,
|
|
||||||
) -> FrozenSet[str]:
|
|
||||||
"""Get set of joined rooms for a user at the given stream ordering.
|
|
||||||
|
|
||||||
The stream ordering *must* be recent, otherwise this may throw an
|
|
||||||
exception if older than a month. (This function is called with the
|
|
||||||
current token, which should be perfectly fine).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
user_id
|
|
||||||
stream_ordering
|
|
||||||
|
|
||||||
ReturnValue:
|
|
||||||
Set of room_ids the user is in at given stream_ordering.
|
|
||||||
"""
|
|
||||||
joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
|
|
||||||
|
|
||||||
joined_room_ids = set()
|
|
||||||
|
|
||||||
# We need to check that the stream ordering of the join for each room
|
|
||||||
# is before the stream_ordering asked for. This might not be the case
|
|
||||||
# if the user joins a room between us getting the current token and
|
|
||||||
# calling `get_rooms_for_user_with_stream_ordering`.
|
|
||||||
# If the membership's stream ordering is after the given stream
|
|
||||||
# ordering, we need to go and work out if the user was in the room
|
|
||||||
# before.
|
|
||||||
# We also need to check whether the room should be excluded from sync
|
|
||||||
# responses as per the homeserver config.
|
|
||||||
for joined_room in joined_rooms:
|
|
||||||
if joined_room.room_id in self.rooms_to_exclude:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not joined_room.event_pos.persisted_after(room_key):
|
|
||||||
joined_room_ids.add(joined_room.room_id)
|
|
||||||
continue
|
|
||||||
|
|
||||||
logger.info("User joined room after current token: %s", joined_room.room_id)
|
|
||||||
|
|
||||||
extrems = (
|
|
||||||
await self.store.get_forward_extremities_for_room_at_stream_ordering(
|
|
||||||
joined_room.room_id, joined_room.event_pos.stream
|
|
||||||
)
|
|
||||||
)
|
|
||||||
user_ids_in_room = await self.state.get_current_user_ids_in_room(
|
|
||||||
joined_room.room_id, extrems
|
|
||||||
)
|
|
||||||
if user_id in user_ids_in_room:
|
|
||||||
joined_room_ids.add(joined_room.room_id)
|
|
||||||
|
|
||||||
return frozenset(joined_room_ids)
|
|
||||||
|
|
||||||
|
|
||||||
def _action_has_highlight(actions: List[JsonDict]) -> bool:
|
def _action_has_highlight(actions: List[JsonDict]) -> bool:
|
||||||
for action in actions:
|
for action in actions:
|
||||||
|
@ -2565,6 +2553,7 @@ class SyncResultBuilder:
|
||||||
since_token: Optional[StreamToken]
|
since_token: Optional[StreamToken]
|
||||||
now_token: StreamToken
|
now_token: StreamToken
|
||||||
joined_room_ids: FrozenSet[str]
|
joined_room_ids: FrozenSet[str]
|
||||||
|
membership_change_events: List[EventBase]
|
||||||
|
|
||||||
presence: List[UserPresenceState] = attr.Factory(list)
|
presence: List[UserPresenceState] = attr.Factory(list)
|
||||||
account_data: List[JsonDict] = attr.Factory(list)
|
account_data: List[JsonDict] = attr.Factory(list)
|
||||||
|
|
Loading…
Reference in a new issue