Add rooms that the user has left under archived in v2 sync.

This commit is contained in:
Mark Haines 2015-10-19 17:26:18 +01:00
parent b19b9535f6
commit 68b7fc3e2b
3 changed files with 161 additions and 9 deletions
synapse
handlers
rest/client/v2_alpha
storage

View file

@ -61,18 +61,37 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
return bool(self.timeline or self.state or self.ephemeral) return bool(self.timeline or self.state or self.ephemeral)
class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state)
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id", "room_id",
"invite", "invite",
])): ])):
__slots__ = [] __slots__ = []
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
class SyncResult(collections.namedtuple("SyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync "next_batch", # Token for the next sync
"presence", # List of presence events for the user. "presence", # List of presence events for the user.
"joined", # JoinedSyncResult for each joined room. "joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room. "invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
])): ])):
__slots__ = [] __slots__ = []
@ -156,11 +175,14 @@ class SyncHandler(BaseHandler):
) )
room_list = yield self.store.get_rooms_for_user_where_membership_is( room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(), user_id=sync_config.user.to_string(),
membership_list=[Membership.INVITE, Membership.JOIN] membership_list=[
Membership.INVITE, Membership.JOIN, Membership.LEAVE
]
) )
joined = [] joined = []
invited = [] invited = []
archived = []
for event in room_list: for event in room_list:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room( room_sync = yield self.initial_sync_for_joined_room(
@ -173,11 +195,23 @@ class SyncHandler(BaseHandler):
room_id=event.room_id, room_id=event.room_id,
invite=invite, invite=invite,
)) ))
elif event.membership == Membership.LEAVE:
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
room_sync = yield self.initial_sync_for_archived_room(
sync_config=sync_config,
room_id=event.room_id,
leave_event_id=event.event_id,
leave_token=leave_token,
)
archived.append(room_sync)
defer.returnValue(SyncResult( defer.returnValue(SyncResult(
presence=presence, presence=presence,
joined=joined, joined=joined,
invited=invited, invited=invited,
archived=archived,
next_batch=now_token, next_batch=now_token,
)) ))
@ -204,6 +238,28 @@ class SyncHandler(BaseHandler):
ephemeral=[], ephemeral=[],
)) ))
@defer.inlineCallbacks
def initial_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
"""
batch = yield self.load_filtered_recents(
room_id, sync_config, leave_token,
)
leave_state = yield self.store.get_state_for_events(
[leave_event_id], None
)
defer.returnValue(ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=leave_state[leave_event_id].values(),
))
@defer.inlineCallbacks @defer.inlineCallbacks
def incremental_sync_with_gap(self, sync_config, since_token): def incremental_sync_with_gap(self, sync_config, since_token):
""" Get the incremental delta needed to bring the client up to """ Get the incremental delta needed to bring the client up to
@ -257,18 +313,22 @@ class SyncHandler(BaseHandler):
) )
joined = [] joined = []
archived = []
if len(room_events) <= timeline_limit: if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just # There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them. # partition the new events by room and return them.
invite_events = [] invite_events = []
leave_events = []
events_by_room_id = {} events_by_room_id = {}
for event in room_events: for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event) events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids: if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member if (event.type == EventTypes.Member
and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()): and event.state_key == sync_config.user.to_string()):
if event.membership == Membership.INVITE:
invite_events.append(event) invite_events.append(event)
elif event.membership == Membership.LEAVE:
leave_events.append(event)
for room_id in joined_room_ids: for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, []) recents = events_by_room_id.get(room_id, [])
@ -296,11 +356,16 @@ class SyncHandler(BaseHandler):
) )
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
else: else:
invite_events = yield self.store.get_invites_for_user( invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string() sync_config.user.to_string()
) )
leave_events = yield self.store.get_leave_events_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids: for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room( room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token, room_id, sync_config, since_token, now_token,
@ -309,6 +374,12 @@ class SyncHandler(BaseHandler):
if room_sync: if room_sync:
joined.append(room_sync) joined.append(room_sync)
for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
sync_config, leave_event, since_token
)
archived.append(room_sync)
invited = [ invited = [
InvitedSyncResult(room_id=event.room_id, invite=event) InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events for event in invite_events
@ -318,6 +389,7 @@ class SyncHandler(BaseHandler):
presence=presence, presence=presence,
joined=joined, joined=joined,
invited=invited, invited=invited,
archived=archived,
next_batch=now_token, next_batch=now_token,
)) ))
@ -416,6 +488,56 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync) defer.returnValue(room_sync)
@defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event,
since_token):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
A Deferred ArchivedSyncResult
"""
stream_token = yield self.store.get_stream_token_for_event(
leave_event.event_id
)
leave_token = since_token.copy_and_replace("room_key", stream_token)
batch = yield self.load_filtered_recents(
leave_event.room_id, sync_config, leave_token, since_token,
)
logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is
leave_state = yield self.store.get_state_for_events(
[leave_event.event_id], None
)
state_events_at_leave = leave_state[leave_event.event_id].values()
state_at_previous_sync = yield self.get_state_at_previous_sync(
leave_event.room_id, since_token=since_token
)
state_events_delta = yield self.compute_state_delta(
since_token=since_token,
previous_state=state_at_previous_sync,
current_state=state_events_at_leave,
)
room_sync = ArchivedSyncResult(
room_id=leave_event.room_id,
timeline=batch,
state=state_events_delta,
)
logging.debug("Room sync: %r", room_sync)
defer.returnValue(room_sync)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_at_previous_sync(self, room_id, since_token): def get_state_at_previous_sync(self, room_id, since_token):
""" Get the room state at the previous sync the client made. """ Get the room state at the previous sync the client made.

View file

@ -136,6 +136,10 @@ class SyncRestServlet(RestServlet):
sync_result.invited, filter, time_now, token_id sync_result.invited, filter, time_now, token_id
) )
archived = self.encode_archived(
sync_result.archived, filter, time_now, token_id
)
response_content = { response_content = {
"presence": self.encode_presence( "presence": self.encode_presence(
sync_result.presence, filter, time_now sync_result.presence, filter, time_now
@ -143,7 +147,7 @@ class SyncRestServlet(RestServlet):
"rooms": { "rooms": {
"joined": joined, "joined": joined,
"invited": invited, "invited": invited,
"archived": {}, "archived": archived,
}, },
"next_batch": sync_result.next_batch.to_string(), "next_batch": sync_result.next_batch.to_string(),
} }
@ -182,14 +186,20 @@ class SyncRestServlet(RestServlet):
return invited return invited
def encode_archived(self, rooms, filter, time_now, token_id):
joined = {}
for room in rooms:
joined[room.room_id] = self.encode_room(
room, filter, time_now, token_id, joined=False
)
return joined
@staticmethod @staticmethod
def encode_room(room, filter, time_now, token_id): def encode_room(room, filter, time_now, token_id, joined=True):
event_map = {} event_map = {}
state_events = filter.filter_room_state(room.state) state_events = filter.filter_room_state(room.state)
timeline_events = filter.filter_room_timeline(room.timeline.events)
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = [] state_event_ids = []
timeline_event_ids = []
for event in state_events: for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event( event_map[event.event_id] = serialize_event(
@ -198,6 +208,8 @@ class SyncRestServlet(RestServlet):
) )
state_event_ids.append(event.event_id) state_event_ids.append(event.event_id)
timeline_events = filter.filter_room_timeline(room.timeline.events)
timeline_event_ids = []
for event in timeline_events: for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event( event_map[event.event_id] = serialize_event(
@ -205,6 +217,7 @@ class SyncRestServlet(RestServlet):
event_format=format_event_for_client_v2_without_event_id, event_format=format_event_for_client_v2_without_event_id,
) )
timeline_event_ids.append(event.event_id) timeline_event_ids.append(event.event_id)
result = { result = {
"event_map": event_map, "event_map": event_map,
"timeline": { "timeline": {
@ -213,8 +226,12 @@ class SyncRestServlet(RestServlet):
"limited": room.timeline.limited, "limited": room.timeline.limited,
}, },
"state": {"events": state_event_ids}, "state": {"events": state_event_ids},
"ephemeral": {"events": ephemeral_events},
} }
if joined:
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
result["ephemeral"] = {"events": ephemeral_events}
return result return result

View file

@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore):
invites.event_id for invite in invites invites.event_id for invite in invites
])) ]))
def get_leave_events_for_user(self, user_id):
""" Get all the leave events for a user
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.LEAVE]
).addCallback(lambda leaves: self._get_events([
leave.event_id for leave in leaves
]))
def get_rooms_for_user_where_membership_is(self, user_id, membership_list): def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user """ Get all the rooms for this user where the membership for this user
matches one in the membership list. matches one in the membership list.