Implement full_state incremental sync

A hopefully-complete implementation of the full_state incremental sync, as
specced at https://github.com/matrix-org/matrix-doc/pull/133.

This actually turns out to be a relatively simple modification to the initial
sync implementation.
This commit is contained in:
Richard van der Hoff 2015-10-26 18:47:18 +00:00
parent 3f0a57eb9b
commit c79c4f9b14
2 changed files with 38 additions and 19 deletions
synapse
handlers
rest/client/v2_alpha

View file

@ -113,15 +113,20 @@ class SyncHandler(BaseHandler):
self.clock = hs.get_clock() self.clock = hs.get_clock()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise """Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result. return an empty sync result.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if timeout == 0 or since_token is None:
result = yield self.current_sync_for_user(sync_config, since_token) if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result = yield self.current_sync_for_user(sync_config, since_token,
full_state=full_state)
defer.returnValue(result) defer.returnValue(result)
else: else:
def current_sync_callback(before_token, after_token): def current_sync_callback(before_token, after_token):
@ -146,19 +151,24 @@ class SyncHandler(BaseHandler):
) )
defer.returnValue(result) defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None): def current_sync_for_user(self, sync_config, since_token=None,
full_state=False):
"""Get the sync for client needed to match what the server has now. """Get the sync for client needed to match what the server has now.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
if since_token is None: if since_token is None or full_state:
return self.initial_sync(sync_config) return self.full_state_sync(sync_config, since_token)
else: else:
return self.incremental_sync_with_gap(sync_config, since_token) return self.incremental_sync_with_gap(sync_config, since_token)
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync(self, sync_config): def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state """Get a sync for a client which is starting without any state.
If a 'message_since_token' is given, only timeline events which have
happened since that token will be returned.
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
@ -192,8 +202,12 @@ class SyncHandler(BaseHandler):
archived = [] archived = []
for event in room_list: for event in room_list:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room( room_sync = yield self.full_state_sync_for_joined_room(
event.room_id, sync_config, now_token, typing_by_room room_id=event.room_id,
sync_config=sync_config,
now_token=now_token,
timeline_since_token=timeline_since_token,
typing_by_room=typing_by_room
) )
joined.append(room_sync) joined.append(room_sync)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
@ -206,11 +220,12 @@ class SyncHandler(BaseHandler):
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
room_sync = yield self.initial_sync_for_archived_room( room_sync = yield self.full_state_sync_for_archived_room(
sync_config=sync_config, sync_config=sync_config,
room_id=event.room_id, room_id=event.room_id,
leave_event_id=event.event_id, leave_event_id=event.event_id,
leave_token=leave_token, leave_token=leave_token,
timeline_since_token=timeline_since_token,
) )
archived.append(room_sync) archived.append(room_sync)
@ -223,7 +238,8 @@ class SyncHandler(BaseHandler):
)) ))
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync_for_joined_room(self, room_id, sync_config, now_token, def full_state_sync_for_joined_room(self, room_id, sync_config,
now_token, timeline_since_token,
typing_by_room): typing_by_room):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
@ -231,7 +247,7 @@ class SyncHandler(BaseHandler):
""" """
batch = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, room_id, sync_config, now_token, since_token=timeline_since_token
) )
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(
@ -278,15 +294,16 @@ class SyncHandler(BaseHandler):
defer.returnValue((now_token, typing_by_room)) defer.returnValue((now_token, typing_by_room))
@defer.inlineCallbacks @defer.inlineCallbacks
def initial_sync_for_archived_room(self, room_id, sync_config, def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token): leave_event_id, leave_token,
timeline_since_token):
"""Sync a room for a client which is starting without any state """Sync a room for a client which is starting without any state
Returns: Returns:
A Deferred JoinedSyncResult. A Deferred JoinedSyncResult.
""" """
batch = yield self.load_filtered_recents( batch = yield self.load_filtered_recents(
room_id, sync_config, leave_token, room_id, sync_config, leave_token, since_token=timeline_since_token
) )
leave_state = yield self.store.get_state_for_events( leave_state = yield self.store.get_state_for_events(

View file

@ -16,7 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.http.servlet import ( from synapse.http.servlet import (
RestServlet, parse_string, parse_integer RestServlet, parse_string, parse_integer, parse_boolean
) )
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig
from synapse.types import StreamToken from synapse.types import StreamToken
@ -90,6 +90,7 @@ class SyncRestServlet(RestServlet):
allowed_values=self.ALLOWED_PRESENCE allowed_values=self.ALLOWED_PRESENCE
) )
filter_id = parse_string(request, "filter", default=None) filter_id = parse_string(request, "filter", default=None)
full_state = parse_boolean(request, "full_state", default=False)
logger.info( logger.info(
"/sync: user=%r, timeout=%r, since=%r," "/sync: user=%r, timeout=%r, since=%r,"
@ -120,7 +121,8 @@ class SyncRestServlet(RestServlet):
try: try:
sync_result = yield self.sync_handler.wait_for_sync_for_user( sync_result = yield self.sync_handler.wait_for_sync_for_user(
sync_config, since_token=since_token, timeout=timeout sync_config, since_token=since_token, timeout=timeout,
full_state=full_state
) )
finally: finally:
if set_presence == "online": if set_presence == "online":