diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 58e2d25f9..a71cba8ef 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1008,15 +1008,30 @@ class RoomEventSource(object): limit=limit, ) else: - events, end_key = yield self.store.get_room_events_stream( - user_id=user.to_string(), + room_events = yield self.store.get_room_changes_for_user( + user.to_string(), from_key, to_key + ) + + room_to_events = yield self.store.get_room_events_stream_for_rooms( + room_ids=room_ids, from_key=from_key, to_key=to_key, - limit=limit, - room_ids=room_ids, - is_guest=is_guest, + limit=limit or 10, ) + events = list(room_events) + events.extend(e for evs, _ in room_to_events.values() for e in evs) + + events.sort(key=lambda e: e.internal_metadata.order) + + if limit: + events[:] = events[:limit] + + if events: + end_key = events[-1].internal_metadata.after + else: + end_key = to_key + defer.returnValue((events, end_key)) def get_current_key(self, direction='f'): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 8dc8f5c64..a03458c2f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,6 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logutils import log_function import logging @@ -288,11 +287,12 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(ret, rows, topo_order=False) + return ret return self.runInteraction("get_room_changes_for_user", f) - @log_function def get_room_events_stream( self, user_id, @@ -598,6 +598,10 @@ class StreamStore(SQLBaseStore): internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) + internal.order = ( + int(topo) if topo else 0, + int(stream), + ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit):