forked from MirrorHub/synapse
De-duplicate duplicate handling
move the "duplicate state event" handling down into `handle_new_client_event` where it can be shared between multiple call paths.
This commit is contained in:
parent
0991a2da93
commit
b520a1bf5a
2 changed files with 32 additions and 38 deletions
|
@ -674,22 +674,14 @@ class EventCreationHandler:
|
||||||
|
|
||||||
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
|
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
|
||||||
|
|
||||||
if event.is_state():
|
ev = await self.handle_new_client_event(
|
||||||
prev_event = await self.deduplicate_state_event(event, context)
|
|
||||||
if prev_event is not None:
|
|
||||||
logger.info(
|
|
||||||
"Not bothering to persist state event %s duplicated by %s",
|
|
||||||
event.event_id,
|
|
||||||
prev_event.event_id,
|
|
||||||
)
|
|
||||||
# we know it was persisted, so must have a stream ordering
|
|
||||||
assert prev_event.internal_metadata.stream_ordering
|
|
||||||
return prev_event.internal_metadata.stream_ordering
|
|
||||||
|
|
||||||
return await self.handle_new_client_event(
|
|
||||||
requester=requester, event=event, context=context, ratelimit=ratelimit
|
requester=requester, event=event, context=context, ratelimit=ratelimit
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# we know it was persisted, so must have a stream ordering
|
||||||
|
assert ev.internal_metadata.stream_ordering
|
||||||
|
return ev.internal_metadata.stream_ordering
|
||||||
|
|
||||||
async def deduplicate_state_event(
|
async def deduplicate_state_event(
|
||||||
self, event: EventBase, context: EventContext
|
self, event: EventBase, context: EventContext
|
||||||
) -> Optional[EventBase]:
|
) -> Optional[EventBase]:
|
||||||
|
@ -845,8 +837,10 @@ class EventCreationHandler:
|
||||||
context: EventContext,
|
context: EventContext,
|
||||||
ratelimit: bool = True,
|
ratelimit: bool = True,
|
||||||
extra_users: List[UserID] = [],
|
extra_users: List[UserID] = [],
|
||||||
) -> int:
|
) -> EventBase:
|
||||||
"""Processes a new event. This includes checking auth, persisting it,
|
"""Processes a new event.
|
||||||
|
|
||||||
|
This includes deduplicating, checking auth, persisting,
|
||||||
notifying users, sending to remote servers, etc.
|
notifying users, sending to remote servers, etc.
|
||||||
|
|
||||||
If called from a worker will hit out to the master process for final
|
If called from a worker will hit out to the master process for final
|
||||||
|
@ -860,9 +854,20 @@ class EventCreationHandler:
|
||||||
extra_users: Any extra users to notify about event
|
extra_users: Any extra users to notify about event
|
||||||
|
|
||||||
Return:
|
Return:
|
||||||
The stream_id of the persisted event.
|
If the event was deduplicated, the previous, duplicate, event. Otherwise,
|
||||||
|
`event`.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if event.is_state():
|
||||||
|
prev_event = await self.deduplicate_state_event(event, context)
|
||||||
|
if prev_event is not None:
|
||||||
|
logger.info(
|
||||||
|
"Not bothering to persist state event %s duplicated by %s",
|
||||||
|
event.event_id,
|
||||||
|
prev_event.event_id,
|
||||||
|
)
|
||||||
|
return prev_event
|
||||||
|
|
||||||
if event.is_state() and (event.type, event.state_key) == (
|
if event.is_state() and (event.type, event.state_key) == (
|
||||||
EventTypes.Create,
|
EventTypes.Create,
|
||||||
"",
|
"",
|
||||||
|
@ -917,13 +922,13 @@ class EventCreationHandler:
|
||||||
)
|
)
|
||||||
stream_id = result["stream_id"]
|
stream_id = result["stream_id"]
|
||||||
event.internal_metadata.stream_ordering = stream_id
|
event.internal_metadata.stream_ordering = stream_id
|
||||||
return stream_id
|
return event
|
||||||
|
|
||||||
stream_id = await self.persist_and_notify_client_event(
|
stream_id = await self.persist_and_notify_client_event(
|
||||||
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
|
||||||
)
|
)
|
||||||
|
|
||||||
return stream_id
|
return event
|
||||||
except Exception:
|
except Exception:
|
||||||
# Ensure that we actually remove the entries in the push actions
|
# Ensure that we actually remove the entries in the push actions
|
||||||
# staging area, if we calculated them.
|
# staging area, if we calculated them.
|
||||||
|
|
|
@ -188,16 +188,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
require_consent=require_consent,
|
require_consent=require_consent,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if this event matches the previous membership event for the user.
|
|
||||||
duplicate = await self.event_creation_handler.deduplicate_state_event(
|
|
||||||
event, context
|
|
||||||
)
|
|
||||||
if duplicate is not None:
|
|
||||||
# Discard the new event since this membership change is a no-op.
|
|
||||||
# we know it was persisted, so must have a stream ordering.
|
|
||||||
assert duplicate.internal_metadata.stream_ordering
|
|
||||||
return duplicate.event_id, duplicate.internal_metadata.stream_ordering
|
|
||||||
|
|
||||||
prev_state_ids = await context.get_prev_state_ids()
|
prev_state_ids = await context.get_prev_state_ids()
|
||||||
|
|
||||||
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
|
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
|
||||||
|
@ -222,7 +212,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
retry_after_ms=int(1000 * (time_allowed - time_now_s))
|
||||||
)
|
)
|
||||||
|
|
||||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||||
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
requester, event, context, extra_users=[target], ratelimit=ratelimit,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -232,7 +222,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
if prev_member_event.membership == Membership.JOIN:
|
if prev_member_event.membership == Membership.JOIN:
|
||||||
await self._user_left_room(target, room_id)
|
await self._user_left_room(target, room_id)
|
||||||
|
|
||||||
return event.event_id, stream_id
|
# we know it was persisted, so should have a stream ordering
|
||||||
|
assert result_event.internal_metadata.stream_ordering
|
||||||
|
return result_event.event_id, result_event.internal_metadata.stream_ordering
|
||||||
|
|
||||||
async def copy_room_tags_and_direct_to_room(
|
async def copy_room_tags_and_direct_to_room(
|
||||||
self, old_room_id, new_room_id, user_id
|
self, old_room_id, new_room_id, user_id
|
||||||
|
@ -673,12 +665,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
||||||
else:
|
else:
|
||||||
requester = types.create_requester(target_user)
|
requester = types.create_requester(target_user)
|
||||||
|
|
||||||
prev_event = await self.event_creation_handler.deduplicate_state_event(
|
|
||||||
event, context
|
|
||||||
)
|
|
||||||
if prev_event is not None:
|
|
||||||
return
|
|
||||||
|
|
||||||
prev_state_ids = await context.get_prev_state_ids()
|
prev_state_ids = await context.get_prev_state_ids()
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
if requester.is_guest:
|
if requester.is_guest:
|
||||||
|
@ -1186,10 +1172,13 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
context.app_service = requester.app_service
|
context.app_service = requester.app_service
|
||||||
stream_id = await self.event_creation_handler.handle_new_client_event(
|
result_event = await self.event_creation_handler.handle_new_client_event(
|
||||||
requester, event, context, extra_users=[UserID.from_string(target_user)],
|
requester, event, context, extra_users=[UserID.from_string(target_user)],
|
||||||
)
|
)
|
||||||
return event.event_id, stream_id
|
# we know it was persisted, so must have a stream ordering
|
||||||
|
assert result_event.internal_metadata.stream_ordering
|
||||||
|
|
||||||
|
return result_event.event_id, result_event.internal_metadata.stream_ordering
|
||||||
|
|
||||||
async def _user_left_room(self, target: UserID, room_id: str) -> None:
|
async def _user_left_room(self, target: UserID, room_id: str) -> None:
|
||||||
"""Implements RoomMemberHandler._user_left_room
|
"""Implements RoomMemberHandler._user_left_room
|
||||||
|
|
Loading…
Reference in a new issue