Start updating the sync API to match the specification

This commit is contained in:
Mark Haines 2015-10-01 17:53:07 +01:00
parent 301141515a
commit f31014b18f
3 changed files with 54 additions and 90 deletions

View file

@ -54,7 +54,7 @@ class Filtering(object):
]
room_level_definitions = [
"state", "events", "ephemeral"
"state", "timeline", "ephemeral"
]
for key in top_level_definitions:
@ -135,6 +135,9 @@ class Filter(object):
def __init__(self, filter_json):
self.filter_json = filter_json
def timeline_limit(self):
return self.filter_json.get("room", {}).get("timeline", {}).get(limit, 10)
def filter_public_user_data(self, events):
return self._filter_on_key(events, ["public_user_data"])

View file

@ -28,21 +28,26 @@ logger = logging.getLogger(__name__)
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"limit",
"gap",
"sort",
"backfill",
"filter",
])
class TimelineBatch(collections.namedtuple("TimelineBatch", [
"prev_batch",
"events",
"limited",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"room_id",
"limited",
"published",
"events",
"timeline",
"state",
"prev_batch",
"ephemeral",
])):
__slots__ = []
@ -51,13 +56,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.events or self.state or self.ephemeral)
return bool(self.timeline or self.state or self.ephemeral)
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"private_user_data", # List of private events for the user.
"public_user_data", # List of public events for all users.
"presence", # List of presence events for the user.
"rooms", # RoomSyncResult for each room.
])):
__slots__ = []
@ -133,12 +137,6 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
if sync_config.sort == "timeline,desc":
# TODO(mjark): Handle going through events in reverse order?.
# What does "most recent events" mean when applying the limits mean
# in this case?
raise NotImplementedError()
now_token = yield self.event_sources.get_current_token()
presence_stream = self.event_sources.sources["presence"]
@ -155,20 +153,15 @@ class SyncHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
# TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
rooms = []
for event in room_list:
room_sync = yield self.initial_sync_for_room(
event.room_id, sync_config, now_token, published_room_ids
event.room_id, sync_config, now_token,
)
rooms.append(room_sync)
defer.returnValue(SyncResult(
public_user_data=presence,
private_user_data=[],
presence=presence,
rooms=rooms,
next_batch=now_token,
))
@ -192,7 +185,6 @@ class SyncHandler(BaseHandler):
defer.returnValue(RoomSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
state=current_state_events,
@ -219,7 +211,6 @@ class SyncHandler(BaseHandler):
presence, presence_key = yield presence_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.presence_key,
limit=sync_config.limit,
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
@ -227,7 +218,6 @@ class SyncHandler(BaseHandler):
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.typing_key,
limit=sync_config.limit,
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
@ -252,16 +242,18 @@ class SyncHandler(BaseHandler):
published_rooms = yield self.store.get_rooms(is_public=True)
published_room_ids = set(r["room_id"] for r in published_rooms)
timeline_limit = sync_config.filter.timeline_limit()
room_events, _ = yield self.store.get_room_events_stream(
sync_config.user.to_string(),
from_key=since_token.room_key,
to_key=now_token.room_key,
room_id=None,
limit=sync_config.limit + 1,
limit=timeline_limit + 1,
)
rooms = []
if len(room_events) <= sync_config.limit:
if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
events_by_room_id = {}
@ -365,8 +357,9 @@ class SyncHandler(BaseHandler):
max_repeat = 3 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
timeline_limit = sync_config.filter.timeline_limit()
while limited and len(recents) < sync_config.limit and max_repeat:
while limited and len(recents) < timeline_limit and max_repeat:
events, keys = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
@ -393,7 +386,9 @@ class SyncHandler(BaseHandler):
"room_key", room_key
)
defer.returnValue((recents, prev_batch_token, limited))
defer.returnValue(TimelineBatch(
events=recents, prev_batch=prev_batch_token, limited=limited
))
@defer.inlineCallbacks
def incremental_sync_with_gap_for_room(self, room_id, sync_config,
@ -408,7 +403,7 @@ class SyncHandler(BaseHandler):
# TODO(mjark): Check for redactions we might have missed.
recents, prev_batch_token, limited = yield self.load_filtered_recents(
batch = yield self.load_filtered_recents(
room_id, sync_config, now_token, since_token,
)
@ -437,11 +432,8 @@ class SyncHandler(BaseHandler):
room_sync = RoomSyncResult(
room_id=room_id,
published=room_id in published_room_ids,
events=recents,
prev_batch=prev_batch_token,
timeline=batch,
state=state_events_delta,
limited=limited,
ephemeral=typing_by_room.get(room_id, [])
)

View file

@ -36,47 +36,35 @@ class SyncRestServlet(RestServlet):
GET parameters::
timeout(int): How long to wait for new events in milliseconds.
limit(int): Maxiumum number of events per room to return.
gap(bool): Create gaps the message history if limit is exceeded to
ensure that the client has the most recent messages. Defaults to
"true".
sort(str,str): tuple of sort key (e.g. "timeline") and direction
(e.g. "asc", "desc"). Defaults to "timeline,asc".
since(batch_token): Batch token when asking for incremental deltas.
set_presence(str): What state the device presence should be set to.
default is "online".
backfill(bool): Should the HS request message history from other
servers. This may take a long time making it unsuitable for clients
expecting a prompt response. Defaults to "true".
filter(filter_id): A filter to apply to the events returned.
filter_*: Filter override parameters.
Response JSON::
{
"next_batch": // batch token for the next /sync
"private_user_data": // private events for this user.
"public_user_data": // public events for all users including the
// public events for this user.
"rooms": [{ // List of rooms with updates.
"room_id": // Id of the room being updated
"limited": // Was the per-room event limit exceeded?
"published": // Is the room published by our HS?
"event_map": // Map of EventID -> event JSON.
"events": { // The recent events in the room if gap is "true"
// otherwise the next events in the room.
"batch": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
}
"state": [] // list of EventIDs updating the current state to
// be what it should be at the end of the batch.
"ephemeral": []
"next_batch": // batch token for the next /sync
"presence": // presence data for the user.
"rooms": {
"roomlist": [{ // List of rooms with updates.
"room_id": // Id of the room being updated
"event_map": // Map of EventID -> event JSON.
"timeline": { // The recent events in the room if gap is "true"
"limited": // Was the per-room event limit exceeded?
// otherwise the next events in the room.
"batch": [] // list of EventIDs in the "event_map".
"prev_batch": // back token for getting previous events.
}
"state": [] // list of EventIDs updating the current state to
// be what it should be at the end of the batch.
"ephemeral": []
}]
}
}
"""
PATTERN = client_v2_pattern("/sync$")
ALLOWED_SORT = set(["timeline,asc", "timeline,desc"])
ALLOWED_PRESENCE = set(["online", "offline", "idle"])
ALLOWED_PRESENCE = set(["online", "offline"])
def __init__(self, hs):
super(SyncRestServlet, self).__init__()
@ -90,45 +78,29 @@ class SyncRestServlet(RestServlet):
user, token_id = yield self.auth.get_user_by_req(request)
timeout = parse_integer(request, "timeout", default=0)
limit = parse_integer(request, "limit", required=True)
gap = parse_boolean(request, "gap", default=True)
sort = parse_string(
request, "sort", default="timeline,asc",
allowed_values=self.ALLOWED_SORT
)
since = parse_string(request, "since")
set_presence = parse_string(
request, "set_presence", default="online",
allowed_values=self.ALLOWED_PRESENCE
)
backfill = parse_boolean(request, "backfill", default=False)
filter_id = parse_string(request, "filter", default=None)
logger.info(
"/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r,"
" set_presence=%r, backfill=%r, filter_id=%r" % (
user, timeout, limit, gap, sort, since, set_presence,
backfill, filter_id
"/sync: user=%r, timeout=%r, since=%r,"
" set_presence=%r, filter_id=%r" % (
user, timeout, since, set_presence, filter_id
)
)
# TODO(mjark): Load filter and apply overrides.
try:
filter = yield self.filtering.get_user_filter(
user.localpart, filter_id
)
except:
filter = Filter({})
# filter = filter.apply_overrides(http_request)
# if filter.matches(event):
# # stuff
sync_config = SyncConfig(
user=user,
gap=gap,
limit=limit,
sort=sort,
backfill=backfill,
filter=filter,
)
@ -144,11 +116,8 @@ class SyncRestServlet(RestServlet):
time_now = self.clock.time_msec()
response_content = {
"public_user_data": self.encode_user_data(
sync_result.public_user_data, filter, time_now
),
"private_user_data": self.encode_user_data(
sync_result.private_user_data, filter, time_now
"presence": self.encode_user_data(
sync_result.presence, filter, time_now
),
"rooms": self.encode_rooms(
sync_result.rooms, filter, time_now, token_id