Change the way pagination works to support out of order events.

This commit is contained in:
Erik Johnston 2014-08-19 14:19:48 +01:00
parent 1422a22970
commit 598a1d8ff9
8 changed files with 226 additions and 81 deletions

View file

@ -20,23 +20,23 @@ class PaginationConfig(object):
"""A configuration object which stores pagination parameters.""" """A configuration object which stores pagination parameters."""
def __init__(self, from_tok=None, to_tok=None, limit=0): def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0):
self.from_tok = from_tok self.from_tok = from_tok
self.to_tok = to_tok self.to_tok = to_tok
self.direction = direction
self.limit = limit self.limit = limit
@classmethod @classmethod
def from_request(cls, request, raise_invalid_params=True): def from_request(cls, request, raise_invalid_params=True):
params = { params = {
"from_tok": PaginationStream.TOK_START, "direction": 'f',
"to_tok": PaginationStream.TOK_END,
"limit": 0
} }
query_param_mappings = [ # 3-tuple of qp_key, attribute, rules query_param_mappings = [ # 3-tuple of qp_key, attribute, rules
("from", "from_tok", lambda x: type(x) == str), ("from", "from_tok", lambda x: type(x) == str),
("to", "to_tok", lambda x: type(x) == str), ("to", "to_tok", lambda x: type(x) == str),
("limit", "limit", lambda x: x.isdigit()) ("limit", "limit", lambda x: x.isdigit()),
("dir", "direction", lambda x: x == 'f' or x == 'b'),
] ]
for qp, attr, is_valid in query_param_mappings: for qp, attr, is_valid in query_param_mappings:
@ -48,12 +48,17 @@ class PaginationConfig(object):
return PaginationConfig(**params) return PaginationConfig(**params)
def __str__(self):
return (
"<PaginationConfig from_tok=%s, to_tok=%s, "
"direction=%s, limit=%s>"
) % (self.from_tok, self.to_tok, self.direction, self.limit)
class PaginationStream(object): class PaginationStream(object):
""" An interface for streaming data as chunks. """ """ An interface for streaming data as chunks. """
TOK_START = "START"
TOK_END = "END" TOK_END = "END"
def get_chunk(self, config=None): def get_chunk(self, config=None):
@ -76,7 +81,7 @@ class StreamData(object):
self.hs = hs self.hs = hs
self.store = hs.get_datastore() self.store = hs.get_datastore()
def get_rows(self, user_id, from_pkey, to_pkey, limit): def get_rows(self, user_id, from_pkey, to_pkey, limit, direction):
""" Get event stream data between the specified pkeys. """ Get event stream data between the specified pkeys.
Args: Args:

View file

@ -38,8 +38,8 @@ class EventsStreamData(StreamData):
self.with_feedback = feedback self.with_feedback = feedback
@defer.inlineCallbacks @defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit): def get_rows(self, user_id, from_key, to_key, limit, direction):
data, latest_ver = yield self.store.get_room_events_stream( data, latest_ver = yield self.store.get_room_events(
user_id=user_id, user_id=user_id,
from_key=from_key, from_key=from_key,
to_key=to_key, to_key=to_key,
@ -70,6 +70,15 @@ class EventStream(PaginationStream):
pagination_config.from_tok) pagination_config.from_tok)
pagination_config.to_tok = yield self.fix_token( pagination_config.to_tok = yield self.fix_token(
pagination_config.to_tok) pagination_config.to_tok)
if (
not pagination_config.to_tok
and pagination_config.direction == 'f'
):
pagination_config.to_tok = yield self.get_current_max_token()
logger.debug("pagination_config: %s", pagination_config)
defer.returnValue(pagination_config) defer.returnValue(pagination_config)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -81,39 +90,42 @@ class EventStream(PaginationStream):
Returns: Returns:
The fixed-up token, which may == token. The fixed-up token, which may == token.
""" """
# replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. if token == PaginationStream.TOK_END:
replacements = [ new_token = yield self.get_current_max_token()
(PaginationStream.TOK_START, "0"),
(PaginationStream.TOK_END, "-1")
]
for magic_token, key in replacements:
if magic_token == token:
token = EventStream.SEPARATOR.join(
[key] * len(self.stream_data)
)
# replace -1 values with an actual pkey logger.debug("fix_token: From %s to %s", token, new_token)
token_segments = self._split_token(token)
for i, tok in enumerate(token_segments): token = new_token
if tok == -1:
# add 1 to the max token because results are EXCLUSIVE from the defer.returnValue(token)
# latest version.
token_segments[i] = 1 + (yield self.stream_data[i].max_token())
defer.returnValue(EventStream.SEPARATOR.join(
str(x) for x in token_segments
))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_chunk(self, config=None): def get_current_max_token(self):
new_token_parts = []
for s in self.stream_data:
mx = yield s.max_token()
new_token_parts.append(str(mx))
new_token = EventStream.SEPARATOR.join(new_token_parts)
logger.debug("get_current_max_token: %s", new_token)
defer.returnValue(new_token)
@defer.inlineCallbacks
def get_chunk(self, config):
# no support for limit on >1 streams, makes no sense. # no support for limit on >1 streams, makes no sense.
if config.limit and len(self.stream_data) > 1: if config.limit and len(self.stream_data) > 1:
raise EventStreamError( raise EventStreamError(
400, "Limit not supported on multiplexed streams." 400, "Limit not supported on multiplexed streams."
) )
(chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, chunk_data, next_tok = yield self._get_chunk_data(
config.to_tok, config.from_tok,
config.limit) config.to_tok,
config.limit,
config.direction,
)
defer.returnValue({ defer.returnValue({
"chunk": chunk_data, "chunk": chunk_data,
@ -122,7 +134,7 @@ class EventStream(PaginationStream):
}) })
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_chunk_data(self, from_tok, to_tok, limit): def _get_chunk_data(self, from_tok, to_tok, limit, direction):
""" Get event data between the two tokens. """ Get event data between the two tokens.
Tokens are SEPARATOR separated values representing pkey values of Tokens are SEPARATOR separated values representing pkey values of
@ -140,11 +152,12 @@ class EventStream(PaginationStream):
EventStreamError if something went wrong. EventStreamError if something went wrong.
""" """
# sanity check # sanity check
if (from_tok.count(EventStream.SEPARATOR) != if to_tok is not None:
to_tok.count(EventStream.SEPARATOR) or if (from_tok.count(EventStream.SEPARATOR) !=
(from_tok.count(EventStream.SEPARATOR) + 1) != to_tok.count(EventStream.SEPARATOR) or
len(self.stream_data)): (from_tok.count(EventStream.SEPARATOR) + 1) !=
raise EventStreamError(400, "Token lengths don't match.") len(self.stream_data)):
raise EventStreamError(400, "Token lengths don't match.")
chunk = [] chunk = []
next_ver = [] next_ver = []
@ -158,7 +171,7 @@ class EventStream(PaginationStream):
continue continue
(event_chunk, max_pkey) = yield self.stream_data[i].get_rows( (event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
self.user_id, from_pkey, to_pkey, limit self.user_id, from_pkey, to_pkey, limit, direction,
) )
chunk.extend([ chunk.extend([
@ -177,9 +190,8 @@ class EventStream(PaginationStream):
Returns: Returns:
A list of ints. A list of ints.
""" """
segments = token.split(EventStream.SEPARATOR) if token:
try: segments = token.split(EventStream.SEPARATOR)
int_segments = [int(x) for x in segments] else:
except ValueError: segments = [None] * len(self.stream_data)
raise EventStreamError(400, "Bad token: %s" % token) return segments
return int_segments

View file

@ -684,7 +684,7 @@ class PresenceStreamData(StreamData):
super(PresenceStreamData, self).__init__(hs) super(PresenceStreamData, self).__init__(hs)
self.presence = hs.get_handlers().presence_handler self.presence = hs.get_handlers().presence_handler
def get_rows(self, user_id, from_key, to_key, limit): def get_rows(self, user_id, from_key, to_key, limit, direction):
cachemap = self.presence._user_cachemap cachemap = self.presence._user_cachemap
# TODO(paul): limit, and filter by visibility # TODO(paul): limit, and filter by visibility

View file

@ -466,7 +466,7 @@ class RoomMemberHandler(BaseHandler):
for entry in member_list for entry in member_list
] ]
chunk_data = { chunk_data = {
"start": "START", "start": "START", # FIXME (erikj): START is no longer a valid value
"end": "END", "end": "END",
"chunk": event_list "chunk": event_list
} }
@ -811,4 +811,5 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_public_room_list(self): def get_public_room_list(self):
chunk = yield self.store.get_rooms(is_public=True) chunk = yield self.store.get_rooms(is_public=True)
# FIXME (erikj): START is no longer a valid value
defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) defer.returnValue({"start": "START", "end": "END", "chunk": chunk})

View file

@ -14,7 +14,7 @@
*/ */
CREATE TABLE IF NOT EXISTS events( CREATE TABLE IF NOT EXISTS events(
token_ordering INTEGER PRIMARY KEY AUTOINCREMENT, stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
topological_ordering INTEGER NOT NULL, topological_ordering INTEGER NOT NULL,
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
type TEXT NOT NULL, type TEXT NOT NULL,

View file

@ -16,8 +16,9 @@
from twisted.internet import defer from twisted.internet import defer
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.api.errors import SynapseError
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.util.logutils import log_function
import json import json
import logging import logging
@ -29,9 +30,96 @@ logger = logging.getLogger(__name__)
MAX_STREAM_SIZE = 1000 MAX_STREAM_SIZE = 1000
_STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
def _parse_stream_token(string):
try:
if string[0] != 's':
raise
return int(string[1:])
except:
logger.debug("Not stream token: %s", string)
raise SynapseError(400, "Invalid token")
def _parse_topological_token(string):
try:
if string[0] != 't':
raise
parts = string[1:].split('-', 1)
return (int(parts[0]), int(parts[1]))
except:
logger.debug("Not topological token: %s", string)
raise SynapseError(400, "Invalid token")
def is_stream_token(string):
try:
_parse_stream_token(string)
return True
except:
return False
def is_topological_token(string):
try:
_parse_topological_token(string)
return True
except:
return False
def _get_token_bound(token, comparison):
try:
s = _parse_stream_token(token)
return "%s %s %d" % ("stream_ordering", comparison, s)
except:
pass
try:
top, stream = _parse_topological_token(token)
return "%s %s %d AND %s %s %d" % (
"topological_ordering", comparison, top,
"stream_ordering", comparison, stream,
)
except:
pass
raise SynapseError(400, "Invalid token")
class StreamStore(SQLBaseStore): class StreamStore(SQLBaseStore):
@log_function
def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
direction='f', with_feedback=False):
is_events = (
direction == 'f'
and is_stream_token(from_key)
and to_key and is_stream_token(to_key)
)
if is_events:
return self.get_room_events_stream(
user_id=user_id,
from_key=from_key,
to_key=to_key,
room_id=room_id,
limit=limit,
with_feedback=with_feedback,
)
else:
return self.paginate_room_events(
from_key=from_key,
to_key=to_key,
room_id=room_id,
limit=limit,
with_feedback=with_feedback,
)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id, def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False): limit=0, with_feedback=False):
# TODO (erikj): Handle compressed feedback # TODO (erikj): Handle compressed feedback
@ -54,8 +142,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering. # From and to keys should be integers from ordering.
from_key = int(from_key) from_id = _parse_stream_token(from_key)
to_key = int(to_key) to_id = _parse_stream_token(to_key)
if from_key == to_key: if from_key == to_key:
defer.returnValue(([], to_key)) defer.returnValue(([], to_key))
@ -65,41 +153,78 @@ class StreamStore(SQLBaseStore):
"SELECT * FROM events as e WHERE " "SELECT * FROM events as e WHERE "
"((room_id IN (%(current)s)) OR " "((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) " "(event_id IN (%(invites)s))) "
"AND e.stream_ordering > ? AND e.stream_ordering < ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % { ) % {
"current": current_room_membership_sql, "current": current_room_membership_sql,
"invites": invites_sql, "invites": invites_sql,
"limit": limit
} }
# Constraints and ordering depend on direction.
if from_key < to_key:
sql += (
"AND e.token_ordering > ? AND e.token_ordering < ? "
"ORDER BY token_ordering ASC LIMIT %(limit)d "
) % {"limit": limit}
else:
sql += (
"AND e.token_ordering < ? "
"AND e.token_ordering > ? "
"ORDER BY e.token_ordering DESC LIMIT %(limit)d "
) % {"limit": int(limit)}
rows = yield self._execute_and_decode( rows = yield self._execute_and_decode(
sql, sql,
user_id, user_id, Membership.INVITE, from_key, to_key user_id, user_id, Membership.INVITE, from_id, to_id
) )
ret = [self._parse_event_from_row(r) for r in rows] ret = [self._parse_event_from_row(r) for r in rows]
if rows: if rows:
if from_key < to_key: key = "s%d" % max([r["stream_ordering"] for r in rows])
key = max([r["token_ordering"] for r in rows])
else:
key = min([r["token_ordering"] for r in rows])
else: else:
# Assume we didn't get anything because there was nothing to get.
key = to_key key = to_key
defer.returnValue((ret, key)) defer.returnValue((ret, key))
@defer.inlineCallbacks
@log_function
def paginate_room_events(self, room_id, from_key, to_key=None,
direction='b', limit=-1,
with_feedback=False):
# TODO (erikj): Handle compressed feedback
from_comp = '<' if direction =='b' else '>'
to_comp = '>' if direction =='b' else '<'
order = "DESC" if direction == 'b' else "ASC"
args = [room_id]
bounds = _get_token_bound(from_key, from_comp)
if to_key:
bounds = "%s AND %s" % (bounds, _get_token_bound(to_key, to_comp))
if int(limit) > 0:
args.append(int(limit))
limit_str = " LIMIT ?"
else:
limit_str = ""
sql = (
"SELECT * FROM events "
"WHERE room_id = ? AND %(bounds)s "
"ORDER BY topological_ordering %(order)s, stream_ordering %(order)s %(limit)s "
) % {"bounds": bounds, "order": order, "limit": limit_str}
rows = yield self._execute_and_decode(
sql,
*args
)
if rows:
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
next_token = "t%s-%s" % (topo, toke)
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
defer.returnValue(
(
[self._parse_event_from_row(r) for r in rows],
next_token
)
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, with_feedback=False): def get_recent_events_for_room(self, room_id, limit, with_feedback=False):
# TODO (erikj): Handle compressed feedback # TODO (erikj): Handle compressed feedback
@ -108,8 +233,8 @@ class StreamStore(SQLBaseStore):
sql = ( sql = (
"SELECT * FROM events " "SELECT * FROM events "
"WHERE room_id = ? AND token_ordering <= ? " "WHERE room_id = ? AND stream_ordering <= ? "
"ORDER BY topological_ordering, token_ordering DESC LIMIT ? " "ORDER BY topological_ordering, stream_ordering DESC LIMIT ? "
) )
rows = yield self._execute_and_decode( rows = yield self._execute_and_decode(
@ -121,12 +246,12 @@ class StreamStore(SQLBaseStore):
if rows: if rows:
topo = rows[0]["topological_ordering"] topo = rows[0]["topological_ordering"]
toke = rows[0]["token_ordering"] toke = rows[0]["stream_ordering"]
start_token = "p%s-%s" % (topo, toke) start_token = "p%s-%s" % (topo, toke)
token = (start_token, end_token) token = (start_token, end_token)
else: else:
token = ("START", end_token) token = (end_token, end_token)
defer.returnValue( defer.returnValue(
( (
@ -138,11 +263,14 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_max_id(self): def get_room_events_max_id(self):
res = yield self._execute_and_decode( res = yield self._execute_and_decode(
"SELECT MAX(token_ordering) as m FROM events" "SELECT MAX(stream_ordering) as m FROM events"
) )
if not res: logger.debug("get_room_events_max_id: %s", res)
defer.returnValue(0)
if not res or not res[0] or not res[0]["m"]:
defer.returnValue("s1")
return return
defer.returnValue(res[0]["m"]) key = res[0]["m"] + 1
defer.returnValue("s%d" % (key,))

View file

@ -25,7 +25,6 @@ the eventHandlerService.
angular.module('eventStreamService', []) angular.module('eventStreamService', [])
.factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) { .factory('eventStreamService', ['$q', '$timeout', 'matrixService', 'eventHandlerService', function($q, $timeout, matrixService, eventHandlerService) {
var END = "END"; var END = "END";
var START = "START";
var TIMEOUT_MS = 30000; var TIMEOUT_MS = 30000;
var ERR_TIMEOUT_MS = 5000; var ERR_TIMEOUT_MS = 5000;

View file

@ -230,8 +230,8 @@ angular.module('matrixService', [])
path = path.replace("$room_id", room_id); path = path.replace("$room_id", room_id);
var params = { var params = {
from: from_token, from: from_token,
to: "START", limit: limit,
limit: limit dir: 'b'
}; };
return doRequest("GET", path, params); return doRequest("GET", path, params);
}, },