Generate historic pagination token for /messages when no ?from token provided (#12370)

This commit is contained in:
Eric Eastwood 2022-04-06 05:40:28 -05:00 committed by GitHub
parent 573cd0f92f
commit 793d03e2c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 26 additions and 14 deletions

1
changelog.d/12370.bugfix Normal file
View file

@ -0,0 +1 @@
Fix `/messages` returning backfilled and [MSC2716](https://github.com/matrix-org/synapse/pull/12319) historic messages our of order.

View file

@ -441,7 +441,14 @@ class PaginationHandler:
if pagin_config.from_token: if pagin_config.from_token:
from_token = pagin_config.from_token from_token = pagin_config.from_token
else: else:
from_token = self.hs.get_event_sources().get_current_token_for_pagination() from_token = (
await self.hs.get_event_sources().get_current_token_for_pagination(
room_id
)
)
# We expect `/messages` to use historic pagination tokens by default but
# `/messages` should still works with live tokens when manually provided.
assert from_token.room_key.topological
if pagin_config.limit is None: if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this # This shouldn't happen as we've set a default limit before this

View file

@ -1444,8 +1444,8 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
def get_current_key(self) -> RoomStreamToken: def get_current_key(self) -> RoomStreamToken:
return self.store.get_room_max_token() return self.store.get_room_max_token()
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]: def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
return self.store.get_room_events_max_id(room_id) return self.store.get_current_room_stream_token_for_room_id(room_id)
class ShutdownRoomResponse(TypedDict): class ShutdownRoomResponse(TypedDict):

View file

@ -748,21 +748,23 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"get_room_event_before_stream_ordering", _f "get_room_event_before_stream_ordering", _f
) )
async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str: async def get_current_room_stream_token_for_room_id(
"""Returns the current token for rooms stream. self, room_id: Optional[str] = None
) -> RoomStreamToken:
"""Returns the current position of the rooms stream.
By default, it returns the current global stream token. Specifying a By default, it returns a live token with the current global stream
`room_id` causes it to return the current room specific topological token. Specifying a `room_id` causes it to return a historic token with
token. the room specific topological token.
""" """
token = self.get_room_max_stream_ordering() stream_ordering = self.get_room_max_stream_ordering()
if room_id is None: if room_id is None:
return "s%d" % (token,) return RoomStreamToken(None, stream_ordering)
else: else:
topo = await self.db_pool.runInteraction( topo = await self.db_pool.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id "_get_max_topological_txn", self._get_max_topological_txn, room_id
) )
return "t%d-%d" % (topo, token) return RoomStreamToken(topo, stream_ordering)
def get_stream_id_for_event_txn( def get_stream_id_for_event_txn(
self, self,

View file

@ -69,7 +69,7 @@ class EventSources:
) )
return token return token
def get_current_token_for_pagination(self) -> StreamToken: async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the current token for a given room to be used to paginate """Get the current token for a given room to be used to paginate
events. events.
@ -80,7 +80,7 @@ class EventSources:
The current token for pagination. The current token for pagination.
""" """
token = StreamToken( token = StreamToken(
room_key=self.sources.room.get_current_key(), room_key=await self.sources.room.get_current_key_for_room(room_id),
presence_key=0, presence_key=0,
typing_key=0, typing_key=0,
receipt_key=0, receipt_key=0,

View file

@ -110,7 +110,9 @@ class PaginationTestCase(HomeserverTestCase):
def _filter_messages(self, filter: JsonDict) -> List[EventBase]: def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
"""Make a request to /messages with a filter, returns the chunk of events.""" """Make a request to /messages with a filter, returns the chunk of events."""
from_token = self.hs.get_event_sources().get_current_token_for_pagination() from_token = self.get_success(
self.hs.get_event_sources().get_current_token_for_pagination(self.room_id)
)
events, next_key = self.get_success( events, next_key = self.get_success(
self.hs.get_datastores().main.paginate_room_events( self.hs.get_datastores().main.paginate_room_events(