mirror of
https://mau.dev/maunium/synapse.git
synced 2024-12-14 11:43:51 +01:00
Sliding Sync: Add more tracing (#17514)
Spawning from looking at a couple traces and wanting a little more info. Follow-up to github.com/element-hq/synapse/pull/17501 The changes in this PR allow you to find slow Sliding Sync traces ignoring the `wait_for_events` time. In Jaeger, you can now filter for the `current_sync_for_user` operation with `RESULT.result=true` indicating that it actually returned non-empty results. If you want to find traces for your own user, you can use `RESULT.result=true ARG.sync_config.user="@madlittlemods:matrix.org"`
This commit is contained in:
parent
bef6568537
commit
1dfa59b238
6 changed files with 351 additions and 245 deletions
1
changelog.d/17514.misc
Normal file
1
changelog.d/17514.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add more tracing to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
|
@ -51,7 +51,14 @@ from synapse.api.errors import SlidingSyncUnknownPosition
|
||||||
from synapse.events import EventBase, StrippedStateEvent
|
from synapse.events import EventBase, StrippedStateEvent
|
||||||
from synapse.events.utils import parse_stripped_state_event, strip_event
|
from synapse.events.utils import parse_stripped_state_event, strip_event
|
||||||
from synapse.handlers.relations import BundledAggregations
|
from synapse.handlers.relations import BundledAggregations
|
||||||
from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
|
from synapse.logging.opentracing import (
|
||||||
|
SynapseTags,
|
||||||
|
log_kv,
|
||||||
|
set_tag,
|
||||||
|
start_active_span,
|
||||||
|
tag_args,
|
||||||
|
trace,
|
||||||
|
)
|
||||||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
|
||||||
from synapse.storage.databases.main.state import (
|
from synapse.storage.databases.main.state import (
|
||||||
ROOM_UNKNOWN_SENTINEL,
|
ROOM_UNKNOWN_SENTINEL,
|
||||||
|
@ -534,125 +541,144 @@ class SlidingSyncHandler:
|
||||||
# Keep track of the rooms that we can display and need to fetch more info about
|
# Keep track of the rooms that we can display and need to fetch more info about
|
||||||
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
||||||
if has_lists and sync_config.lists is not None:
|
if has_lists and sync_config.lists is not None:
|
||||||
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
with start_active_span("assemble_sliding_window_lists"):
|
||||||
user=sync_config.user,
|
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
||||||
room_membership_for_user_map=room_membership_for_user_map,
|
user=sync_config.user,
|
||||||
)
|
room_membership_for_user_map=room_membership_for_user_map,
|
||||||
|
|
||||||
for list_key, list_config in sync_config.lists.items():
|
|
||||||
# Apply filters
|
|
||||||
filtered_sync_room_map = sync_room_map
|
|
||||||
if list_config.filters is not None:
|
|
||||||
filtered_sync_room_map = await self.filter_rooms(
|
|
||||||
sync_config.user, sync_room_map, list_config.filters, to_token
|
|
||||||
)
|
|
||||||
|
|
||||||
# Sort the list
|
|
||||||
sorted_room_info = await self.sort_rooms(
|
|
||||||
filtered_sync_room_map, to_token
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Find which rooms are partially stated and may need to be filtered out
|
for list_key, list_config in sync_config.lists.items():
|
||||||
# depending on the `required_state` requested (see below).
|
# Apply filters
|
||||||
partial_state_room_map = await self.store.is_partial_state_room_batched(
|
filtered_sync_room_map = sync_room_map
|
||||||
filtered_sync_room_map.keys()
|
if list_config.filters is not None:
|
||||||
)
|
filtered_sync_room_map = await self.filter_rooms(
|
||||||
|
sync_config.user,
|
||||||
# Since creating the `RoomSyncConfig` takes some work, let's just do it
|
sync_room_map,
|
||||||
# once and make a copy whenever we need it.
|
list_config.filters,
|
||||||
room_sync_config = RoomSyncConfig.from_room_config(list_config)
|
to_token,
|
||||||
membership_state_keys = room_sync_config.required_state_map.get(
|
|
||||||
EventTypes.Member
|
|
||||||
)
|
|
||||||
# Also see `StateFilter.must_await_full_state(...)` for comparison
|
|
||||||
lazy_loading = (
|
|
||||||
membership_state_keys is not None
|
|
||||||
and StateValues.LAZY in membership_state_keys
|
|
||||||
)
|
|
||||||
|
|
||||||
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
|
||||||
if list_config.ranges:
|
|
||||||
for range in list_config.ranges:
|
|
||||||
room_ids_in_list: List[str] = []
|
|
||||||
|
|
||||||
# We're going to loop through the sorted list of rooms starting
|
|
||||||
# at the range start index and keep adding rooms until we fill
|
|
||||||
# up the range or run out of rooms.
|
|
||||||
#
|
|
||||||
# Both sides of range are inclusive so we `+ 1`
|
|
||||||
max_num_rooms = range[1] - range[0] + 1
|
|
||||||
for room_membership in sorted_room_info[range[0] :]:
|
|
||||||
room_id = room_membership.room_id
|
|
||||||
|
|
||||||
if len(room_ids_in_list) >= max_num_rooms:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Exclude partially-stated rooms unless the `required_state`
|
|
||||||
# only has `["m.room.member", "$LAZY"]` for membership
|
|
||||||
# (lazy-loading room members).
|
|
||||||
if partial_state_room_map.get(room_id) and not lazy_loading:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Take the superset of the `RoomSyncConfig` for each room.
|
|
||||||
#
|
|
||||||
# Update our `relevant_room_map` with the room we're going
|
|
||||||
# to display and need to fetch more info about.
|
|
||||||
existing_room_sync_config = relevant_room_map.get(room_id)
|
|
||||||
if existing_room_sync_config is not None:
|
|
||||||
existing_room_sync_config.combine_room_sync_config(
|
|
||||||
room_sync_config
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Make a copy so if we modify it later, it doesn't
|
|
||||||
# affect all references.
|
|
||||||
relevant_room_map[room_id] = (
|
|
||||||
room_sync_config.deep_copy()
|
|
||||||
)
|
|
||||||
|
|
||||||
room_ids_in_list.append(room_id)
|
|
||||||
|
|
||||||
ops.append(
|
|
||||||
SlidingSyncResult.SlidingWindowList.Operation(
|
|
||||||
op=OperationType.SYNC,
|
|
||||||
range=range,
|
|
||||||
room_ids=room_ids_in_list,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
# Sort the list
|
||||||
count=len(sorted_room_info),
|
sorted_room_info = await self.sort_rooms(
|
||||||
ops=ops,
|
filtered_sync_room_map, to_token
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Find which rooms are partially stated and may need to be filtered out
|
||||||
|
# depending on the `required_state` requested (see below).
|
||||||
|
partial_state_room_map = (
|
||||||
|
await self.store.is_partial_state_room_batched(
|
||||||
|
filtered_sync_room_map.keys()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Since creating the `RoomSyncConfig` takes some work, let's just do it
|
||||||
|
# once and make a copy whenever we need it.
|
||||||
|
room_sync_config = RoomSyncConfig.from_room_config(list_config)
|
||||||
|
membership_state_keys = room_sync_config.required_state_map.get(
|
||||||
|
EventTypes.Member
|
||||||
|
)
|
||||||
|
# Also see `StateFilter.must_await_full_state(...)` for comparison
|
||||||
|
lazy_loading = (
|
||||||
|
membership_state_keys is not None
|
||||||
|
and StateValues.LAZY in membership_state_keys
|
||||||
|
)
|
||||||
|
|
||||||
|
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
|
||||||
|
if list_config.ranges:
|
||||||
|
for range in list_config.ranges:
|
||||||
|
room_ids_in_list: List[str] = []
|
||||||
|
|
||||||
|
# We're going to loop through the sorted list of rooms starting
|
||||||
|
# at the range start index and keep adding rooms until we fill
|
||||||
|
# up the range or run out of rooms.
|
||||||
|
#
|
||||||
|
# Both sides of range are inclusive so we `+ 1`
|
||||||
|
max_num_rooms = range[1] - range[0] + 1
|
||||||
|
for room_membership in sorted_room_info[range[0] :]:
|
||||||
|
room_id = room_membership.room_id
|
||||||
|
|
||||||
|
if len(room_ids_in_list) >= max_num_rooms:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Exclude partially-stated rooms unless the `required_state`
|
||||||
|
# only has `["m.room.member", "$LAZY"]` for membership
|
||||||
|
# (lazy-loading room members).
|
||||||
|
if (
|
||||||
|
partial_state_room_map.get(room_id)
|
||||||
|
and not lazy_loading
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Take the superset of the `RoomSyncConfig` for each room.
|
||||||
|
#
|
||||||
|
# Update our `relevant_room_map` with the room we're going
|
||||||
|
# to display and need to fetch more info about.
|
||||||
|
existing_room_sync_config = relevant_room_map.get(
|
||||||
|
room_id
|
||||||
|
)
|
||||||
|
if existing_room_sync_config is not None:
|
||||||
|
existing_room_sync_config.combine_room_sync_config(
|
||||||
|
room_sync_config
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Make a copy so if we modify it later, it doesn't
|
||||||
|
# affect all references.
|
||||||
|
relevant_room_map[room_id] = (
|
||||||
|
room_sync_config.deep_copy()
|
||||||
|
)
|
||||||
|
|
||||||
|
room_ids_in_list.append(room_id)
|
||||||
|
|
||||||
|
ops.append(
|
||||||
|
SlidingSyncResult.SlidingWindowList.Operation(
|
||||||
|
op=OperationType.SYNC,
|
||||||
|
range=range,
|
||||||
|
room_ids=room_ids_in_list,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
lists[list_key] = SlidingSyncResult.SlidingWindowList(
|
||||||
|
count=len(sorted_room_info),
|
||||||
|
ops=ops,
|
||||||
|
)
|
||||||
|
|
||||||
# Handle room subscriptions
|
# Handle room subscriptions
|
||||||
if has_room_subscriptions and sync_config.room_subscriptions is not None:
|
if has_room_subscriptions and sync_config.room_subscriptions is not None:
|
||||||
for room_id, room_subscription in sync_config.room_subscriptions.items():
|
with start_active_span("assemble_room_subscriptions"):
|
||||||
room_membership_for_user_at_to_token = (
|
for (
|
||||||
await self.check_room_subscription_allowed_for_user(
|
room_id,
|
||||||
room_id=room_id,
|
room_subscription,
|
||||||
room_membership_for_user_map=room_membership_for_user_map,
|
) in sync_config.room_subscriptions.items():
|
||||||
to_token=to_token,
|
room_membership_for_user_at_to_token = (
|
||||||
|
await self.check_room_subscription_allowed_for_user(
|
||||||
|
room_id=room_id,
|
||||||
|
room_membership_for_user_map=room_membership_for_user_map,
|
||||||
|
to_token=to_token,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# Skip this room if the user isn't allowed to see it
|
# Skip this room if the user isn't allowed to see it
|
||||||
if not room_membership_for_user_at_to_token:
|
if not room_membership_for_user_at_to_token:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
room_membership_for_user_map[room_id] = (
|
room_membership_for_user_map[room_id] = (
|
||||||
room_membership_for_user_at_to_token
|
room_membership_for_user_at_to_token
|
||||||
)
|
)
|
||||||
|
|
||||||
# Take the superset of the `RoomSyncConfig` for each room.
|
# Take the superset of the `RoomSyncConfig` for each room.
|
||||||
#
|
#
|
||||||
# Update our `relevant_room_map` with the room we're going to display
|
# Update our `relevant_room_map` with the room we're going to display
|
||||||
# and need to fetch more info about.
|
# and need to fetch more info about.
|
||||||
room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
|
room_sync_config = RoomSyncConfig.from_room_config(
|
||||||
existing_room_sync_config = relevant_room_map.get(room_id)
|
room_subscription
|
||||||
if existing_room_sync_config is not None:
|
)
|
||||||
existing_room_sync_config.combine_room_sync_config(room_sync_config)
|
existing_room_sync_config = relevant_room_map.get(room_id)
|
||||||
else:
|
if existing_room_sync_config is not None:
|
||||||
relevant_room_map[room_id] = room_sync_config
|
existing_room_sync_config.combine_room_sync_config(
|
||||||
|
room_sync_config
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
relevant_room_map[room_id] = room_sync_config
|
||||||
|
|
||||||
# Fetch room data
|
# Fetch room data
|
||||||
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
|
||||||
|
@ -661,48 +687,49 @@ class SlidingSyncHandler:
|
||||||
# previously.
|
# previously.
|
||||||
# Keep track of the rooms that we're going to display and need to fetch more info about
|
# Keep track of the rooms that we're going to display and need to fetch more info about
|
||||||
relevant_rooms_to_send_map = relevant_room_map
|
relevant_rooms_to_send_map = relevant_room_map
|
||||||
if from_token:
|
with start_active_span("filter_relevant_rooms_to_send"):
|
||||||
rooms_should_send = set()
|
if from_token:
|
||||||
|
rooms_should_send = set()
|
||||||
|
|
||||||
# First we check if there are rooms that match a list/room
|
# First we check if there are rooms that match a list/room
|
||||||
# subscription and have updates we need to send (i.e. either because
|
# subscription and have updates we need to send (i.e. either because
|
||||||
# we haven't sent the room down, or we have but there are missing
|
# we haven't sent the room down, or we have but there are missing
|
||||||
# updates).
|
# updates).
|
||||||
for room_id in relevant_room_map:
|
for room_id in relevant_room_map:
|
||||||
status = await self.connection_store.have_sent_room(
|
status = await self.connection_store.have_sent_room(
|
||||||
sync_config,
|
sync_config,
|
||||||
from_token.connection_position,
|
from_token.connection_position,
|
||||||
room_id,
|
room_id,
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
# The room was never sent down before so the client needs to know
|
||||||
|
# about it regardless of any updates.
|
||||||
|
status.status == HaveSentRoomFlag.NEVER
|
||||||
|
# `PREVIOUSLY` literally means the "room was sent down before *AND*
|
||||||
|
# there are updates we haven't sent down" so we already know this
|
||||||
|
# room has updates.
|
||||||
|
or status.status == HaveSentRoomFlag.PREVIOUSLY
|
||||||
|
):
|
||||||
|
rooms_should_send.add(room_id)
|
||||||
|
elif status.status == HaveSentRoomFlag.LIVE:
|
||||||
|
# We know that we've sent all updates up until `from_token`,
|
||||||
|
# so we just need to check if there have been updates since
|
||||||
|
# then.
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
assert_never(status.status)
|
||||||
|
|
||||||
|
# We only need to check for new events since any state changes
|
||||||
|
# will also come down as new events.
|
||||||
|
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
|
||||||
|
relevant_room_map.keys(), from_token.stream_token.room_key
|
||||||
)
|
)
|
||||||
if (
|
rooms_should_send.update(rooms_that_have_updates)
|
||||||
# The room was never sent down before so the client needs to know
|
relevant_rooms_to_send_map = {
|
||||||
# about it regardless of any updates.
|
room_id: room_sync_config
|
||||||
status.status == HaveSentRoomFlag.NEVER
|
for room_id, room_sync_config in relevant_room_map.items()
|
||||||
# `PREVIOUSLY` literally means the "room was sent down before *AND*
|
if room_id in rooms_should_send
|
||||||
# there are updates we haven't sent down" so we already know this
|
}
|
||||||
# room has updates.
|
|
||||||
or status.status == HaveSentRoomFlag.PREVIOUSLY
|
|
||||||
):
|
|
||||||
rooms_should_send.add(room_id)
|
|
||||||
elif status.status == HaveSentRoomFlag.LIVE:
|
|
||||||
# We know that we've sent all updates up until `from_token`,
|
|
||||||
# so we just need to check if there have been updates since
|
|
||||||
# then.
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
assert_never(status.status)
|
|
||||||
|
|
||||||
# We only need to check for new events since any state changes
|
|
||||||
# will also come down as new events.
|
|
||||||
rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
|
|
||||||
relevant_room_map.keys(), from_token.stream_token.room_key
|
|
||||||
)
|
|
||||||
rooms_should_send.update(rooms_that_have_updates)
|
|
||||||
relevant_rooms_to_send_map = {
|
|
||||||
room_id: room_sync_config
|
|
||||||
for room_id, room_sync_config in relevant_room_map.items()
|
|
||||||
if room_id in rooms_should_send
|
|
||||||
}
|
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
@tag_args
|
@tag_args
|
||||||
|
@ -754,13 +781,20 @@ class SlidingSyncHandler:
|
||||||
# Initial sync without a `from_token` starts at `0`
|
# Initial sync without a `from_token` starts at `0`
|
||||||
connection_position = 0
|
connection_position = 0
|
||||||
|
|
||||||
return SlidingSyncResult(
|
sliding_sync_result = SlidingSyncResult(
|
||||||
next_pos=SlidingSyncStreamToken(to_token, connection_position),
|
next_pos=SlidingSyncStreamToken(to_token, connection_position),
|
||||||
lists=lists,
|
lists=lists,
|
||||||
rooms=rooms,
|
rooms=rooms,
|
||||||
extensions=extensions,
|
extensions=extensions,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Make it easy to find traces for syncs that aren't empty
|
||||||
|
set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
|
||||||
|
set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)
|
||||||
|
|
||||||
|
return sliding_sync_result
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_room_membership_for_user_at_to_token(
|
async def get_room_membership_for_user_at_to_token(
|
||||||
self,
|
self,
|
||||||
user: UserID,
|
user: UserID,
|
||||||
|
@ -1099,6 +1133,7 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
return sync_room_id_set
|
return sync_room_id_set
|
||||||
|
|
||||||
|
@trace
|
||||||
async def filter_rooms_relevant_for_sync(
|
async def filter_rooms_relevant_for_sync(
|
||||||
self,
|
self,
|
||||||
user: UserID,
|
user: UserID,
|
||||||
|
@ -1209,6 +1244,7 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
# return None
|
# return None
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
|
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
|
||||||
self,
|
self,
|
||||||
room_ids: StrCollection,
|
room_ids: StrCollection,
|
||||||
|
@ -1299,6 +1335,7 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
return room_id_to_stripped_state_map
|
return room_id_to_stripped_state_map
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _bulk_get_partial_current_state_content_for_rooms(
|
async def _bulk_get_partial_current_state_content_for_rooms(
|
||||||
self,
|
self,
|
||||||
content_type: Literal[
|
content_type: Literal[
|
||||||
|
@ -1498,125 +1535,132 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
# Filter for Direct-Message (DM) rooms
|
# Filter for Direct-Message (DM) rooms
|
||||||
if filters.is_dm is not None:
|
if filters.is_dm is not None:
|
||||||
if filters.is_dm:
|
with start_active_span("filters.is_dm"):
|
||||||
# Only DM rooms please
|
if filters.is_dm:
|
||||||
filtered_room_id_set = {
|
# Only DM rooms please
|
||||||
room_id
|
filtered_room_id_set = {
|
||||||
for room_id in filtered_room_id_set
|
room_id
|
||||||
if sync_room_map[room_id].is_dm
|
for room_id in filtered_room_id_set
|
||||||
}
|
if sync_room_map[room_id].is_dm
|
||||||
else:
|
}
|
||||||
# Only non-DM rooms please
|
else:
|
||||||
filtered_room_id_set = {
|
# Only non-DM rooms please
|
||||||
room_id
|
filtered_room_id_set = {
|
||||||
for room_id in filtered_room_id_set
|
room_id
|
||||||
if not sync_room_map[room_id].is_dm
|
for room_id in filtered_room_id_set
|
||||||
}
|
if not sync_room_map[room_id].is_dm
|
||||||
|
}
|
||||||
|
|
||||||
if filters.spaces is not None:
|
if filters.spaces is not None:
|
||||||
raise NotImplementedError()
|
with start_active_span("filters.spaces"):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
# Filter for encrypted rooms
|
# Filter for encrypted rooms
|
||||||
if filters.is_encrypted is not None:
|
if filters.is_encrypted is not None:
|
||||||
room_id_to_encryption = (
|
with start_active_span("filters.is_encrypted"):
|
||||||
await self._bulk_get_partial_current_state_content_for_rooms(
|
room_id_to_encryption = (
|
||||||
content_type="room_encryption",
|
await self._bulk_get_partial_current_state_content_for_rooms(
|
||||||
room_ids=filtered_room_id_set,
|
content_type="room_encryption",
|
||||||
to_token=to_token,
|
room_ids=filtered_room_id_set,
|
||||||
sync_room_map=sync_room_map,
|
to_token=to_token,
|
||||||
room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
sync_room_map=sync_room_map,
|
||||||
|
room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# Make a copy so we don't run into an error: `Set changed size during
|
# Make a copy so we don't run into an error: `Set changed size during
|
||||||
# iteration`, when we filter out and remove items
|
# iteration`, when we filter out and remove items
|
||||||
for room_id in filtered_room_id_set.copy():
|
for room_id in filtered_room_id_set.copy():
|
||||||
encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
|
encryption = room_id_to_encryption.get(
|
||||||
|
room_id, ROOM_UNKNOWN_SENTINEL
|
||||||
|
)
|
||||||
|
|
||||||
# Just remove rooms if we can't determine their encryption status
|
# Just remove rooms if we can't determine their encryption status
|
||||||
if encryption is ROOM_UNKNOWN_SENTINEL:
|
if encryption is ROOM_UNKNOWN_SENTINEL:
|
||||||
filtered_room_id_set.remove(room_id)
|
filtered_room_id_set.remove(room_id)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If we're looking for encrypted rooms, filter out rooms that are not
|
# If we're looking for encrypted rooms, filter out rooms that are not
|
||||||
# encrypted and vice versa
|
# encrypted and vice versa
|
||||||
is_encrypted = encryption is not None
|
is_encrypted = encryption is not None
|
||||||
if (filters.is_encrypted and not is_encrypted) or (
|
if (filters.is_encrypted and not is_encrypted) or (
|
||||||
not filters.is_encrypted and is_encrypted
|
not filters.is_encrypted and is_encrypted
|
||||||
):
|
):
|
||||||
filtered_room_id_set.remove(room_id)
|
filtered_room_id_set.remove(room_id)
|
||||||
|
|
||||||
# Filter for rooms that the user has been invited to
|
# Filter for rooms that the user has been invited to
|
||||||
if filters.is_invite is not None:
|
if filters.is_invite is not None:
|
||||||
# Make a copy so we don't run into an error: `Set changed size during
|
with start_active_span("filters.is_invite"):
|
||||||
# iteration`, when we filter out and remove items
|
# Make a copy so we don't run into an error: `Set changed size during
|
||||||
for room_id in filtered_room_id_set.copy():
|
# iteration`, when we filter out and remove items
|
||||||
room_for_user = sync_room_map[room_id]
|
for room_id in filtered_room_id_set.copy():
|
||||||
# If we're looking for invite rooms, filter out rooms that the user is
|
room_for_user = sync_room_map[room_id]
|
||||||
# not invited to and vice versa
|
# If we're looking for invite rooms, filter out rooms that the user is
|
||||||
if (
|
# not invited to and vice versa
|
||||||
filters.is_invite and room_for_user.membership != Membership.INVITE
|
if (
|
||||||
) or (
|
filters.is_invite
|
||||||
not filters.is_invite
|
and room_for_user.membership != Membership.INVITE
|
||||||
and room_for_user.membership == Membership.INVITE
|
) or (
|
||||||
):
|
not filters.is_invite
|
||||||
filtered_room_id_set.remove(room_id)
|
and room_for_user.membership == Membership.INVITE
|
||||||
|
):
|
||||||
|
filtered_room_id_set.remove(room_id)
|
||||||
|
|
||||||
# Filter by room type (space vs room, etc). A room must match one of the types
|
# Filter by room type (space vs room, etc). A room must match one of the types
|
||||||
# provided in the list. `None` is a valid type for rooms which do not have a
|
# provided in the list. `None` is a valid type for rooms which do not have a
|
||||||
# room type.
|
# room type.
|
||||||
if filters.room_types is not None or filters.not_room_types is not None:
|
if filters.room_types is not None or filters.not_room_types is not None:
|
||||||
room_id_to_type = (
|
with start_active_span("filters.room_types"):
|
||||||
await self._bulk_get_partial_current_state_content_for_rooms(
|
room_id_to_type = (
|
||||||
content_type="room_type",
|
await self._bulk_get_partial_current_state_content_for_rooms(
|
||||||
room_ids=filtered_room_id_set,
|
content_type="room_type",
|
||||||
to_token=to_token,
|
room_ids=filtered_room_id_set,
|
||||||
sync_room_map=sync_room_map,
|
to_token=to_token,
|
||||||
room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
sync_room_map=sync_room_map,
|
||||||
|
room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# Make a copy so we don't run into an error: `Set changed size during
|
# Make a copy so we don't run into an error: `Set changed size during
|
||||||
# iteration`, when we filter out and remove items
|
# iteration`, when we filter out and remove items
|
||||||
for room_id in filtered_room_id_set.copy():
|
for room_id in filtered_room_id_set.copy():
|
||||||
room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
|
room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
|
||||||
|
|
||||||
# Just remove rooms if we can't determine their type
|
# Just remove rooms if we can't determine their type
|
||||||
if room_type is ROOM_UNKNOWN_SENTINEL:
|
if room_type is ROOM_UNKNOWN_SENTINEL:
|
||||||
filtered_room_id_set.remove(room_id)
|
filtered_room_id_set.remove(room_id)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if (
|
if (
|
||||||
filters.room_types is not None
|
filters.room_types is not None
|
||||||
and room_type not in filters.room_types
|
and room_type not in filters.room_types
|
||||||
):
|
):
|
||||||
filtered_room_id_set.remove(room_id)
|
filtered_room_id_set.remove(room_id)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
filters.not_room_types is not None
|
filters.not_room_types is not None
|
||||||
and room_type in filters.not_room_types
|
and room_type in filters.not_room_types
|
||||||
):
|
):
|
||||||
filtered_room_id_set.remove(room_id)
|
filtered_room_id_set.remove(room_id)
|
||||||
|
|
||||||
if filters.room_name_like is not None:
|
if filters.room_name_like is not None:
|
||||||
# TODO: The room name is a bit more sensitive to leak than the
|
with start_active_span("filters.room_name_like"):
|
||||||
# create/encryption event. Maybe we should consider a better way to fetch
|
# TODO: The room name is a bit more sensitive to leak than the
|
||||||
# historical state before implementing this.
|
# create/encryption event. Maybe we should consider a better way to fetch
|
||||||
#
|
# historical state before implementing this.
|
||||||
# room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
|
#
|
||||||
# content_type="room_name",
|
# room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
|
||||||
# room_ids=filtered_room_id_set,
|
# content_type="room_name",
|
||||||
# to_token=to_token,
|
# room_ids=filtered_room_id_set,
|
||||||
# sync_room_map=sync_room_map,
|
# to_token=to_token,
|
||||||
# room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
# sync_room_map=sync_room_map,
|
||||||
# )
|
# room_id_to_stripped_state_map=room_id_to_stripped_state_map,
|
||||||
raise NotImplementedError()
|
# )
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
if filters.tags is not None:
|
if filters.tags is not None or filters.not_tags is not None:
|
||||||
raise NotImplementedError()
|
with start_active_span("filters.tags"):
|
||||||
|
raise NotImplementedError()
|
||||||
if filters.not_tags is not None:
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
# Assemble a new sync room map but only with the `filtered_room_id_set`
|
||||||
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
|
||||||
|
@ -1678,6 +1722,7 @@ class SlidingSyncHandler:
|
||||||
reverse=True,
|
reverse=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_current_state_ids_at(
|
async def get_current_state_ids_at(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -1742,6 +1787,7 @@ class SlidingSyncHandler:
|
||||||
|
|
||||||
return state_ids
|
return state_ids
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_current_state_at(
|
async def get_current_state_at(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -1803,6 +1849,15 @@ class SlidingSyncHandler:
|
||||||
"""
|
"""
|
||||||
user = sync_config.user
|
user = sync_config.user
|
||||||
|
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "membership",
|
||||||
|
room_membership_for_user_at_to_token.membership,
|
||||||
|
)
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "timeline_limit",
|
||||||
|
room_sync_config.timeline_limit,
|
||||||
|
)
|
||||||
|
|
||||||
# Determine whether we should limit the timeline to the token range.
|
# Determine whether we should limit the timeline to the token range.
|
||||||
#
|
#
|
||||||
# We should return historical messages (before token range) in the
|
# We should return historical messages (before token range) in the
|
||||||
|
@ -2070,6 +2125,10 @@ class SlidingSyncHandler:
|
||||||
if StateValues.WILDCARD in room_sync_config.required_state_map.get(
|
if StateValues.WILDCARD in room_sync_config.required_state_map.get(
|
||||||
StateValues.WILDCARD, set()
|
StateValues.WILDCARD, set()
|
||||||
):
|
):
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard",
|
||||||
|
True,
|
||||||
|
)
|
||||||
required_state_filter = StateFilter.all()
|
required_state_filter = StateFilter.all()
|
||||||
# TODO: `StateFilter` currently doesn't support wildcard event types. We're
|
# TODO: `StateFilter` currently doesn't support wildcard event types. We're
|
||||||
# currently working around this by returning all state to the client but it
|
# currently working around this by returning all state to the client but it
|
||||||
|
@ -2079,6 +2138,10 @@ class SlidingSyncHandler:
|
||||||
room_sync_config.required_state_map.get(StateValues.WILDCARD)
|
room_sync_config.required_state_map.get(StateValues.WILDCARD)
|
||||||
is not None
|
is not None
|
||||||
):
|
):
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type",
|
||||||
|
True,
|
||||||
|
)
|
||||||
required_state_filter = StateFilter.all()
|
required_state_filter = StateFilter.all()
|
||||||
else:
|
else:
|
||||||
required_state_types: List[Tuple[str, Optional[str]]] = []
|
required_state_types: List[Tuple[str, Optional[str]]] = []
|
||||||
|
@ -2086,8 +2149,12 @@ class SlidingSyncHandler:
|
||||||
state_type,
|
state_type,
|
||||||
state_key_set,
|
state_key_set,
|
||||||
) in room_sync_config.required_state_map.items():
|
) in room_sync_config.required_state_map.items():
|
||||||
|
num_wild_state_keys = 0
|
||||||
|
lazy_load_room_members = False
|
||||||
|
num_others = 0
|
||||||
for state_key in state_key_set:
|
for state_key in state_key_set:
|
||||||
if state_key == StateValues.WILDCARD:
|
if state_key == StateValues.WILDCARD:
|
||||||
|
num_wild_state_keys += 1
|
||||||
# `None` is a wildcard in the `StateFilter`
|
# `None` is a wildcard in the `StateFilter`
|
||||||
required_state_types.append((state_type, None))
|
required_state_types.append((state_type, None))
|
||||||
# We need to fetch all relevant people when we're lazy-loading membership
|
# We need to fetch all relevant people when we're lazy-loading membership
|
||||||
|
@ -2095,6 +2162,7 @@ class SlidingSyncHandler:
|
||||||
state_type == EventTypes.Member
|
state_type == EventTypes.Member
|
||||||
and state_key == StateValues.LAZY
|
and state_key == StateValues.LAZY
|
||||||
):
|
):
|
||||||
|
lazy_load_room_members = True
|
||||||
# Everyone in the timeline is relevant
|
# Everyone in the timeline is relevant
|
||||||
timeline_membership: Set[str] = set()
|
timeline_membership: Set[str] = set()
|
||||||
if timeline_events is not None:
|
if timeline_events is not None:
|
||||||
|
@ -2109,10 +2177,26 @@ class SlidingSyncHandler:
|
||||||
# FIXME: We probably also care about invite, ban, kick, targets, etc
|
# FIXME: We probably also care about invite, ban, kick, targets, etc
|
||||||
# but the spec only mentions "senders".
|
# but the spec only mentions "senders".
|
||||||
elif state_key == StateValues.ME:
|
elif state_key == StateValues.ME:
|
||||||
|
num_others += 1
|
||||||
required_state_types.append((state_type, user.to_string()))
|
required_state_types.append((state_type, user.to_string()))
|
||||||
else:
|
else:
|
||||||
|
num_others += 1
|
||||||
required_state_types.append((state_type, state_key))
|
required_state_types.append((state_type, state_key))
|
||||||
|
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX
|
||||||
|
+ "required_state_wildcard_state_key_count",
|
||||||
|
num_wild_state_keys,
|
||||||
|
)
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
|
||||||
|
lazy_load_room_members,
|
||||||
|
)
|
||||||
|
set_tag(
|
||||||
|
SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
|
||||||
|
num_others,
|
||||||
|
)
|
||||||
|
|
||||||
required_state_filter = StateFilter.from_types(required_state_types)
|
required_state_filter = StateFilter.from_types(required_state_types)
|
||||||
|
|
||||||
# We need this base set of info for the response so let's just fetch it along
|
# We need this base set of info for the response so let's just fetch it along
|
||||||
|
@ -2208,6 +2292,8 @@ class SlidingSyncHandler:
|
||||||
if new_bump_event_pos.stream > 0:
|
if new_bump_event_pos.stream > 0:
|
||||||
bump_stamp = new_bump_event_pos.stream
|
bump_stamp = new_bump_event_pos.stream
|
||||||
|
|
||||||
|
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
|
||||||
|
|
||||||
return SlidingSyncResult.RoomResult(
|
return SlidingSyncResult.RoomResult(
|
||||||
name=room_name,
|
name=room_name,
|
||||||
avatar=room_avatar,
|
avatar=room_avatar,
|
||||||
|
@ -2863,6 +2949,7 @@ class SlidingSyncConnectionStore:
|
||||||
|
|
||||||
return room_status
|
return room_status
|
||||||
|
|
||||||
|
@trace
|
||||||
async def record_rooms(
|
async def record_rooms(
|
||||||
self,
|
self,
|
||||||
sync_config: SlidingSyncConfig,
|
sync_config: SlidingSyncConfig,
|
||||||
|
@ -2938,6 +3025,7 @@ class SlidingSyncConnectionStore:
|
||||||
|
|
||||||
return new_store_token
|
return new_store_token
|
||||||
|
|
||||||
|
@trace
|
||||||
async def mark_token_seen(
|
async def mark_token_seen(
|
||||||
self,
|
self,
|
||||||
sync_config: SlidingSyncConfig,
|
sync_config: SlidingSyncConfig,
|
||||||
|
|
|
@ -899,6 +899,9 @@ class SlidingSyncRestServlet(RestServlet):
|
||||||
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
|
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
|
||||||
|
|
||||||
# Tag and log useful data to differentiate requests.
|
# Tag and log useful data to differentiate requests.
|
||||||
|
set_tag(
|
||||||
|
"sliding_sync.sync_type", "initial" if from_token is None else "incremental"
|
||||||
|
)
|
||||||
set_tag("sliding_sync.conn_id", body.conn_id or "")
|
set_tag("sliding_sync.conn_id", body.conn_id or "")
|
||||||
log_kv(
|
log_kv(
|
||||||
{
|
{
|
||||||
|
@ -912,6 +915,12 @@ class SlidingSyncRestServlet(RestServlet):
|
||||||
"sliding_sync.room_subscriptions": list(
|
"sliding_sync.room_subscriptions": list(
|
||||||
(body.room_subscriptions or {}).keys()
|
(body.room_subscriptions or {}).keys()
|
||||||
),
|
),
|
||||||
|
# We also include the number of room subscriptions because logs are
|
||||||
|
# limited to 1024 characters and the large room ID list above can be cut
|
||||||
|
# off.
|
||||||
|
"sliding_sync.num_room_subscriptions": len(
|
||||||
|
(body.room_subscriptions or {}).keys()
|
||||||
|
),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ from typing import (
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
from synapse.metrics.background_process_metrics import wrap_as_background_process
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||||
|
@ -422,6 +423,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
|
||||||
return invite
|
return invite
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_rooms_for_local_user_where_membership_is(
|
async def get_rooms_for_local_user_where_membership_is(
|
||||||
self,
|
self,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
|
|
|
@ -24,6 +24,7 @@ from typing import List, Optional, Tuple
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction
|
||||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||||
|
@ -159,6 +160,7 @@ class StateDeltasStore(SQLBaseStore):
|
||||||
self._get_max_stream_id_in_current_state_deltas_txn,
|
self._get_max_stream_id_in_current_state_deltas_txn,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_current_state_deltas_for_room(
|
async def get_current_state_deltas_for_room(
|
||||||
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
|
||||||
) -> List[StateDelta]:
|
) -> List[StateDelta]:
|
||||||
|
|
|
@ -67,7 +67,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
|
||||||
from synapse.api.filtering import Filter
|
from synapse.api.filtering import Filter
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
from synapse.logging.opentracing import trace
|
from synapse.logging.opentracing import tag_args, trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import (
|
from synapse.storage.database import (
|
||||||
DatabasePool,
|
DatabasePool,
|
||||||
|
@ -812,6 +812,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return ret, key
|
return ret, key
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_current_state_delta_membership_changes_for_user(
|
async def get_current_state_delta_membership_changes_for_user(
|
||||||
self,
|
self,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
|
@ -1186,6 +1187,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_last_event_pos_in_room_before_stream_ordering(
|
async def get_last_event_pos_in_room_before_stream_ordering(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -1940,6 +1942,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
return rows, next_token
|
return rows, next_token
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
|
@tag_args
|
||||||
async def paginate_room_events(
|
async def paginate_room_events(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -2105,6 +2108,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@trace
|
||||||
def get_rooms_that_might_have_updates(
|
def get_rooms_that_might_have_updates(
|
||||||
self, room_ids: StrCollection, from_token: RoomStreamToken
|
self, room_ids: StrCollection, from_token: RoomStreamToken
|
||||||
) -> StrCollection:
|
) -> StrCollection:
|
||||||
|
|
Loading…
Reference in a new issue