forked from MirrorHub/synapse
Merge pull request #3193 from matrix-org/erikj/pagination_refactor
Refactor /context to reuse pagination storage functions
This commit is contained in:
commit
1aeb5e28a9
1 changed files with 99 additions and 110 deletions
|
@ -42,7 +42,7 @@ from synapse.util.caches.descriptors import cached
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import logging
|
import logging
|
||||||
|
@ -595,88 +595,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
retcols=["stream_ordering", "topological_ordering"],
|
retcols=["stream_ordering", "topological_ordering"],
|
||||||
)
|
)
|
||||||
|
|
||||||
token = RoomStreamToken(
|
# Paginating backwards includes the event at the token, but paginating
|
||||||
|
# forward doesn't.
|
||||||
|
before_token = RoomStreamToken(
|
||||||
|
results["topological_ordering"] - 1,
|
||||||
|
results["stream_ordering"],
|
||||||
|
)
|
||||||
|
|
||||||
|
after_token = RoomStreamToken(
|
||||||
results["topological_ordering"],
|
results["topological_ordering"],
|
||||||
results["stream_ordering"],
|
results["stream_ordering"],
|
||||||
)
|
)
|
||||||
|
|
||||||
if isinstance(self.database_engine, Sqlite3Engine):
|
rows, start_token = self._paginate_room_events_txn(
|
||||||
# SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
|
txn, room_id, before_token, direction='b', limit=before_limit,
|
||||||
# So we give pass it to SQLite3 as the UNION ALL of the two queries.
|
|
||||||
|
|
||||||
query_before = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND topological_ordering < ?"
|
|
||||||
" UNION ALL"
|
|
||||||
" SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
|
|
||||||
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
|
|
||||||
)
|
)
|
||||||
before_args = (
|
|
||||||
room_id, token.topological,
|
|
||||||
room_id, token.topological, token.stream,
|
|
||||||
before_limit,
|
|
||||||
)
|
|
||||||
|
|
||||||
query_after = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND topological_ordering > ?"
|
|
||||||
" UNION ALL"
|
|
||||||
" SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
|
|
||||||
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
|
|
||||||
)
|
|
||||||
after_args = (
|
|
||||||
room_id, token.topological,
|
|
||||||
room_id, token.topological, token.stream,
|
|
||||||
after_limit,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
query_before = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND %s"
|
|
||||||
" ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
|
|
||||||
) % (upper_bound(token, self.database_engine, inclusive=False),)
|
|
||||||
|
|
||||||
before_args = (room_id, before_limit)
|
|
||||||
|
|
||||||
query_after = (
|
|
||||||
"SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
||||||
" WHERE room_id = ? AND %s"
|
|
||||||
" ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
|
|
||||||
) % (lower_bound(token, self.database_engine, inclusive=False),)
|
|
||||||
|
|
||||||
after_args = (room_id, after_limit)
|
|
||||||
|
|
||||||
txn.execute(query_before, before_args)
|
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
|
||||||
events_before = [r["event_id"] for r in rows]
|
events_before = [r["event_id"] for r in rows]
|
||||||
|
|
||||||
if rows:
|
rows, end_token = self._paginate_room_events_txn(
|
||||||
start_token = str(RoomStreamToken(
|
txn, room_id, after_token, direction='f', limit=after_limit,
|
||||||
rows[0]["topological_ordering"],
|
)
|
||||||
rows[0]["stream_ordering"] - 1,
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
start_token = str(RoomStreamToken(
|
|
||||||
token.topological,
|
|
||||||
token.stream - 1,
|
|
||||||
))
|
|
||||||
|
|
||||||
txn.execute(query_after, after_args)
|
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
|
||||||
events_after = [r["event_id"] for r in rows]
|
events_after = [r["event_id"] for r in rows]
|
||||||
|
|
||||||
if rows:
|
|
||||||
end_token = str(RoomStreamToken(
|
|
||||||
rows[-1]["topological_ordering"],
|
|
||||||
rows[-1]["stream_ordering"],
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
end_token = str(token)
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"before": {
|
"before": {
|
||||||
"event_ids": events_before,
|
"event_ids": events_before,
|
||||||
|
@ -738,17 +678,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
def has_room_changed_since(self, room_id, stream_id):
|
def has_room_changed_since(self, room_id, stream_id):
|
||||||
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
||||||
|
|
||||||
|
def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
|
||||||
class StreamStore(StreamWorkerStore):
|
|
||||||
def get_room_max_stream_ordering(self):
|
|
||||||
return self._stream_id_gen.get_current_token()
|
|
||||||
|
|
||||||
def get_room_min_stream_ordering(self):
|
|
||||||
return self._backfill_id_gen.get_current_token()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
|
||||||
direction='b', limit=-1, event_filter=None):
|
direction='b', limit=-1, event_filter=None):
|
||||||
|
"""Returns list of events before or after a given token.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn
|
||||||
|
room_id (str)
|
||||||
|
from_token (RoomStreamToken): The token used to stream from
|
||||||
|
to_token (RoomStreamToken|None): A token which if given limits the
|
||||||
|
results to only those before
|
||||||
|
direction(char): Either 'b' or 'f' to indicate whether we are
|
||||||
|
paginating forwards or backwards from `from_key`.
|
||||||
|
limit (int): The maximum number of events to return. Zero or less
|
||||||
|
means no limit.
|
||||||
|
event_filter (Filter|None): If provided filters the events to
|
||||||
|
those that match the filter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[list[dict], str]: Returns the results as a list of dicts and
|
||||||
|
a token that points to the end of the result set. The dicts have
|
||||||
|
the keys "event_id", "toplogical_ordering" and "stream_ordering".
|
||||||
|
"""
|
||||||
# Tokens really represent positions between elements, but we use
|
# Tokens really represent positions between elements, but we use
|
||||||
# the convention of pointing to the event before the gap. Hence
|
# the convention of pointing to the event before the gap. Hence
|
||||||
# we have a bit of asymmetry when it comes to equalities.
|
# we have a bit of asymmetry when it comes to equalities.
|
||||||
|
@ -756,20 +707,20 @@ class StreamStore(StreamWorkerStore):
|
||||||
if direction == 'b':
|
if direction == 'b':
|
||||||
order = "DESC"
|
order = "DESC"
|
||||||
bounds = upper_bound(
|
bounds = upper_bound(
|
||||||
RoomStreamToken.parse(from_key), self.database_engine
|
from_token, self.database_engine
|
||||||
)
|
)
|
||||||
if to_key:
|
if to_token:
|
||||||
bounds = "%s AND %s" % (bounds, lower_bound(
|
bounds = "%s AND %s" % (bounds, lower_bound(
|
||||||
RoomStreamToken.parse(to_key), self.database_engine
|
to_token, self.database_engine
|
||||||
))
|
))
|
||||||
else:
|
else:
|
||||||
order = "ASC"
|
order = "ASC"
|
||||||
bounds = lower_bound(
|
bounds = lower_bound(
|
||||||
RoomStreamToken.parse(from_key), self.database_engine
|
from_token, self.database_engine
|
||||||
)
|
)
|
||||||
if to_key:
|
if to_token:
|
||||||
bounds = "%s AND %s" % (bounds, upper_bound(
|
bounds = "%s AND %s" % (bounds, upper_bound(
|
||||||
RoomStreamToken.parse(to_key), self.database_engine
|
to_token, self.database_engine
|
||||||
))
|
))
|
||||||
|
|
||||||
filter_clause, filter_args = filter_to_clause(event_filter)
|
filter_clause, filter_args = filter_to_clause(event_filter)
|
||||||
|
@ -785,7 +736,8 @@ class StreamStore(StreamWorkerStore):
|
||||||
limit_str = ""
|
limit_str = ""
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT * FROM events"
|
"SELECT event_id, topological_ordering, stream_ordering"
|
||||||
|
" FROM events"
|
||||||
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
|
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
|
||||||
" ORDER BY topological_ordering %(order)s,"
|
" ORDER BY topological_ordering %(order)s,"
|
||||||
" stream_ordering %(order)s %(limit)s"
|
" stream_ordering %(order)s %(limit)s"
|
||||||
|
@ -795,7 +747,6 @@ class StreamStore(StreamWorkerStore):
|
||||||
"limit": limit_str
|
"limit": limit_str
|
||||||
}
|
}
|
||||||
|
|
||||||
def f(txn):
|
|
||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
|
|
||||||
rows = self.cursor_to_dict(txn)
|
rows = self.cursor_to_dict(txn)
|
||||||
|
@ -810,14 +761,44 @@ class StreamStore(StreamWorkerStore):
|
||||||
# when we are going backwards so we subtract one from the
|
# when we are going backwards so we subtract one from the
|
||||||
# stream part.
|
# stream part.
|
||||||
toke -= 1
|
toke -= 1
|
||||||
next_token = str(RoomStreamToken(topo, toke))
|
next_token = RoomStreamToken(topo, toke)
|
||||||
else:
|
else:
|
||||||
# TODO (erikj): We should work out what to do here instead.
|
# TODO (erikj): We should work out what to do here instead.
|
||||||
next_token = to_key if to_key else from_key
|
next_token = to_token if to_token else from_token
|
||||||
|
|
||||||
return rows, next_token,
|
return rows, str(next_token),
|
||||||
|
|
||||||
rows, token = yield self.runInteraction("paginate_room_events", f)
|
@defer.inlineCallbacks
|
||||||
|
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||||
|
direction='b', limit=-1, event_filter=None):
|
||||||
|
"""Returns list of events before or after a given token.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str)
|
||||||
|
from_key (str): The token used to stream from
|
||||||
|
to_key (str|None): A token which if given limits the results to
|
||||||
|
only those before
|
||||||
|
direction(char): Either 'b' or 'f' to indicate whether we are
|
||||||
|
paginating forwards or backwards from `from_key`.
|
||||||
|
limit (int): The maximum number of events to return. Zero or less
|
||||||
|
means no limit.
|
||||||
|
event_filter (Filter|None): If provided filters the events to
|
||||||
|
those that match the filter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[list[dict], str]: Returns the results as a list of dicts and
|
||||||
|
a token that points to the end of the result set. The dicts have
|
||||||
|
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
||||||
|
"""
|
||||||
|
|
||||||
|
from_key = RoomStreamToken.parse(from_key)
|
||||||
|
if to_key:
|
||||||
|
to_key = RoomStreamToken.parse(to_key)
|
||||||
|
|
||||||
|
rows, token = yield self.runInteraction(
|
||||||
|
"paginate_room_events", self._paginate_room_events_txn,
|
||||||
|
room_id, from_key, to_key, direction, limit, event_filter,
|
||||||
|
)
|
||||||
|
|
||||||
events = yield self._get_events(
|
events = yield self._get_events(
|
||||||
[r["event_id"] for r in rows],
|
[r["event_id"] for r in rows],
|
||||||
|
@ -827,3 +808,11 @@ class StreamStore(StreamWorkerStore):
|
||||||
self._set_before_and_after(events, rows)
|
self._set_before_and_after(events, rows)
|
||||||
|
|
||||||
defer.returnValue((events, token))
|
defer.returnValue((events, token))
|
||||||
|
|
||||||
|
|
||||||
|
class StreamStore(StreamWorkerStore):
|
||||||
|
def get_room_max_stream_ordering(self):
|
||||||
|
return self._stream_id_gen.get_current_token()
|
||||||
|
|
||||||
|
def get_room_min_stream_ordering(self):
|
||||||
|
return self._backfill_id_gen.get_current_token()
|
||||||
|
|
Loading…
Reference in a new issue