forked from MirrorHub/synapse
Make get_current_token accept a direction parameter, which tells whether the source whether we want a token for going 'forwards' or 'backwards'
This commit is contained in:
parent
84e6b4001f
commit
4df11b5039
4 changed files with 26 additions and 8 deletions
|
@ -89,7 +89,9 @@ class MessageHandler(BaseHandler):
|
|||
|
||||
if not pagin_config.from_token:
|
||||
pagin_config.from_token = (
|
||||
yield self.hs.get_event_sources().get_current_token()
|
||||
yield self.hs.get_event_sources().get_current_token(
|
||||
direction='b'
|
||||
)
|
||||
)
|
||||
|
||||
room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
|
||||
|
|
|
@ -577,8 +577,8 @@ class RoomEventSource(object):
|
|||
|
||||
defer.returnValue((events, end_key))
|
||||
|
||||
def get_current_key(self):
|
||||
return self.store.get_room_events_max_id()
|
||||
def get_current_key(self, direction='f'):
|
||||
return self.store.get_room_events_max_id(direction)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_pagination_rows(self, user, config, key):
|
||||
|
|
|
@ -364,9 +364,25 @@ class StreamStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_room_events_max_id(self):
|
||||
def get_room_events_max_id(self, direction='f'):
|
||||
token = yield self._stream_id_gen.get_max_token(self)
|
||||
defer.returnValue("s%d" % (token,))
|
||||
if direction != 'b':
|
||||
defer.returnValue("s%d" % (token,))
|
||||
else:
|
||||
topo = yield self.runInteraction(
|
||||
"_get_max_topological_txn", self._get_max_topological_txn
|
||||
)
|
||||
defer.returnValue("t%d-%d" % (topo, token))
|
||||
|
||||
def _get_max_topological_txn(self, txn):
|
||||
txn.execute(
|
||||
"SELECT MAX(topological_ordering) FROM events"
|
||||
" WHERE outlier = ?",
|
||||
(False,)
|
||||
)
|
||||
|
||||
rows = txn.fetchall()
|
||||
return rows[0][0] if rows else 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_min_token(self):
|
||||
|
|
|
@ -31,7 +31,7 @@ class NullSource(object):
|
|||
def get_new_events_for_user(self, user, from_key, limit):
|
||||
return defer.succeed(([], from_key))
|
||||
|
||||
def get_current_key(self):
|
||||
def get_current_key(self, direction='f'):
|
||||
return defer.succeed(0)
|
||||
|
||||
def get_pagination_rows(self, user, pagination_config, key):
|
||||
|
@ -52,10 +52,10 @@ class EventSources(object):
|
|||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_current_token(self):
|
||||
def get_current_token(self, direction='f'):
|
||||
token = StreamToken(
|
||||
room_key=(
|
||||
yield self.sources["room"].get_current_key()
|
||||
yield self.sources["room"].get_current_key(direction)
|
||||
),
|
||||
presence_key=(
|
||||
yield self.sources["presence"].get_current_key()
|
||||
|
|
Loading…
Reference in a new issue