forked from MirrorHub/synapse
Merge branch 'erikj/refactor_pagination_bounds' into erikj/reactions_base
This commit is contained in:
commit
a9fc71c372
5 changed files with 120 additions and 61 deletions
1
changelog.d/5191.misc
Normal file
1
changelog.d/5191.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Make generating SQL bounds for pagination generic.
|
|
@ -171,7 +171,7 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
allow_rejected (bool): If True return rejected events.
|
allow_rejected (bool): If True return rejected events.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Deferred[list]: List of events fetched from the database. The
|
Deferred[list[EventBase]]: List of events fetched from the database. The
|
||||||
events are in the same order as `event_ids` arg.
|
events are in the same order as `event_ids` arg.
|
||||||
|
|
||||||
Note that the returned list may be smaller than the list of event
|
Note that the returned list may be smaller than the list of event
|
||||||
|
|
|
@ -64,59 +64,120 @@ _EventDictReturn = namedtuple(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def lower_bound(token, engine, inclusive=False):
|
def generate_pagination_where_clause(
|
||||||
inclusive = "=" if inclusive else ""
|
direction, column_names, from_token, to_token, engine,
|
||||||
if token.topological is None:
|
):
|
||||||
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
|
"""Creates an SQL expression to bound the columns by the pagination
|
||||||
else:
|
tokens.
|
||||||
if isinstance(engine, PostgresEngine):
|
|
||||||
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
|
For example creates an SQL expression like:
|
||||||
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
|
|
||||||
# use the later form when running against postgres.
|
(6, 7) >= (topological_ordering, stream_ordering)
|
||||||
return "((%d,%d) <%s (%s,%s))" % (
|
AND (5, 3) < (topological_ordering, stream_ordering)
|
||||||
token.topological,
|
|
||||||
token.stream,
|
would be generated for direction="b", from_token=(6, 7) and to_token=(5, 3).
|
||||||
inclusive,
|
|
||||||
"topological_ordering",
|
Args:
|
||||||
"stream_ordering",
|
direction (str): Whether we're paginating backwards("b") or
|
||||||
|
forwards ("f").
|
||||||
|
column_names (tuple[str, str]): The column names to bound. Must *not*
|
||||||
|
be user defined as these get inserted directly into the SQL
|
||||||
|
statement without escapes.
|
||||||
|
from_token (tuple[int, int]|None)
|
||||||
|
to_token (tuple[int, int]|None)
|
||||||
|
engine: The database engine to generate the clauses for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The sql expression
|
||||||
|
"""
|
||||||
|
assert direction in ("b", "f")
|
||||||
|
|
||||||
|
where_clause = []
|
||||||
|
if from_token:
|
||||||
|
where_clause.append(
|
||||||
|
_make_generic_sql_bound(
|
||||||
|
bound=">=" if direction == "b" else "<",
|
||||||
|
column_names=column_names,
|
||||||
|
values=from_token,
|
||||||
|
engine=engine,
|
||||||
)
|
)
|
||||||
return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
|
|
||||||
token.topological,
|
|
||||||
"topological_ordering",
|
|
||||||
token.topological,
|
|
||||||
"topological_ordering",
|
|
||||||
token.stream,
|
|
||||||
inclusive,
|
|
||||||
"stream_ordering",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if to_token:
|
||||||
def upper_bound(token, engine, inclusive=True):
|
where_clause.append(
|
||||||
inclusive = "=" if inclusive else ""
|
_make_generic_sql_bound(
|
||||||
if token.topological is None:
|
bound="<" if direction == "b" else ">=",
|
||||||
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
|
column_names=column_names,
|
||||||
else:
|
values=to_token,
|
||||||
if isinstance(engine, PostgresEngine):
|
engine=engine,
|
||||||
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
|
|
||||||
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
|
|
||||||
# use the later form when running against postgres.
|
|
||||||
return "((%d,%d) >%s (%s,%s))" % (
|
|
||||||
token.topological,
|
|
||||||
token.stream,
|
|
||||||
inclusive,
|
|
||||||
"topological_ordering",
|
|
||||||
"stream_ordering",
|
|
||||||
)
|
)
|
||||||
return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
|
|
||||||
token.topological,
|
|
||||||
"topological_ordering",
|
|
||||||
token.topological,
|
|
||||||
"topological_ordering",
|
|
||||||
token.stream,
|
|
||||||
inclusive,
|
|
||||||
"stream_ordering",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
return " AND ".join(where_clause)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_generic_sql_bound(bound, column_names, values, engine):
|
||||||
|
"""Create an SQL expression that bounds the given column names by the
|
||||||
|
values, e.g. create the equivalent of `(1, 2) < (col1, col2)`.
|
||||||
|
|
||||||
|
Only works with two columns.
|
||||||
|
|
||||||
|
Older versions of SQLite don't support that syntax so we have to expand it
|
||||||
|
out manually.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bound (str): The comparison operator to use. One of ">", "<", ">=",
|
||||||
|
"<=", where the values are on the left and columns on the right.
|
||||||
|
column_names (tuple[str, str]): The column names. Must *not* be user defined
|
||||||
|
as these get inserted directly into the SQL statement without
|
||||||
|
escapes.
|
||||||
|
values (tuple[int, int]): The values to bound the columns by.
|
||||||
|
engine: The database engine to generate the SQL for
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str
|
||||||
|
"""
|
||||||
|
|
||||||
|
assert(bound in (">", "<", ">=", "<="))
|
||||||
|
|
||||||
|
name1, name2 = column_names
|
||||||
|
val1, val2 = values
|
||||||
|
|
||||||
|
if val1 is None:
|
||||||
|
val2 = int(val2)
|
||||||
|
return "(%d %s %s)" % (val2, bound, name2)
|
||||||
|
|
||||||
|
val1 = int(val1)
|
||||||
|
val2 = int(val2)
|
||||||
|
|
||||||
|
if isinstance(engine, PostgresEngine):
|
||||||
|
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
|
||||||
|
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
|
||||||
|
# use the latter form when running against postgres.
|
||||||
|
return "((%d,%d) %s (%s,%s))" % (
|
||||||
|
val1, val2,
|
||||||
|
bound,
|
||||||
|
name1, name2,
|
||||||
|
)
|
||||||
|
|
||||||
|
# We want to generate queries of e.g. the form:
|
||||||
|
#
|
||||||
|
# (val1 < name1 OR (val1 = name1 AND val2 <= name2))
|
||||||
|
#
|
||||||
|
# which is equivalent to (val1, val2) <= (name1, name2)
|
||||||
|
|
||||||
|
return """(
|
||||||
|
{val1:d} {strict_bound} {name1}
|
||||||
|
OR ({val1:d} = {name1} AND {val2:d} {bound} {name2})
|
||||||
|
)""".format(
|
||||||
|
name1=name1,
|
||||||
|
val1=val1,
|
||||||
|
name2=name2,
|
||||||
|
val2=val2,
|
||||||
|
strict_bound=bound[0], # The first comparison must always be a strict inequality here
|
||||||
|
bound=bound,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def filter_to_clause(event_filter):
|
def filter_to_clause(event_filter):
|
||||||
# NB: This may create SQL clauses that don't optimise well (and we don't
|
# NB: This may create SQL clauses that don't optimise well (and we don't
|
||||||
|
@ -762,20 +823,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
args = [False, room_id]
|
args = [False, room_id]
|
||||||
if direction == 'b':
|
if direction == 'b':
|
||||||
order = "DESC"
|
order = "DESC"
|
||||||
bounds = upper_bound(from_token, self.database_engine)
|
|
||||||
if to_token:
|
|
||||||
bounds = "%s AND %s" % (
|
|
||||||
bounds,
|
|
||||||
lower_bound(to_token, self.database_engine),
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
order = "ASC"
|
order = "ASC"
|
||||||
bounds = lower_bound(from_token, self.database_engine)
|
|
||||||
if to_token:
|
bounds = generate_pagination_where_clause(
|
||||||
bounds = "%s AND %s" % (
|
direction=direction,
|
||||||
bounds,
|
column_names=("topological_ordering", "stream_ordering"),
|
||||||
upper_bound(to_token, self.database_engine),
|
from_token=from_token,
|
||||||
)
|
to_token=to_token,
|
||||||
|
engine=self.database_engine,
|
||||||
|
)
|
||||||
|
|
||||||
filter_clause, filter_args = filter_to_clause(event_filter)
|
filter_clause, filter_args = filter_to_clause(event_filter)
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
|
||||||
*args: Arguments to be passed to each call to func
|
*args: Arguments to be passed to each call to func
|
||||||
|
|
||||||
Returns
|
Returns
|
||||||
Deferred: Resolved when all functions have been invoked, or errors if
|
Deferred[list]: Resolved when all functions have been invoked, or errors if
|
||||||
one of the function calls fails.
|
one of the function calls fails.
|
||||||
"""
|
"""
|
||||||
return logcontext.make_deferred_yieldable(defer.gatherResults([
|
return logcontext.make_deferred_yieldable(defer.gatherResults([
|
||||||
|
|
|
@ -27,6 +27,7 @@ def user_left_room(distributor, user, room_id):
|
||||||
distributor.fire("user_left_room", user=user, room_id=room_id)
|
distributor.fire("user_left_room", user=user, room_id=room_id)
|
||||||
|
|
||||||
|
|
||||||
|
# XXX: this is no longer used. We should probably kill it.
|
||||||
def user_joined_room(distributor, user, room_id):
|
def user_joined_room(distributor, user, room_id):
|
||||||
distributor.fire("user_joined_room", user=user, room_id=room_id)
|
distributor.fire("user_joined_room", user=user, room_id=room_id)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue