forked from MirrorHub/synapse
Underscore-prefix private fields in FederationEventHandler
(#10746)
This commit is contained in:
parent
287108fb2e
commit
f30c9745ab
2 changed files with 74 additions and 71 deletions
1
changelog.d/10746.misc
Normal file
1
changelog.d/10746.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Clean up some of the federation event authentication code for clarity.
|
|
@ -124,28 +124,28 @@ class FederationEventHandler:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self.store = hs.get_datastore()
|
self._store = hs.get_datastore()
|
||||||
self.storage = hs.get_storage()
|
self._storage = hs.get_storage()
|
||||||
self.state_store = self.storage.state
|
self._state_store = self._storage.state
|
||||||
|
|
||||||
self.state_handler = hs.get_state_handler()
|
self._state_handler = hs.get_state_handler()
|
||||||
self.event_creation_handler = hs.get_event_creation_handler()
|
self._event_creation_handler = hs.get_event_creation_handler()
|
||||||
self._event_auth_handler = hs.get_event_auth_handler()
|
self._event_auth_handler = hs.get_event_auth_handler()
|
||||||
self._message_handler = hs.get_message_handler()
|
self._message_handler = hs.get_message_handler()
|
||||||
self.action_generator = hs.get_action_generator()
|
self._action_generator = hs.get_action_generator()
|
||||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||||
# avoid a circular dependency by deferring execution here
|
# avoid a circular dependency by deferring execution here
|
||||||
self._get_room_member_handler = hs.get_room_member_handler
|
self._get_room_member_handler = hs.get_room_member_handler
|
||||||
|
|
||||||
self.federation_client = hs.get_federation_client()
|
self._federation_client = hs.get_federation_client()
|
||||||
self.third_party_event_rules = hs.get_third_party_event_rules()
|
self._third_party_event_rules = hs.get_third_party_event_rules()
|
||||||
self._notifier = hs.get_notifier()
|
self._notifier = hs.get_notifier()
|
||||||
|
|
||||||
self.is_mine_id = hs.is_mine_id
|
self._is_mine_id = hs.is_mine_id
|
||||||
self._server_name = hs.hostname
|
self._server_name = hs.hostname
|
||||||
self._instance_name = hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
self.config = hs.config
|
self._config = hs.config
|
||||||
self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
|
self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
|
||||||
|
|
||||||
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
|
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
|
||||||
|
@ -177,7 +177,7 @@ class FederationEventHandler:
|
||||||
event_id = pdu.event_id
|
event_id = pdu.event_id
|
||||||
|
|
||||||
# We reprocess pdus when we have seen them only as outliers
|
# We reprocess pdus when we have seen them only as outliers
|
||||||
existing = await self.store.get_event(
|
existing = await self._store.get_event(
|
||||||
event_id, allow_none=True, allow_rejected=True
|
event_id, allow_none=True, allow_rejected=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ class FederationEventHandler:
|
||||||
# - Fetching state if we have a hole in the graph
|
# - Fetching state if we have a hole in the graph
|
||||||
if not pdu.internal_metadata.is_outlier():
|
if not pdu.internal_metadata.is_outlier():
|
||||||
prevs = set(pdu.prev_event_ids())
|
prevs = set(pdu.prev_event_ids())
|
||||||
seen = await self.store.have_events_in_timeline(prevs)
|
seen = await self._store.have_events_in_timeline(prevs)
|
||||||
missing_prevs = prevs - seen
|
missing_prevs = prevs - seen
|
||||||
|
|
||||||
if missing_prevs:
|
if missing_prevs:
|
||||||
|
@ -274,7 +274,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
# Update the set of things we've seen after trying to
|
# Update the set of things we've seen after trying to
|
||||||
# fetch the missing stuff
|
# fetch the missing stuff
|
||||||
seen = await self.store.have_events_in_timeline(prevs)
|
seen = await self._store.have_events_in_timeline(prevs)
|
||||||
missing_prevs = prevs - seen
|
missing_prevs = prevs - seen
|
||||||
|
|
||||||
if not missing_prevs:
|
if not missing_prevs:
|
||||||
|
@ -363,7 +363,7 @@ class FederationEventHandler:
|
||||||
# the room, so we send it on their behalf.
|
# the room, so we send it on their behalf.
|
||||||
event.internal_metadata.send_on_behalf_of = origin
|
event.internal_metadata.send_on_behalf_of = origin
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self._state_handler.compute_event_context(event)
|
||||||
context = await self._check_event_auth(origin, event, context)
|
context = await self._check_event_auth(origin, event, context)
|
||||||
if context.rejected:
|
if context.rejected:
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
|
@ -377,7 +377,7 @@ class FederationEventHandler:
|
||||||
# for knock events, we run the third-party event rules. It's not entirely clear
|
# for knock events, we run the third-party event rules. It's not entirely clear
|
||||||
# why we don't do this for other sorts of membership events.
|
# why we don't do this for other sorts of membership events.
|
||||||
if event.membership == Membership.KNOCK:
|
if event.membership == Membership.KNOCK:
|
||||||
event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
|
event_allowed, _ = await self._third_party_event_rules.check_event_allowed(
|
||||||
event, context
|
event, context
|
||||||
)
|
)
|
||||||
if not event_allowed:
|
if not event_allowed:
|
||||||
|
@ -406,7 +406,7 @@ class FederationEventHandler:
|
||||||
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
|
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
|
||||||
prev_member_event = None
|
prev_member_event = None
|
||||||
if prev_member_event_id:
|
if prev_member_event_id:
|
||||||
prev_member_event = await self.store.get_event(prev_member_event_id)
|
prev_member_event = await self._store.get_event(prev_member_event_id)
|
||||||
|
|
||||||
# Check if the member should be allowed access via membership in a space.
|
# Check if the member should be allowed access via membership in a space.
|
||||||
await self._event_auth_handler.check_restricted_join_rules(
|
await self._event_auth_handler.check_restricted_join_rules(
|
||||||
|
@ -439,7 +439,7 @@ class FederationEventHandler:
|
||||||
if dest == self._server_name:
|
if dest == self._server_name:
|
||||||
raise SynapseError(400, "Can't backfill from self.")
|
raise SynapseError(400, "Can't backfill from self.")
|
||||||
|
|
||||||
events = await self.federation_client.backfill(
|
events = await self._federation_client.backfill(
|
||||||
dest, room_id, limit=limit, extremities=extremities
|
dest, room_id, limit=limit, extremities=extremities
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -471,12 +471,12 @@ class FederationEventHandler:
|
||||||
room_id = pdu.room_id
|
room_id = pdu.room_id
|
||||||
event_id = pdu.event_id
|
event_id = pdu.event_id
|
||||||
|
|
||||||
seen = await self.store.have_events_in_timeline(prevs)
|
seen = await self._store.have_events_in_timeline(prevs)
|
||||||
|
|
||||||
if not prevs - seen:
|
if not prevs - seen:
|
||||||
return
|
return
|
||||||
|
|
||||||
latest_list = await self.store.get_latest_event_ids_in_room(room_id)
|
latest_list = await self._store.get_latest_event_ids_in_room(room_id)
|
||||||
|
|
||||||
# We add the prev events that we have seen to the latest
|
# We add the prev events that we have seen to the latest
|
||||||
# list to ensure the remote server doesn't give them to us
|
# list to ensure the remote server doesn't give them to us
|
||||||
|
@ -538,7 +538,7 @@ class FederationEventHandler:
|
||||||
# All that said: Let's try increasing the timeout to 60s and see what happens.
|
# All that said: Let's try increasing the timeout to 60s and see what happens.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
missing_events = await self.federation_client.get_missing_events(
|
missing_events = await self._federation_client.get_missing_events(
|
||||||
origin,
|
origin,
|
||||||
room_id,
|
room_id,
|
||||||
earliest_events_ids=list(latest),
|
earliest_events_ids=list(latest),
|
||||||
|
@ -611,7 +611,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
event_id = event.event_id
|
event_id = event.event_id
|
||||||
|
|
||||||
existing = await self.store.get_event(
|
existing = await self._store.get_event(
|
||||||
event_id, allow_none=True, allow_rejected=True
|
event_id, allow_none=True, allow_rejected=True
|
||||||
)
|
)
|
||||||
if existing:
|
if existing:
|
||||||
|
@ -676,7 +676,7 @@ class FederationEventHandler:
|
||||||
event_id = event.event_id
|
event_id = event.event_id
|
||||||
|
|
||||||
prevs = set(event.prev_event_ids())
|
prevs = set(event.prev_event_ids())
|
||||||
seen = await self.store.have_events_in_timeline(prevs)
|
seen = await self._store.have_events_in_timeline(prevs)
|
||||||
missing_prevs = prevs - seen
|
missing_prevs = prevs - seen
|
||||||
|
|
||||||
if not missing_prevs:
|
if not missing_prevs:
|
||||||
|
@ -693,7 +693,7 @@ class FederationEventHandler:
|
||||||
event_map = {event_id: event}
|
event_map = {event_id: event}
|
||||||
try:
|
try:
|
||||||
# Get the state of the events we know about
|
# Get the state of the events we know about
|
||||||
ours = await self.state_store.get_state_groups_ids(room_id, seen)
|
ours = await self._state_store.get_state_groups_ids(room_id, seen)
|
||||||
|
|
||||||
# state_maps is a list of mappings from (type, state_key) to event_id
|
# state_maps is a list of mappings from (type, state_key) to event_id
|
||||||
state_maps: List[StateMap[str]] = list(ours.values())
|
state_maps: List[StateMap[str]] = list(ours.values())
|
||||||
|
@ -722,13 +722,13 @@ class FederationEventHandler:
|
||||||
for x in remote_state:
|
for x in remote_state:
|
||||||
event_map[x.event_id] = x
|
event_map[x.event_id] = x
|
||||||
|
|
||||||
room_version = await self.store.get_room_version_id(room_id)
|
room_version = await self._store.get_room_version_id(room_id)
|
||||||
state_map = await self._state_resolution_handler.resolve_events_with_store(
|
state_map = await self._state_resolution_handler.resolve_events_with_store(
|
||||||
room_id,
|
room_id,
|
||||||
room_version,
|
room_version,
|
||||||
state_maps,
|
state_maps,
|
||||||
event_map,
|
event_map,
|
||||||
state_res_store=StateResolutionStore(self.store),
|
state_res_store=StateResolutionStore(self._store),
|
||||||
)
|
)
|
||||||
|
|
||||||
# We need to give _process_received_pdu the actual state events
|
# We need to give _process_received_pdu the actual state events
|
||||||
|
@ -736,7 +736,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
# First though we need to fetch all the events that are in
|
# First though we need to fetch all the events that are in
|
||||||
# state_map, so we can build up the state below.
|
# state_map, so we can build up the state below.
|
||||||
evs = await self.store.get_events(
|
evs = await self._store.get_events(
|
||||||
list(state_map.values()),
|
list(state_map.values()),
|
||||||
get_prev_content=False,
|
get_prev_content=False,
|
||||||
redact_behaviour=EventRedactBehaviour.AS_IS,
|
redact_behaviour=EventRedactBehaviour.AS_IS,
|
||||||
|
@ -776,7 +776,7 @@ class FederationEventHandler:
|
||||||
(
|
(
|
||||||
state_event_ids,
|
state_event_ids,
|
||||||
auth_event_ids,
|
auth_event_ids,
|
||||||
) = await self.federation_client.get_room_state_ids(
|
) = await self._federation_client.get_room_state_ids(
|
||||||
destination, room_id, event_id=event_id
|
destination, room_id, event_id=event_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -790,7 +790,7 @@ class FederationEventHandler:
|
||||||
desired_events = set(state_event_ids)
|
desired_events = set(state_event_ids)
|
||||||
desired_events.add(event_id)
|
desired_events.add(event_id)
|
||||||
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
||||||
fetched_events = await self.store.get_events(
|
fetched_events = await self._store.get_events(
|
||||||
desired_events, allow_rejected=True
|
desired_events, allow_rejected=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -811,7 +811,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
|
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
|
||||||
missing_auth_events.difference_update(
|
missing_auth_events.difference_update(
|
||||||
await self.store.have_seen_events(room_id, missing_auth_events)
|
await self._store.have_seen_events(room_id, missing_auth_events)
|
||||||
)
|
)
|
||||||
logger.debug("We are also missing %i auth events", len(missing_auth_events))
|
logger.debug("We are also missing %i auth events", len(missing_auth_events))
|
||||||
|
|
||||||
|
@ -824,7 +824,7 @@ class FederationEventHandler:
|
||||||
# we need to make sure we re-load from the database to get the rejected
|
# we need to make sure we re-load from the database to get the rejected
|
||||||
# state correct.
|
# state correct.
|
||||||
fetched_events.update(
|
fetched_events.update(
|
||||||
await self.store.get_events(missing_desired_events, allow_rejected=True)
|
await self._store.get_events(missing_desired_events, allow_rejected=True)
|
||||||
)
|
)
|
||||||
|
|
||||||
# check for events which were in the wrong room.
|
# check for events which were in the wrong room.
|
||||||
|
@ -903,7 +903,7 @@ class FederationEventHandler:
|
||||||
logger.debug("Processing event: %s", event)
|
logger.debug("Processing event: %s", event)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
context = await self.state_handler.compute_event_context(
|
context = await self._state_handler.compute_event_context(
|
||||||
event, old_state=state
|
event, old_state=state
|
||||||
)
|
)
|
||||||
await self._auth_and_persist_event(
|
await self._auth_and_persist_event(
|
||||||
|
@ -921,7 +921,7 @@ class FederationEventHandler:
|
||||||
device_id = event.content.get("device_id")
|
device_id = event.content.get("device_id")
|
||||||
sender_key = event.content.get("sender_key")
|
sender_key = event.content.get("sender_key")
|
||||||
|
|
||||||
cached_devices = await self.store.get_cached_devices_for_user(event.sender)
|
cached_devices = await self._store.get_cached_devices_for_user(event.sender)
|
||||||
|
|
||||||
resync = False # Whether we should resync device lists.
|
resync = False # Whether we should resync device lists.
|
||||||
|
|
||||||
|
@ -997,10 +997,10 @@ class FederationEventHandler:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self.store.mark_remote_user_device_cache_as_stale(sender)
|
await self._store.mark_remote_user_device_cache_as_stale(sender)
|
||||||
|
|
||||||
# Immediately attempt a resync in the background
|
# Immediately attempt a resync in the background
|
||||||
if self.config.worker_app:
|
if self._config.worker_app:
|
||||||
await self._user_device_resync(user_id=sender)
|
await self._user_device_resync(user_id=sender)
|
||||||
else:
|
else:
|
||||||
await self._device_list_updater.user_device_resync(sender)
|
await self._device_list_updater.user_device_resync(sender)
|
||||||
|
@ -1026,12 +1026,12 @@ class FederationEventHandler:
|
||||||
|
|
||||||
# Skip processing a marker event if the room version doesn't
|
# Skip processing a marker event if the room version doesn't
|
||||||
# support it or the event is not from the room creator.
|
# support it or the event is not from the room creator.
|
||||||
room_version = await self.store.get_room_version(marker_event.room_id)
|
room_version = await self._store.get_room_version(marker_event.room_id)
|
||||||
create_event = await self.store.get_create_event_for_room(marker_event.room_id)
|
create_event = await self._store.get_create_event_for_room(marker_event.room_id)
|
||||||
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
|
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
|
||||||
if (
|
if (
|
||||||
not room_version.msc2716_historical
|
not room_version.msc2716_historical
|
||||||
or not self.config.experimental.msc2716_enabled
|
or not self._config.experimental.msc2716_enabled
|
||||||
or marker_event.sender != room_creator
|
or marker_event.sender != room_creator
|
||||||
):
|
):
|
||||||
return
|
return
|
||||||
|
@ -1056,7 +1056,7 @@ class FederationEventHandler:
|
||||||
[insertion_event_id],
|
[insertion_event_id],
|
||||||
)
|
)
|
||||||
|
|
||||||
insertion_event = await self.store.get_event(
|
insertion_event = await self._store.get_event(
|
||||||
insertion_event_id, allow_none=True
|
insertion_event_id, allow_none=True
|
||||||
)
|
)
|
||||||
if insertion_event is None:
|
if insertion_event is None:
|
||||||
|
@ -1074,7 +1074,7 @@ class FederationEventHandler:
|
||||||
marker_event,
|
marker_event,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.store.insert_insertion_extremity(
|
await self._store.insert_insertion_extremity(
|
||||||
insertion_event_id, marker_event.room_id
|
insertion_event_id, marker_event.room_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1096,14 +1096,14 @@ class FederationEventHandler:
|
||||||
Logs a warning if we can't find the given event.
|
Logs a warning if we can't find the given event.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
room_version = await self.store.get_room_version(room_id)
|
room_version = await self._store.get_room_version(room_id)
|
||||||
|
|
||||||
event_map: Dict[str, EventBase] = {}
|
event_map: Dict[str, EventBase] = {}
|
||||||
|
|
||||||
async def get_event(event_id: str):
|
async def get_event(event_id: str):
|
||||||
with nested_logging_context(event_id):
|
with nested_logging_context(event_id):
|
||||||
try:
|
try:
|
||||||
event = await self.federation_client.get_pdu(
|
event = await self._federation_client.get_pdu(
|
||||||
[destination],
|
[destination],
|
||||||
event_id,
|
event_id,
|
||||||
room_version,
|
room_version,
|
||||||
|
@ -1139,7 +1139,7 @@ class FederationEventHandler:
|
||||||
for aid in event.auth_event_ids()
|
for aid in event.auth_event_ids()
|
||||||
if aid not in event_map
|
if aid not in event_map
|
||||||
]
|
]
|
||||||
persisted_events = await self.store.get_events(
|
persisted_events = await self._store.get_events(
|
||||||
auth_events,
|
auth_events,
|
||||||
allow_rejected=True,
|
allow_rejected=True,
|
||||||
)
|
)
|
||||||
|
@ -1183,7 +1183,7 @@ class FederationEventHandler:
|
||||||
async def prep(ev_info: _NewEventInfo):
|
async def prep(ev_info: _NewEventInfo):
|
||||||
event = ev_info.event
|
event = ev_info.event
|
||||||
with nested_logging_context(suffix=event.event_id):
|
with nested_logging_context(suffix=event.event_id):
|
||||||
res = await self.state_handler.compute_event_context(event)
|
res = await self._state_handler.compute_event_context(event)
|
||||||
res = await self._check_event_auth(
|
res = await self._check_event_auth(
|
||||||
origin,
|
origin,
|
||||||
event,
|
event,
|
||||||
|
@ -1286,7 +1286,7 @@ class FederationEventHandler:
|
||||||
Returns:
|
Returns:
|
||||||
The updated context object.
|
The updated context object.
|
||||||
"""
|
"""
|
||||||
room_version = await self.store.get_room_version_id(event.room_id)
|
room_version = await self._store.get_room_version_id(event.room_id)
|
||||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||||
|
|
||||||
if claimed_auth_event_map:
|
if claimed_auth_event_map:
|
||||||
|
@ -1299,7 +1299,7 @@ class FederationEventHandler:
|
||||||
auth_events_ids = self._event_auth_handler.compute_auth_events(
|
auth_events_ids = self._event_auth_handler.compute_auth_events(
|
||||||
event, prev_state_ids, for_verification=True
|
event, prev_state_ids, for_verification=True
|
||||||
)
|
)
|
||||||
auth_events_x = await self.store.get_events(auth_events_ids)
|
auth_events_x = await self._store.get_events(auth_events_ids)
|
||||||
auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
|
auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -1334,7 +1334,7 @@ class FederationEventHandler:
|
||||||
# If we are going to send this event over federation we precaclculate
|
# If we are going to send this event over federation we precaclculate
|
||||||
# the joined hosts.
|
# the joined hosts.
|
||||||
if event.internal_metadata.get_send_on_behalf_of():
|
if event.internal_metadata.get_send_on_behalf_of():
|
||||||
await self.event_creation_handler.cache_joined_hosts_for_event(
|
await self._event_creation_handler.cache_joined_hosts_for_event(
|
||||||
event, context
|
event, context
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1348,7 +1348,7 @@ class FederationEventHandler:
|
||||||
if guest_access == GuestAccess.CAN_JOIN:
|
if guest_access == GuestAccess.CAN_JOIN:
|
||||||
return
|
return
|
||||||
|
|
||||||
current_state_map = await self.state_handler.get_current_state(event.room_id)
|
current_state_map = await self._state_handler.get_current_state(event.room_id)
|
||||||
current_state = list(current_state_map.values())
|
current_state = list(current_state_map.values())
|
||||||
await self._get_room_member_handler().kick_guest_users(current_state)
|
await self._get_room_member_handler().kick_guest_users(current_state)
|
||||||
|
|
||||||
|
@ -1374,7 +1374,7 @@ class FederationEventHandler:
|
||||||
if backfilled or event.internal_metadata.is_outlier():
|
if backfilled or event.internal_metadata.is_outlier():
|
||||||
return
|
return
|
||||||
|
|
||||||
extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
|
extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id)
|
||||||
extrem_ids = set(extrem_ids_list)
|
extrem_ids = set(extrem_ids_list)
|
||||||
prev_event_ids = set(event.prev_event_ids())
|
prev_event_ids = set(event.prev_event_ids())
|
||||||
|
|
||||||
|
@ -1383,7 +1383,7 @@ class FederationEventHandler:
|
||||||
# state at the event, so no point rechecking auth for soft fail.
|
# state at the event, so no point rechecking auth for soft fail.
|
||||||
return
|
return
|
||||||
|
|
||||||
room_version = await self.store.get_room_version_id(event.room_id)
|
room_version = await self._store.get_room_version_id(event.room_id)
|
||||||
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
|
||||||
|
|
||||||
# Calculate the "current state".
|
# Calculate the "current state".
|
||||||
|
@ -1400,19 +1400,19 @@ class FederationEventHandler:
|
||||||
# given state at the event. This should correctly handle cases
|
# given state at the event. This should correctly handle cases
|
||||||
# like bans, especially with state res v2.
|
# like bans, especially with state res v2.
|
||||||
|
|
||||||
state_sets_d = await self.state_store.get_state_groups(
|
state_sets_d = await self._state_store.get_state_groups(
|
||||||
event.room_id, extrem_ids
|
event.room_id, extrem_ids
|
||||||
)
|
)
|
||||||
state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
|
state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
|
||||||
state_sets.append(state)
|
state_sets.append(state)
|
||||||
current_states = await self.state_handler.resolve_events(
|
current_states = await self._state_handler.resolve_events(
|
||||||
room_version, state_sets, event
|
room_version, state_sets, event
|
||||||
)
|
)
|
||||||
current_state_ids: StateMap[str] = {
|
current_state_ids: StateMap[str] = {
|
||||||
k: e.event_id for k, e in current_states.items()
|
k: e.event_id for k, e in current_states.items()
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
current_state_ids = await self.state_handler.get_current_state_ids(
|
current_state_ids = await self._state_handler.get_current_state_ids(
|
||||||
event.room_id, latest_event_ids=extrem_ids
|
event.room_id, latest_event_ids=extrem_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1428,7 +1428,7 @@ class FederationEventHandler:
|
||||||
e for k, e in current_state_ids.items() if k in auth_types
|
e for k, e in current_state_ids.items() if k in auth_types
|
||||||
]
|
]
|
||||||
|
|
||||||
auth_events_map = await self.store.get_events(current_state_ids_list)
|
auth_events_map = await self._store.get_events(current_state_ids_list)
|
||||||
current_auth_events = {
|
current_auth_events = {
|
||||||
(e.type, e.state_key): e for e in auth_events_map.values()
|
(e.type, e.state_key): e for e in auth_events_map.values()
|
||||||
}
|
}
|
||||||
|
@ -1499,7 +1499,9 @@ class FederationEventHandler:
|
||||||
#
|
#
|
||||||
# we start by checking if they are in the store, and then try calling /event_auth/.
|
# we start by checking if they are in the store, and then try calling /event_auth/.
|
||||||
if missing_auth:
|
if missing_auth:
|
||||||
have_events = await self.store.have_seen_events(event.room_id, missing_auth)
|
have_events = await self._store.have_seen_events(
|
||||||
|
event.room_id, missing_auth
|
||||||
|
)
|
||||||
logger.debug("Events %s are in the store", have_events)
|
logger.debug("Events %s are in the store", have_events)
|
||||||
missing_auth.difference_update(have_events)
|
missing_auth.difference_update(have_events)
|
||||||
|
|
||||||
|
@ -1508,7 +1510,7 @@ class FederationEventHandler:
|
||||||
logger.info("auth_events contains unknown events: %s", missing_auth)
|
logger.info("auth_events contains unknown events: %s", missing_auth)
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
remote_auth_chain = await self.federation_client.get_event_auth(
|
remote_auth_chain = await self._federation_client.get_event_auth(
|
||||||
origin, event.room_id, event.event_id
|
origin, event.room_id, event.event_id
|
||||||
)
|
)
|
||||||
except RequestSendFailed as e1:
|
except RequestSendFailed as e1:
|
||||||
|
@ -1517,7 +1519,7 @@ class FederationEventHandler:
|
||||||
logger.info("Failed to get event auth from remote: %s", e1)
|
logger.info("Failed to get event auth from remote: %s", e1)
|
||||||
return context, auth_events
|
return context, auth_events
|
||||||
|
|
||||||
seen_remotes = await self.store.have_seen_events(
|
seen_remotes = await self._store.have_seen_events(
|
||||||
event.room_id, [e.event_id for e in remote_auth_chain]
|
event.room_id, [e.event_id for e in remote_auth_chain]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1543,7 +1545,7 @@ class FederationEventHandler:
|
||||||
e.event_id,
|
e.event_id,
|
||||||
)
|
)
|
||||||
missing_auth_event_context = (
|
missing_auth_event_context = (
|
||||||
await self.state_handler.compute_event_context(e)
|
await self._state_handler.compute_event_context(e)
|
||||||
)
|
)
|
||||||
await self._auth_and_persist_event(
|
await self._auth_and_persist_event(
|
||||||
origin,
|
origin,
|
||||||
|
@ -1584,7 +1586,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
# XXX: currently this checks for redactions but I'm not convinced that is
|
# XXX: currently this checks for redactions but I'm not convinced that is
|
||||||
# necessary?
|
# necessary?
|
||||||
different_events = await self.store.get_events_as_list(different_auth)
|
different_events = await self._store.get_events_as_list(different_auth)
|
||||||
|
|
||||||
for d in different_events:
|
for d in different_events:
|
||||||
if d.room_id != event.room_id:
|
if d.room_id != event.room_id:
|
||||||
|
@ -1610,8 +1612,8 @@ class FederationEventHandler:
|
||||||
remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
|
remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
|
||||||
remote_state = remote_auth_events.values()
|
remote_state = remote_auth_events.values()
|
||||||
|
|
||||||
room_version = await self.store.get_room_version_id(event.room_id)
|
room_version = await self._store.get_room_version_id(event.room_id)
|
||||||
new_state = await self.state_handler.resolve_events(
|
new_state = await self._state_handler.resolve_events(
|
||||||
room_version, (local_state, remote_state), event
|
room_version, (local_state, remote_state), event
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1669,7 +1671,7 @@ class FederationEventHandler:
|
||||||
|
|
||||||
# create a new state group as a delta from the existing one.
|
# create a new state group as a delta from the existing one.
|
||||||
prev_group = context.state_group
|
prev_group = context.state_group
|
||||||
state_group = await self.state_store.store_state_group(
|
state_group = await self._state_store.store_state_group(
|
||||||
event.event_id,
|
event.event_id,
|
||||||
event.room_id,
|
event.room_id,
|
||||||
prev_group=prev_group,
|
prev_group=prev_group,
|
||||||
|
@ -1701,9 +1703,9 @@ class FederationEventHandler:
|
||||||
not event.internal_metadata.is_outlier()
|
not event.internal_metadata.is_outlier()
|
||||||
and not backfilled
|
and not backfilled
|
||||||
and not context.rejected
|
and not context.rejected
|
||||||
and (await self.store.get_min_depth(event.room_id)) <= event.depth
|
and (await self._store.get_min_depth(event.room_id)) <= event.depth
|
||||||
):
|
):
|
||||||
await self.action_generator.handle_push_actions_for_event(
|
await self._action_generator.handle_push_actions_for_event(
|
||||||
event, context
|
event, context
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1712,7 +1714,7 @@ class FederationEventHandler:
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
run_in_background(
|
run_in_background(
|
||||||
self.store.remove_push_actions_from_staging, event.event_id
|
self._store.remove_push_actions_from_staging, event.event_id
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -1737,27 +1739,27 @@ class FederationEventHandler:
|
||||||
The stream ID after which all events have been persisted.
|
The stream ID after which all events have been persisted.
|
||||||
"""
|
"""
|
||||||
if not event_and_contexts:
|
if not event_and_contexts:
|
||||||
return self.store.get_current_events_token()
|
return self._store.get_current_events_token()
|
||||||
|
|
||||||
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
instance = self._config.worker.events_shard_config.get_instance(room_id)
|
||||||
if instance != self._instance_name:
|
if instance != self._instance_name:
|
||||||
# Limit the number of events sent over replication. We choose 200
|
# Limit the number of events sent over replication. We choose 200
|
||||||
# here as that is what we default to in `max_request_body_size(..)`
|
# here as that is what we default to in `max_request_body_size(..)`
|
||||||
for batch in batch_iter(event_and_contexts, 200):
|
for batch in batch_iter(event_and_contexts, 200):
|
||||||
result = await self._send_events(
|
result = await self._send_events(
|
||||||
instance_name=instance,
|
instance_name=instance,
|
||||||
store=self.store,
|
store=self._store,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
event_and_contexts=batch,
|
event_and_contexts=batch,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
return result["max_stream_id"]
|
return result["max_stream_id"]
|
||||||
else:
|
else:
|
||||||
assert self.storage.persistence
|
assert self._storage.persistence
|
||||||
|
|
||||||
# Note that this returns the events that were persisted, which may not be
|
# Note that this returns the events that were persisted, which may not be
|
||||||
# the same as were passed in if some were deduplicated due to transaction IDs.
|
# the same as were passed in if some were deduplicated due to transaction IDs.
|
||||||
events, max_stream_token = await self.storage.persistence.persist_events(
|
events, max_stream_token = await self._storage.persistence.persist_events(
|
||||||
event_and_contexts, backfilled=backfilled
|
event_and_contexts, backfilled=backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1791,7 +1793,7 @@ class FederationEventHandler:
|
||||||
# users
|
# users
|
||||||
if event.internal_metadata.is_outlier():
|
if event.internal_metadata.is_outlier():
|
||||||
if event.membership != Membership.INVITE:
|
if event.membership != Membership.INVITE:
|
||||||
if not self.is_mine_id(target_user_id):
|
if not self._is_mine_id(target_user_id):
|
||||||
return
|
return
|
||||||
|
|
||||||
target_user = UserID.from_string(target_user_id)
|
target_user = UserID.from_string(target_user_id)
|
||||||
|
@ -1840,4 +1842,4 @@ class FederationEventHandler:
|
||||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
||||||
|
|
||||||
async def get_min_depth_for_context(self, context: str) -> int:
|
async def get_min_depth_for_context(self, context: str) -> int:
|
||||||
return await self.store.get_min_depth(context)
|
return await self._store.get_min_depth(context)
|
||||||
|
|
Loading…
Reference in a new issue